blob: f516ea5b63ba644cb95b6609202ac5daa57526e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae.vector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
import org.apache.druid.query.groupby.epinephelinae.CloseableGrouperIterator;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper;
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorCursor;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.stream.Collectors;
public class VectorGroupByEngine
{
/**
 * Private constructor. Every member this class exposes is static, so instances are never needed.
 */
private VectorGroupByEngine()
{
// No instantiation.
}
/**
 * Determines whether the given groupBy query can be executed by the vectorized engine.
 *
 * The checks run in order and stop at the first failure:
 * <ol>
 *   <li>the dimension columns pass {@link #canVectorizeDimensions},</li>
 *   <li>every {@link DimensionSpec} reports that it can vectorize,</li>
 *   <li>every aggregator factory can vectorize against {@code adapter},</li>
 *   <li>{@link VirtualColumns#shouldVectorize} accepts the query's virtual columns,</li>
 *   <li>the adapter can vectorize the given filter together with the virtual columns.</li>
 * </ol>
 *
 * @param query   the groupBy query under consideration
 * @param adapter storage adapter the query would run against
 * @param filter  filter that would be applied to the cursor, or null for no filter
 *
 * @return true only if every check above succeeds
 */
public static boolean canVectorize(
    final GroupByQuery query,
    final StorageAdapter adapter,
    @Nullable final Filter filter
)
{
  // Capabilities are resolved through the query's virtual columns, falling back to the adapter.
  final Function<String, ColumnCapabilities> capabilitiesLookup =
      columnName -> query.getVirtualColumns().getColumnCapabilitiesWithFallback(adapter, columnName);

  if (!canVectorizeDimensions(capabilitiesLookup, query.getDimensions())) {
    return false;
  }

  for (final DimensionSpec dimensionSpec : query.getDimensions()) {
    if (!dimensionSpec.canVectorize()) {
      return false;
    }
  }

  final boolean aggregatorsVectorizable =
      query.getAggregatorSpecs().stream().allMatch(factory -> factory.canVectorize(adapter));
  if (!aggregatorsVectorizable) {
    return false;
  }

  if (!VirtualColumns.shouldVectorize(query, query.getVirtualColumns(), adapter)) {
    return false;
  }

  return adapter.canVectorize(filter, query.getVirtualColumns(), false);
}
/**
 * Checks whether every grouping dimension is backed by a column the vectorized engine can group on.
 *
 * A dimension is rejected when its spec must decorate the selector, when its column may have
 * multiple values, or when it is a string column that is not dictionary encoded with unique
 * dictionary entries. A dimension whose column has no capabilities (null) is accepted.
 *
 * @param capabilitiesFunction lookup from column name to its capabilities; may return null
 * @param dimensions           the grouping dimensions to inspect
 *
 * @return true if all dimensions are acceptable (trivially true for an empty list)
 */
public static boolean canVectorizeDimensions(
    final Function<String, ColumnCapabilities> capabilitiesFunction,
    final List<DimensionSpec> dimensions
)
{
  for (final DimensionSpec dimensionSpec : dimensions) {
    // Grouping on multi-value dimensions is not currently supported, and a decorating
    // DimensionSpec may turn a singly-valued column into a multi-valued selector. Be safe.
    if (dimensionSpec.mustDecorate()) {
      return false;
    }

    final ColumnCapabilities capabilities = capabilitiesFunction.apply(dimensionSpec.getDimension());

    // Null capabilities currently mean the column does not exist; nil columns can be vectorized.
    if (capabilities == null) {
      continue;
    }

    if (ValueType.STRING.equals(capabilities.getType())) {
      // Strings must be single valued, dictionary encoded, and have unique dictionary entries.
      final boolean vectorizableString = capabilities.hasMultipleValues().isFalse()
                                         && capabilities.isDictionaryEncoded().isTrue()
                                         && capabilities.areDictionaryValuesUnique().isTrue();
      if (!vectorizableString) {
        return false;
      }
    } else if (!capabilities.hasMultipleValues().isFalse()) {
      return false;
    }
  }

  return true;
}
/**
 * Runs a groupBy query through the vectorized engine and returns its rows as a lazy sequence.
 *
 * The vector cursor and the grouping iterator are only created when the returned sequence is
 * iterated (inside {@code make()}), and are released by {@code cleanup()}.
 *
 * @param query            the groupBy query to execute
 * @param storageAdapter   adapter over the data being queried
 * @param processingBuffer buffer handed to the grouper for aggregation state
 * @param fudgeTimestamp   if non-null, forwarded to the iterator and used as the timestamp of every
 *                         result row instead of the granularity bucket start
 * @param filter           filter to apply while scanning, or null for none. This is the same filter
 *                         that is validated by {@link #canVectorize}, and it is the one used to build
 *                         the cursor
 * @param interval         interval to scan
 * @param config           query-specific groupBy configuration
 *
 * @return sequence of result rows (without post-aggregators applied)
 *
 * @throws ISE if the query cannot be vectorized according to {@link #canVectorize}
 */
public static Sequence<ResultRow> process(
    final GroupByQuery query,
    final StorageAdapter storageAdapter,
    final ByteBuffer processingBuffer,
    @Nullable final DateTime fudgeTimestamp,
    @Nullable final Filter filter,
    final Interval interval,
    final GroupByQueryConfig config
)
{
  if (!canVectorize(query, storageAdapter, filter)) {
    throw new ISE("Cannot vectorize");
  }

  return new BaseSequence<>(
      new BaseSequence.IteratorMaker<ResultRow, CloseableIterator<ResultRow>>()
      {
        @Override
        public CloseableIterator<ResultRow> make()
        {
          // Build the cursor from the caller-supplied filter. It is the filter that canVectorize
          // validated above; re-deriving one from the query would bypass both that check and
          // any transformation the caller applied.
          final VectorCursor cursor = storageAdapter.makeVectorCursor(
              filter,
              interval,
              query.getVirtualColumns(),
              false,
              QueryContexts.getVectorSize(query),
              null
          );

          if (cursor == null) {
            // Return empty iterator.
            return new CloseableIterator<ResultRow>()
            {
              @Override
              public boolean hasNext()
              {
                return false;
              }

              @Override
              public ResultRow next()
              {
                throw new NoSuchElementException();
              }

              @Override
              public void close()
              {
                // Nothing to do.
              }
            };
          }

          try {
            final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();

            // One grouping-key selector per dimension, in query dimension order.
            final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
                dimensionSpec ->
                    DimensionHandlerUtils.makeVectorProcessor(
                        dimensionSpec,
                        GroupByVectorColumnProcessorFactory.instance(),
                        columnSelectorFactory
                    )
            ).collect(Collectors.toList());

            return new VectorGroupByEngineIterator(
                query,
                config,
                storageAdapter,
                cursor,
                interval,
                dimensions,
                processingBuffer,
                fudgeTimestamp
            );
          }
          catch (Throwable e) {
            // The iterator never took ownership of the cursor, so close it here and keep the
            // original failure as the primary exception.
            try {
              cursor.close();
            }
            catch (Throwable e2) {
              e.addSuppressed(e2);
            }
            throw e;
          }
        }

        @Override
        public void cleanup(CloseableIterator<ResultRow> iterFromMake)
        {
          try {
            iterFromMake.close();
          }
          catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
  );
}
@VisibleForTesting
static class VectorGroupByEngineIterator implements CloseableIterator<ResultRow>
{
private final GroupByQuery query;
private final GroupByQueryConfig querySpecificConfig;
private final StorageAdapter storageAdapter;
private final VectorCursor cursor;
private final List<GroupByVectorColumnSelector> selectors;
private final ByteBuffer processingBuffer;
private final DateTime fudgeTimestamp;
private final int keySize;
private final WritableMemory keySpace;
private final VectorGrouper vectorGrouper;
@Nullable
private final VectorCursorGranularizer granulizer;
// Granularity-bucket iterator and current bucket.
private final Iterator<Interval> bucketIterator;
@Nullable
private Interval bucketInterval;
private int partiallyAggregatedRows = -1;
@Nullable
private CloseableGrouperIterator<Memory, ResultRow> delegate = null;
VectorGroupByEngineIterator(
final GroupByQuery query,
final GroupByQueryConfig config,
final StorageAdapter storageAdapter,
final VectorCursor cursor,
final Interval queryInterval,
final List<GroupByVectorColumnSelector> selectors,
final ByteBuffer processingBuffer,
@Nullable final DateTime fudgeTimestamp
)
{
this.query = query;
this.querySpecificConfig = config;
this.storageAdapter = storageAdapter;
this.cursor = cursor;
this.selectors = selectors;
this.processingBuffer = processingBuffer;
this.fudgeTimestamp = fudgeTimestamp;
this.keySize = selectors.stream().mapToInt(GroupByVectorColumnSelector::getGroupingKeySize).sum();
this.keySpace = WritableMemory.allocate(keySize * cursor.getMaxVectorSize());
this.vectorGrouper = makeGrouper();
this.granulizer = VectorCursorGranularizer.create(storageAdapter, cursor, query.getGranularity(), queryInterval);
if (granulizer != null) {
this.bucketIterator = granulizer.getBucketIterable().iterator();
} else {
this.bucketIterator = Collections.emptyIterator();
}
this.bucketInterval = this.bucketIterator.hasNext() ? this.bucketIterator.next() : null;
}
/**
 * Returns the next result row from the current delegate.
 *
 * @throws NoSuchElementException if {@link #hasNext()} reports that no rows remain
 */
@Override
public ResultRow next()
{
  // hasNext() also makes sure that 'delegate' points at an iterator with a row available.
  if (hasNext()) {
    return delegate.next();
  }
  throw new NoSuchElementException();
}
/**
 * Reports whether another result row is available, aggregating more input if needed.
 *
 * If the current delegate is exhausted and there is still input to read (the cursor is not done,
 * or a vector was only partially aggregated), new delegates are created until one has a row.
 * Each exhausted delegate is closed and the grouper is reset before the next one is created.
 */
@Override
public boolean hasNext()
{
  if (delegate != null && delegate.hasNext()) {
    return true;
  }

  final boolean moreToRead = !cursor.isDone() || partiallyAggregatedRows >= 0;
  if (bucketInterval == null || !moreToRead) {
    return false;
  }

  while (delegate == null || !delegate.hasNext()) {
    if (delegate != null) {
      // Done with the previous batch of groups; free the grouper for the next batch.
      delegate.close();
      vectorGrouper.reset();
    }
    delegate = initNewDelegate();
  }
  return true;
}
/**
 * Closes the grouper, the current delegate (if one exists), and the cursor through a single
 * {@link Closer}.
 *
 * @throws IOException if the closer reports a failure
 */
@Override
public void close() throws IOException
{
  final Closer resources = Closer.create();

  // Registration order is kept as-is: grouper, then delegate, then cursor.
  resources.register(vectorGrouper);
  if (delegate != null) {
    resources.register(delegate);
  }
  resources.register(cursor);

  resources.close();
}
/**
 * Builds and initializes the grouper used by this iterator.
 *
 * A {@link BufferArrayGrouper} is used when
 * {@link GroupByQueryEngineV2#getCardinalityForArrayAggregation} returns a non-negative
 * cardinality; otherwise a {@link HashVectorGrouper} is configured from the query-specific
 * config. The grouper is initialized for the cursor's maximum vector size before it is returned.
 */
@VisibleForTesting
VectorGrouper makeGrouper()
{
  final int arrayCardinality = GroupByQueryEngineV2.getCardinalityForArrayAggregation(
      querySpecificConfig,
      query,
      storageAdapter,
      processingBuffer
  );

  // Both grouper flavours take the same vectorized aggregators.
  final AggregatorAdapters aggregators = AggregatorAdapters.factorizeVector(
      cursor.getColumnSelectorFactory(),
      query.getAggregatorSpecs()
  );

  final VectorGrouper result = arrayCardinality >= 0
                               ? new BufferArrayGrouper(
                                   Suppliers.ofInstance(processingBuffer),
                                   aggregators,
                                   arrayCardinality
                               )
                               : new HashVectorGrouper(
                                   Suppliers.ofInstance(processingBuffer),
                                   keySize,
                                   aggregators,
                                   querySpecificConfig.getBufferGrouperMaxSize(),
                                   querySpecificConfig.getBufferGrouperMaxLoadFactor(),
                                   querySpecificConfig.getBufferGrouperInitialBuckets()
                               );

  result.initVectorized(cursor.getMaxVectorSize());
  return result;
}
/**
 * Aggregates input for the current granularity bucket into the grouper and returns an iterator
 * over the resulting groups.
 *
 * Aggregation stops when the cursor is done, when the grouper could not take a whole vector
 * (the number of rows it did take is remembered in {@code partiallyAggregatedRows} so the next
 * call resumes after them), or when the cursor cannot advance further within the bucket (in
 * which case {@code bucketInterval} moves to the next bucket, or null if none remain).
 *
 * Must only be called while {@code bucketInterval} is non-null.
 */
private CloseableGrouperIterator<Memory, ResultRow> initNewDelegate()
{
  // Method must not be called unless there's a current bucketInterval.
  assert bucketInterval != null;

  // Captured up front: bucketInterval may be advanced by the loop below.
  final DateTime timestamp;
  if (fudgeTimestamp == null) {
    timestamp = query.getGranularity().toDateTime(bucketInterval.getStartMillis());
  } else {
    timestamp = fudgeTimestamp;
  }

  while (!cursor.isDone()) {
    final int startOffset;
    if (partiallyAggregatedRows >= 0) {
      // Resume the current vector after the rows that were already aggregated.
      startOffset = granulizer.getStartOffset() + partiallyAggregatedRows;
    } else {
      granulizer.setCurrentOffsets(bucketInterval);
      startOffset = granulizer.getStartOffset();
    }
    final int endOffset = granulizer.getEndOffset();

    if (startOffset < endOffset) {
      // Write keys to the keySpace, one selector after another within each key.
      int writePosition = 0;
      for (final GroupByVectorColumnSelector selector : selectors) {
        selector.writeKeys(keySpace, keySize, writePosition, startOffset, endOffset);
        writePosition += selector.getGroupingKeySize();
      }

      // Aggregate this vector.
      final AggregateResult aggregateResult = vectorGrouper.aggregateVector(keySpace, startOffset, endOffset);

      if (aggregateResult.isOk()) {
        partiallyAggregatedRows = -1;
      } else {
        // Accumulate onto any earlier partial progress within this same vector.
        partiallyAggregatedRows = Math.max(partiallyAggregatedRows, 0) + aggregateResult.getCount();
      }
    } else {
      partiallyAggregatedRows = -1;
    }

    if (partiallyAggregatedRows >= 0) {
      // Stop here; the remaining rows of this vector are handled by the next delegate.
      break;
    }

    if (!granulizer.advanceCursorWithinBucket()) {
      // Advance bucketInterval.
      bucketInterval = bucketIterator.hasNext() ? bucketIterator.next() : null;
      break;
    }
  }

  final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();
  final int dimensionStart = query.getResultRowDimensionStart();
  final int aggregatorStart = query.getResultRowAggregatorStart();

  return new CloseableGrouperIterator<>(
      vectorGrouper.iterator(),
      entry -> {
        final ResultRow row = ResultRow.create(query.getResultRowSizeWithoutPostAggregators());

        // Add timestamp, if necessary.
        if (resultRowHasTimestamp) {
          row.set(0, timestamp.getMillis());
        }

        // Add dimensions.
        int readPosition = 0;
        int dimensionIndex = 0;
        for (final GroupByVectorColumnSelector selector : selectors) {
          selector.writeKeyToResultRow(
              entry.getKey(),
              readPosition,
              row,
              dimensionStart + dimensionIndex
          );
          readPosition += selector.getGroupingKeySize();
          dimensionIndex++;
        }

        // Convert dimension values to desired output types, possibly.
        GroupByQueryEngineV2.convertRowTypesToOutputTypes(
            query.getDimensions(),
            row,
            dimensionStart
        );

        // Add aggregations.
        final Object[] aggregatedValues = entry.getValues();
        for (int i = 0; i < aggregatedValues.length; i++) {
          row.set(aggregatorStart + i, aggregatedValues[i]);
        }

        return row;
      },
      () -> {} // Grouper will be closed when VectorGroupByEngineIterator is closed.
  );
}
}
}