/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby.epinephelinae;

import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.DoubleGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.FloatGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorPlus;
import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.NullableNumericGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

/**
* Class that knows how to process a groupBy query on a single {@link StorageAdapter}. It returns a {@link Sequence}
* of {@link ResultRow} objects that are not guaranteed to be in any particular order, and may not even be fully
* grouped. It is expected that a downstream {@link GroupByMergingQueryRunnerV2} will finish grouping these results.
*
* This code runs on data servers, like Historicals.
*
* Used by
* {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter)}.
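*
* A minimal usage sketch (illustrative only; the pool, config, and segment names below are assumptions, not
* part of this class):
*
* <pre>{@code
* Sequence<ResultRow> rows =
*     GroupByQueryEngineV2.process(query, segment.asStorageAdapter(), bufferPool, querySpecificConfig);
* }</pre>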
*/
public class GroupByQueryEngineV2
{
private static final GroupByStrategyFactory STRATEGY_FACTORY = new GroupByStrategyFactory();
private static GroupByColumnSelectorPlus[] createGroupBySelectorPlus(
ColumnSelectorPlus<GroupByColumnSelectorStrategy>[] baseSelectorPlus,
int dimensionStart
)
{
GroupByColumnSelectorPlus[] retInfo = new GroupByColumnSelectorPlus[baseSelectorPlus.length];
int curPos = 0;
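// curPos accumulates each dimension's byte offset within the serialized grouping key, advancing by the
// grouping-key size reported by each dimension's column selector strategy.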
for (int i = 0; i < retInfo.length; i++) {
retInfo[i] = new GroupByColumnSelectorPlus(baseSelectorPlus[i], curPos, dimensionStart + i);
curPos += retInfo[i].getColumnSelectorStrategy().getGroupingKeySize();
}
return retInfo;
}
private GroupByQueryEngineV2()
{
// No instantiation
}
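/**
* Processes the given query against a single {@link StorageAdapter}, returning a sequence of unmerged,
* possibly only partially-grouped {@link ResultRow} objects. A processing buffer is taken from the pool and
* is released when the returned sequence is fully consumed or closed (via {@code withBaggage}).
*/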
public static Sequence<ResultRow> process(
final GroupByQuery query,
@Nullable final StorageAdapter storageAdapter,
final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool,
final GroupByQueryConfig querySpecificConfig
)
{
if (storageAdapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got [%s]", intervals);
}
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
try {
final String fudgeTimestampString = NullHandling.emptyToNullIfNeeded(
query.getContextValue(GroupByStrategyV2.CTX_KEY_FUDGE_TIMESTAMP, null)
);
final DateTime fudgeTimestamp = fudgeTimestampString == null
? null
: DateTimes.utc(Long.parseLong(fudgeTimestampString));
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
final Interval interval = Iterables.getOnlyElement(query.getIntervals());
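// Decide whether to vectorize: the query context setting (false/true/force) is combined with whether the
// vectorized engine reports it can handle this query, storage adapter, and filter.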
final boolean doVectorize = QueryContexts.getVectorize(query).shouldVectorize(
VectorGroupByEngine.canVectorize(query, storageAdapter, filter)
);
final Sequence<ResultRow> result;
if (doVectorize) {
result = VectorGroupByEngine.process(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
filter,
interval,
querySpecificConfig
);
} else {
result = processNonVectorized(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
querySpecificConfig,
filter,
interval
);
}
return result.withBaggage(bufferHolder);
}
catch (Throwable e) {
bufferHolder.close();
throw e;
}
}
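/**
* Non-vectorized processing path: makes cursors over the interval and, per cursor, runs either an
* {@link ArrayAggregateIterator} (single string dimension of known, small-enough cardinality) or a
* {@link HashAggregateIterator} (everything else).
*/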
private static Sequence<ResultRow> processNonVectorized(
final GroupByQuery query,
final StorageAdapter storageAdapter,
final ByteBuffer processingBuffer,
@Nullable final DateTime fudgeTimestamp,
final GroupByQueryConfig querySpecificConfig,
@Nullable final Filter filter,
final Interval interval
)
{
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
filter,
interval,
query.getVirtualColumns(),
query.getGranularity(),
false,
null
);
return cursors.flatMap(
cursor -> new BaseSequence<>(
new BaseSequence.IteratorMaker<ResultRow, GroupByEngineIterator<?>>()
{
@Override
public GroupByEngineIterator make()
{
final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
final ColumnSelectorPlus<GroupByColumnSelectorStrategy>[] selectorPlus = DimensionHandlerUtils
.createColumnSelectorPluses(
STRATEGY_FACTORY,
query.getDimensions(),
columnSelectorFactory
);
final GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(
selectorPlus,
query.getResultRowDimensionStart()
);
final int cardinalityForArrayAggregation = getCardinalityForArrayAggregation(
querySpecificConfig,
query,
storageAdapter,
processingBuffer
);
if (cardinalityForArrayAggregation >= 0) {
return new ArrayAggregateIterator(
query,
querySpecificConfig,
cursor,
processingBuffer,
fudgeTimestamp,
dims,
isAllSingleValueDims(columnSelectorFactory, query.getDimensions()),
cardinalityForArrayAggregation
);
} else {
return new HashAggregateIterator(
query,
querySpecificConfig,
cursor,
processingBuffer,
fudgeTimestamp,
dims,
isAllSingleValueDims(columnSelectorFactory, query.getDimensions())
);
}
}
@Override
public void cleanup(GroupByEngineIterator iterFromMake)
{
iterFromMake.close();
}
}
)
);
}
/**
* Returns the cardinality of the array needed to do array-based aggregation, or -1 if array-based aggregation
* is impossible.
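*
* As a rough illustration (exact sizing is delegated to {@link BufferArrayGrouper#requiredBufferCapacity}):
* grouping on a single real string dimension of known cardinality can use array-based aggregation as long as
* one aggregation slot per dictionary id fits in the processing buffer; grouping on multiple dimensions, on a
* virtual column, or with forceHashAggregation set always returns -1.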
*/
public static int getCardinalityForArrayAggregation(
GroupByQueryConfig querySpecificConfig,
GroupByQuery query,
StorageAdapter storageAdapter,
ByteBuffer buffer
)
{
if (querySpecificConfig.isForceHashAggregation()) {
return -1;
}
final List<DimensionSpec> dimensions = query.getDimensions();
final ColumnCapabilities columnCapabilities;
final int cardinality;
// Find cardinality
if (dimensions.isEmpty()) {
columnCapabilities = null;
cardinality = 1;
} else if (dimensions.size() == 1) {
// Only real columns can use array-based aggregation, since virtual columns cannot currently report their
// cardinality. We need to check if a virtual column exists with the same name, since virtual columns can shadow
// real columns, and we might miss that since we're going directly to the StorageAdapter (which only knows about
// real columns).
if (query.getVirtualColumns().exists(Iterables.getOnlyElement(dimensions).getDimension())) {
return -1;
}
final String columnName = Iterables.getOnlyElement(dimensions).getDimension();
columnCapabilities = storageAdapter.getColumnCapabilities(columnName);
cardinality = storageAdapter.getDimensionCardinality(columnName);
} else {
// Cannot use array-based aggregation with more than one dimension.
return -1;
}
// Choose array-based aggregation if the grouping key is a single string dimension of a known cardinality
if (columnCapabilities != null && columnCapabilities.getType().equals(ValueType.STRING) && cardinality > 0) {
final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs().toArray(new AggregatorFactory[0]);
final long requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity(
cardinality,
aggregatorFactories
);
// Check that all keys and aggregated values can be contained in the buffer
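// Roughly, BufferArrayGrouper needs a used flag plus one slot of aggregator intermediate state per
// dictionary id (with an extra slot reserved for the missing-value key), so very high cardinalities or
// large aggregators fall back to hash aggregation here.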
if (requiredBufferCapacity < 0 || requiredBufferCapacity > buffer.capacity()) {
return -1;
} else {
return cardinality;
}
} else {
return -1;
}
}
/**
* Checks whether all "dimensions" are either single-valued or nonexistent. Non-existent column selectors show
* up as full of nulls, so they are effectively single-valued; however, their column capabilities can also be
* null during broker merge, for example with an 'inline' datasource subquery.
*/
public static boolean isAllSingleValueDims(
final ColumnInspector inspector,
final List<DimensionSpec> dimensions
)
{
return dimensions
.stream()
.allMatch(
dimension -> {
if (dimension.mustDecorate()) {
// DimensionSpecs that decorate may turn single-valued columns into multi-valued selectors.
// To be safe, we must return false here.
return false;
}
// Now check column capabilities, which must be present and explicitly not multi-valued
final ColumnCapabilities columnCapabilities = inspector.getColumnCapabilities(dimension.getDimension());
return columnCapabilities != null && columnCapabilities.hasMultipleValues().isFalse();
});
}
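/**
* Converts the dimension values in {@code resultRow}, in place, from their internal representations to the
* output types declared by the corresponding {@link DimensionSpec}s.
*/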
public static void convertRowTypesToOutputTypes(
final List<DimensionSpec> dimensionSpecs,
final ResultRow resultRow,
final int resultRowDimensionStart
)
{
for (int i = 0; i < dimensionSpecs.size(); i++) {
DimensionSpec dimSpec = dimensionSpecs.get(i);
final int resultRowIndex = resultRowDimensionStart + i;
final ValueType outputType = dimSpec.getOutputType();
resultRow.set(
resultRowIndex,
DimensionHandlerUtils.convertObjectToType(resultRow.get(resultRowIndex), outputType)
);
}
}
/**
* Checks whether a column will operate correctly with {@link LimitedBufferHashGrouper} for query limit pushdown.
*/
public static boolean canPushDownLimit(ColumnSelectorFactory columnSelectorFactory, String columnName)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(columnName);
if (capabilities != null) {
// strings can be pushed down if dictionaries are sorted and unique per id
if (capabilities.getType() == ValueType.STRING) {
return capabilities.areDictionaryValuesSorted().and(capabilities.areDictionaryValuesUnique()).isTrue();
}
// party on
return true;
}
// we don't know what we don't know, don't assume otherwise
return false;
}
private static class GroupByStrategyFactory
implements ColumnSelectorStrategyFactory<GroupByColumnSelectorStrategy>
{
@Override
public GroupByColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities,
ColumnValueSelector selector
)
{
ValueType type = capabilities.getType();
switch (type) {
case STRING:
DimensionSelector dimSelector = (DimensionSelector) selector;
if (dimSelector.getValueCardinality() >= 0) {
return new StringGroupByColumnSelectorStrategy(dimSelector::lookupName, capabilities);
} else {
return new DictionaryBuildingStringGroupByColumnSelectorStrategy();
}
case LONG:
return makeNullableNumericStrategy(new LongGroupByColumnSelectorStrategy());
case FLOAT:
return makeNullableNumericStrategy(new FloatGroupByColumnSelectorStrategy());
case DOUBLE:
return makeNullableNumericStrategy(new DoubleGroupByColumnSelectorStrategy());
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
}
private GroupByColumnSelectorStrategy makeNullableNumericStrategy(GroupByColumnSelectorStrategy delegate)
{
if (NullHandling.sqlCompatible()) {
return new NullableNumericGroupByColumnSelectorStrategy(delegate);
} else {
return delegate;
}
}
}
private abstract static class GroupByEngineIterator<KeyType> implements Iterator<ResultRow>, Closeable
{
protected final GroupByQuery query;
protected final GroupByQueryConfig querySpecificConfig;
protected final Cursor cursor;
protected final ByteBuffer buffer;
protected final Grouper.KeySerde<ByteBuffer> keySerde;
protected final GroupByColumnSelectorPlus[] dims;
protected final DateTime timestamp;
@Nullable
protected CloseableGrouperIterator<KeyType, ResultRow> delegate = null;
protected final boolean allSingleValueDims;
public GroupByEngineIterator(
final GroupByQuery query,
final GroupByQueryConfig querySpecificConfig,
final Cursor cursor,
final ByteBuffer buffer,
@Nullable final DateTime fudgeTimestamp,
final GroupByColumnSelectorPlus[] dims,
final boolean allSingleValueDims
)
{
this.query = query;
this.querySpecificConfig = querySpecificConfig;
this.cursor = cursor;
this.buffer = buffer;
this.keySerde = new GroupByEngineKeySerde(dims, query);
this.dims = dims;
// Time is the same for every row in the cursor
this.timestamp = fudgeTimestamp != null ? fudgeTimestamp : cursor.getTime();
this.allSingleValueDims = allSingleValueDims;
}
private CloseableGrouperIterator<KeyType, ResultRow> initNewDelegate()
{
final Grouper<KeyType> grouper = newGrouper();
grouper.init();
if (allSingleValueDims) {
aggregateSingleValueDims(grouper);
} else {
aggregateMultiValueDims(grouper);
}
final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();
final int resultRowDimensionStart = query.getResultRowDimensionStart();
final int resultRowAggregatorStart = query.getResultRowAggregatorStart();
return new CloseableGrouperIterator<>(
grouper.iterator(false),
entry -> {
final ResultRow resultRow = ResultRow.create(query.getResultRowSizeWithoutPostAggregators());
// Add timestamp, if necessary.
if (resultRowHasTimestamp) {
resultRow.set(0, timestamp.getMillis());
}
// Add dimensions, and convert their types if necessary.
putToRow(entry.getKey(), resultRow);
convertRowTypesToOutputTypes(query.getDimensions(), resultRow, resultRowDimensionStart);
// Add aggregations.
for (int i = 0; i < entry.getValues().length; i++) {
resultRow.set(resultRowAggregatorStart + i, entry.getValues()[i]);
}
return resultRow;
},
grouper
);
}
@Override
public ResultRow next()
{
if (delegate == null || !delegate.hasNext()) {
throw new NoSuchElementException();
}
return delegate.next();
}
@Override
public boolean hasNext()
{
if (delegate != null && delegate.hasNext()) {
return true;
} else {
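// The current grouper may have stopped early because the processing buffer filled up. If the cursor
// still has rows, close the old delegate and start a new grouper iteration over the remaining rows.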
if (!cursor.isDone()) {
if (delegate != null) {
delegate.close();
}
delegate = initNewDelegate();
return delegate.hasNext();
} else {
return false;
}
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{
if (delegate != null) {
delegate.close();
}
}
/**
* Create a new grouper.
*/
protected abstract Grouper<KeyType> newGrouper();
/**
* Grouping dimensions are all single-valued, so the given grouper doesn't have to worry about multi-valued
* dimensions.
*/
protected abstract void aggregateSingleValueDims(Grouper<KeyType> grouper);
/**
* Grouping dimensions can be multi-valued, and thus the given grouper should handle them properly during
* aggregation.
*/
protected abstract void aggregateMultiValueDims(Grouper<KeyType> grouper);
/**
* Add the key to the result row. Some pre-processing, like deserialization, might be done on the key before
* it is added to the row.
*/
protected abstract void putToRow(KeyType key, ResultRow resultRow);
protected int getSingleValue(IndexedInts indexedInts)
{
Preconditions.checkArgument(indexedInts.size() < 2, "should be single value");
return indexedInts.size() == 1 ? indexedInts.get(0) : GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE;
}
}
private static class HashAggregateIterator extends GroupByEngineIterator<ByteBuffer>
{
private static final Logger LOGGER = new Logger(HashAggregateIterator.class);
private final int[] stack;
private final Object[] valuess;
private final ByteBuffer keyBuffer;
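// Sentinel value means "no row is currently mid-aggregation"; set to the deepest dimension index when a
// row's multi-valued traversal begins (see aggregateMultiValueDims).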
private int stackPointer = Integer.MIN_VALUE;
protected boolean currentRowWasPartiallyAggregated = false;
public HashAggregateIterator(
GroupByQuery query,
GroupByQueryConfig querySpecificConfig,
Cursor cursor,
ByteBuffer buffer,
@Nullable DateTime fudgeTimestamp,
GroupByColumnSelectorPlus[] dims,
boolean allSingleValueDims
)
{
super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
final int dimCount = query.getDimensions().size();
stack = new int[dimCount];
valuess = new Object[dimCount];
keyBuffer = ByteBuffer.allocate(keySerde.keySize());
}
@Override
protected Grouper<ByteBuffer> newGrouper()
{
Grouper<ByteBuffer> grouper = null;
final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory();
final DefaultLimitSpec limitSpec = query.isApplyLimitPushDown() &&
querySpecificConfig.isApplyLimitPushDownToSegment() ?
(DefaultLimitSpec) query.getLimitSpec() : null;
final boolean canDoLimitPushdown;
if (limitSpec != null) {
// There is perhaps a more graceful way to handle this more selectively, but for now just avoid pushdown
// when it would be problematic, as determined by checking the grouping and ordering columns.
canDoLimitPushdown = Stream.concat(
query.getDimensions().stream().map(DimensionSpec::getDimension),
limitSpec.getColumns().stream().map(OrderByColumnSpec::getDimension)
).allMatch(col -> GroupByQueryEngineV2.canPushDownLimit(selectorFactory, col));
} else {
canDoLimitPushdown = false;
}
if (canDoLimitPushdown) {
// Sanity check; must not have "offset" at this point.
Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
LimitedBufferHashGrouper<ByteBuffer> limitGrouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
AggregatorAdapters.factorizeBuffered(
selectorFactory,
query.getAggregatorSpecs()
),
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets(),
limitSpec.getLimit(),
DefaultLimitSpec.sortingOrderHasNonGroupingFields(
limitSpec,
query.getDimensions()
)
);
if (limitGrouper.validateBufferCapacity(buffer.capacity())) {
grouper = limitGrouper;
} else {
LOGGER.warn(
"Limit is not applied in segment scan phase due to limited buffer capacity for query [%s].",
query.getId()
);
}
}
if (grouper == null) {
grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
AggregatorAdapters.factorizeBuffered(
selectorFactory,
query.getAggregatorSpecs()
),
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets(),
true
);
}
return grouper;
}
@Override
protected void aggregateSingleValueDims(Grouper<ByteBuffer> grouper)
{
while (!cursor.isDone()) {
for (GroupByColumnSelectorPlus dim : dims) {
final GroupByColumnSelectorStrategy strategy = dim.getColumnSelectorStrategy();
strategy.writeToKeyBuffer(
dim.getKeyBufferPosition(),
strategy.getOnlyValue(dim.getSelector()),
keyBuffer
);
}
keyBuffer.rewind();
if (!grouper.aggregate(keyBuffer).isOk()) {
return;
}
cursor.advance();
}
}
@Override
protected void aggregateMultiValueDims(Grouper<ByteBuffer> grouper)
{
while (!cursor.isDone()) {
if (!currentRowWasPartiallyAggregated) {
// Set up stack, valuess, and first grouping in keyBuffer for this row
stackPointer = stack.length - 1;
for (int i = 0; i < dims.length; i++) {
GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy();
strategy.initColumnValues(
dims[i].getSelector(),
i,
valuess
);
strategy.initGroupingKeyColumnValue(
dims[i].getKeyBufferPosition(),
i,
valuess[i],
keyBuffer,
stack
);
}
}
// Aggregate groupings for this row
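// The traversal below enumerates the cross product of the multi-valued dimensions like an odometer:
// stack[i] holds the next value index to try for dimension i. Each complete key is aggregated as it is
// formed; when a dimension runs out of values, the next-shallower dimension advances and deeper ones reset.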
boolean doAggregate = true;
while (stackPointer >= -1) {
// Aggregate additional grouping for this row
if (doAggregate) {
keyBuffer.rewind();
if (!grouper.aggregate(keyBuffer).isOk()) {
// Buffer full while aggregating; break out and resume later
currentRowWasPartiallyAggregated = true;
return;
}
doAggregate = false;
}
if (stackPointer >= 0) {
doAggregate = dims[stackPointer].getColumnSelectorStrategy().checkRowIndexAndAddValueToGroupingKey(
dims[stackPointer].getKeyBufferPosition(),
valuess[stackPointer],
stack[stackPointer],
keyBuffer
);
if (doAggregate) {
stack[stackPointer]++;
for (int i = stackPointer + 1; i < stack.length; i++) {
dims[i].getColumnSelectorStrategy().initGroupingKeyColumnValue(
dims[i].getKeyBufferPosition(),
i,
valuess[i],
keyBuffer,
stack
);
}
stackPointer = stack.length - 1;
} else {
stackPointer--;
}
} else {
stackPointer--;
}
}
// Advance to next row
cursor.advance();
currentRowWasPartiallyAggregated = false;
}
}
@Override
protected void putToRow(ByteBuffer key, ResultRow resultRow)
{
for (GroupByColumnSelectorPlus selectorPlus : dims) {
selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey(
selectorPlus,
key,
resultRow,
selectorPlus.getKeyBufferPosition()
);
}
}
}
private static class ArrayAggregateIterator extends GroupByEngineIterator<Integer>
{
private final int cardinality;
@Nullable
private final GroupByColumnSelectorPlus dim;
@Nullable
private IndexedInts multiValues;
private int nextValIndex;
public ArrayAggregateIterator(
GroupByQuery query,
GroupByQueryConfig querySpecificConfig,
Cursor cursor,
ByteBuffer buffer,
@Nullable DateTime fudgeTimestamp,
GroupByColumnSelectorPlus[] dims,
boolean allSingleValueDims,
int cardinality
)
{
super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
this.cardinality = cardinality;
if (dims.length == 1) {
this.dim = dims[0];
} else if (dims.length == 0) {
this.dim = null;
} else {
throw new IAE("Group key should be a single dimension");
}
}
@Override
protected IntGrouper newGrouper()
{
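// BufferArrayGrouper addresses aggregation state directly by dictionary id, so no key hashing or key
// comparison is needed; the dimension's dictionary id (or 0 when there is no dimension) is the key.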
return new BufferArrayGrouper(
Suppliers.ofInstance(buffer),
AggregatorAdapters.factorizeBuffered(cursor.getColumnSelectorFactory(), query.getAggregatorSpecs()),
cardinality
);
}
@Override
protected void aggregateSingleValueDims(Grouper<Integer> grouper)
{
aggregateSingleValueDims((IntGrouper) grouper);
}
@Override
protected void aggregateMultiValueDims(Grouper<Integer> grouper)
{
aggregateMultiValueDims((IntGrouper) grouper);
}
private void aggregateSingleValueDims(IntGrouper grouper)
{
while (!cursor.isDone()) {
final int key;
if (dim != null) {
// dim is always an indexed string dimension
final IndexedInts indexedInts = ((DimensionSelector) dim.getSelector()).getRow();
key = getSingleValue(indexedInts);
} else {
key = 0;
}
if (!grouper.aggregate(key).isOk()) {
return;
}
cursor.advance();
}
}
private void aggregateMultiValueDims(IntGrouper grouper)
{
if (dim == null) {
throw new ISE("dim must exist");
}
if (multiValues == null) {
// dim is always an indexed string dimension
multiValues = ((DimensionSelector) dim.getSelector()).getRow();
nextValIndex = 0;
}
while (!cursor.isDone()) {
int multiValuesSize = multiValues.size();
if (multiValuesSize == 0) {
if (!grouper.aggregate(GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE).isOk()) {
return;
}
} else {
for (; nextValIndex < multiValuesSize; nextValIndex++) {
if (!grouper.aggregate(multiValues.get(nextValIndex)).isOk()) {
return;
}
}
}
cursor.advance();
if (!cursor.isDone()) {
// dim is always an indexed string dimension
multiValues = ((DimensionSelector) dim.getSelector()).getRow();
nextValIndex = multiValues.size() == 0 ? -1 : 0;
}
}
}
@Override
protected void putToRow(Integer key, ResultRow resultRow)
{
if (dim != null) {
if (key != GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE) {
resultRow.set(dim.getResultRowPosition(), ((DimensionSelector) dim.getSelector()).lookupName(key));
} else {
resultRow.set(dim.getResultRowPosition(), NullHandling.defaultStringValue());
}
}
}
}
private static class GroupByEngineKeySerde implements Grouper.KeySerde<ByteBuffer>
{
private final int keySize;
private final GroupByColumnSelectorPlus[] dims;
private final GroupByQuery query;
public GroupByEngineKeySerde(final GroupByColumnSelectorPlus[] dims, GroupByQuery query)
{
this.dims = dims;
int keySize = 0;
for (GroupByColumnSelectorPlus selectorPlus : dims) {
keySize += selectorPlus.getColumnSelectorStrategy().getGroupingKeySize();
}
this.keySize = keySize;
this.query = query;
}
@Override
public int keySize()
{
return keySize;
}
@Override
public Class<ByteBuffer> keyClazz()
{
return ByteBuffer.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(ByteBuffer key)
{
return key;
}
@Override
public ByteBuffer fromByteBuffer(ByteBuffer buffer, int position)
{
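// Zero-copy: expose the keySize bytes at the given position as a view of the underlying buffer rather
// than copying them out.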
final ByteBuffer dup = buffer.duplicate();
dup.position(position).limit(position + keySize);
return dup.slice();
}
@Override
public Grouper.BufferComparator bufferComparator()
{
Preconditions.checkState(query.isApplyLimitPushDown(), "no limit push down");
DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec();
return GrouperBufferComparatorUtils.bufferComparator(
query.getResultRowHasTimestamp(),
query.getContextSortByDimsFirst(),
query.getDimensions().size(),
getDimensionComparators(limitSpec)
);
}
@Override
public Grouper.BufferComparator bufferComparatorWithAggregators(
AggregatorFactory[] aggregatorFactories,
int[] aggregatorOffsets
)
{
Preconditions.checkState(query.isApplyLimitPushDown(), "no limit push down");
DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec();
return GrouperBufferComparatorUtils.bufferComparatorWithAggregators(
query.getAggregatorSpecs().toArray(new AggregatorFactory[0]),
aggregatorOffsets,
limitSpec,
query.getDimensions(),
getDimensionComparators(limitSpec),
query.getResultRowHasTimestamp(),
query.getContextSortByDimsFirst(),
keySize
);
}
private Grouper.BufferComparator[] getDimensionComparators(DefaultLimitSpec limitSpec)
{
Grouper.BufferComparator[] dimComparators = new Grouper.BufferComparator[dims.length];
for (int i = 0; i < dims.length; i++) {
final String dimName = query.getDimensions().get(i).getOutputName();
StringComparator stringComparator = DefaultLimitSpec.getComparatorForDimName(limitSpec, dimName);
dimComparators[i] = dims[i].getColumnSelectorStrategy().bufferComparator(
dims[i].getKeyBufferPosition(),
stringComparator
);
}
return dimComparators;
}
@Override
public void reset()
{
// No state, nothing to reset
}
}
}