/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
/**
 * Query engine for the groupBy "v1" execution path. Processes a {@link GroupByQuery} against a single
 * {@link StorageAdapter} by accumulating {@link BufferAggregator} state per grouping key in an off-heap
 * ByteBuffer drawn from the global intermediate-results pool, and returns the grouped results as a
 * {@link Sequence} of {@link Row} objects, e.g. {@code Sequence<Row> rows = engine.process(query, adapter);}.
 */
public class GroupByQueryEngine
{
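  // Sentinel dictionary id written into a grouping key when a row has no value for a dimension; such dimensions
  // are omitted from the emitted result row.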
private static final int MISSING_VALUE = -1;
private final Supplier<GroupByQueryConfig> config;
private final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool;
@Inject
public GroupByQueryEngine(
Supplier<GroupByQueryConfig> config,
@Global NonBlockingPool<ByteBuffer> intermediateResultsBufferPool
)
{
this.config = config;
this.intermediateResultsBufferPool = intermediateResultsBufferPool;
}
public Sequence<Row> process(final GroupByQuery query, final StorageAdapter storageAdapter)
{
if (storageAdapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
}
Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimFilter()));
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
filter,
intervals.get(0),
query.getVirtualColumns(),
query.getGranularity(),
false,
null
);
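    // A single intermediate-results buffer is taken from the global pool for the whole query and shared across
    // all cursors; the Closeable baggage below returns it to the pool once the result sequence is fully consumed.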
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
return Sequences.concat(
Sequences.withBaggage(
Sequences.map(
cursors,
new Function<Cursor, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(final Cursor cursor)
{
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, RowIterator>()
{
@Override
public RowIterator make()
{
return new RowIterator(query, cursor, bufferHolder.get(), config.get());
}
@Override
public void cleanup(RowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
}
);
}
}
),
new Closeable()
{
@Override
public void close()
{
CloseQuietly.close(bufferHolder);
}
}
)
);
}
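  /**
   * Accumulates per-group aggregation state in an off-heap buffer. Each distinct grouping key (a ByteBuffer of
   * dimension dictionary ids) is mapped to the offset in the metrics buffer where its aggregator state lives.
   */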
private static class RowUpdater
{
private final ByteBuffer metricValues;
private final BufferAggregator[] aggregators;
private final PositionMaintainer positionMaintainer;
private final Map<ByteBuffer, Integer> positions = new TreeMap<>();
// GroupBy queries tend to do a lot of reads from this. We co-store a hash map to make those reads go faster.
private final Map<ByteBuffer, Integer> positionsHash = new HashMap<>();
public RowUpdater(
ByteBuffer metricValues,
BufferAggregator[] aggregators,
PositionMaintainer positionMaintainer
)
{
this.metricValues = metricValues;
this.aggregators = aggregators;
this.positionMaintainer = positionMaintainer;
}
public int getNumRows()
{
return positions.size();
}
public Map<ByteBuffer, Integer> getPositions()
{
return positions;
}
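    /**
     * Recursively expands the grouping key across the (possibly multi-valued) dimension selectors and aggregates
     * the current cursor row for each resulting key. Returns null on success, or the list of keys that could not
     * be aggregated because the metrics buffer had no room left for new groups.
     */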
@Nullable
private List<ByteBuffer> updateValues(ByteBuffer key, List<DimensionSelector> dims)
{
if (dims.size() > 0) {
final DimensionSelector dimSelector = dims.get(0);
final IndexedInts row = dimSelector.getRow();
final int rowSize = row.size();
if (rowSize == 0) {
ByteBuffer newKey = key.duplicate();
newKey.putInt(MISSING_VALUE);
return updateValues(newKey, dims.subList(1, dims.size()));
} else {
List<ByteBuffer> retVal = null;
for (int i = 0; i < rowSize; i++) {
ByteBuffer newKey = key.duplicate();
int dimValue = row.get(i);
newKey.putInt(dimValue);
List<ByteBuffer> unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));
if (unaggregatedBuffers != null) {
if (retVal == null) {
retVal = new ArrayList<>();
}
retVal.addAll(unaggregatedBuffers);
}
}
return retVal;
}
} else {
key.clear();
Integer position = positionsHash.get(key);
int[] increments = positionMaintainer.getIncrements();
int thePosition;
if (position == null) {
ByteBuffer keyCopy = ByteBuffer.allocate(key.limit());
keyCopy.put(key.asReadOnlyBuffer());
keyCopy.clear();
position = positionMaintainer.getNext();
if (position == null) {
return Collections.singletonList(keyCopy);
}
positions.put(keyCopy, position);
positionsHash.put(keyCopy, position);
thePosition = position;
for (int i = 0; i < aggregators.length; ++i) {
aggregators[i].init(metricValues, thePosition);
thePosition += increments[i];
}
}
thePosition = position;
for (int i = 0; i < aggregators.length; ++i) {
aggregators[i].aggregate(metricValues, thePosition);
thePosition += increments[i];
}
return null;
}
}
}
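  /**
   * Hands out offsets into the metrics buffer. Each call to {@link #getNext()} reserves room for one complete set
   * of aggregator states, or returns null once the buffer cannot fit another group.
   */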
private static class PositionMaintainer
{
private final int[] increments;
private final int increment;
private final int max;
private long nextVal;
public PositionMaintainer(
int start,
int[] increments,
int max
)
{
this.nextVal = (long) start;
this.increments = increments;
int theIncrement = 0;
for (int inc : increments) {
theIncrement += inc;
}
increment = theIncrement;
this.max = max - increment; // Make sure there is enough room for one more increment
}
@Nullable
public Integer getNext()
{
if (nextVal > max) {
return null;
} else {
int retVal = (int) nextVal;
nextVal += increment;
return retVal;
}
}
public int getIncrement()
{
return increment;
}
public int[] getIncrements()
{
return increments;
}
}
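  /**
   * Iterates the grouped rows produced for a single cursor. Whenever the current batch is exhausted, {@link #next()}
   * aggregates as many cursor rows as fit within the intermediate buffer and maxIntermediateRows limit, then streams
   * out the resulting rows one at a time.
   */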
private static class RowIterator implements CloseableIterator<Row>
{
private final GroupByQuery query;
private final Cursor cursor;
private final ByteBuffer metricsBuffer;
private final int maxIntermediateRows;
private final List<DimensionSelector> dimensions;
private final ArrayList<String> dimNames;
private final BufferAggregator[] aggregators;
private final String[] metricNames;
private final int[] sizesRequired;
@Nullable
private List<ByteBuffer> unprocessedKeys;
private Iterator<Row> delegate;
public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBuffer, GroupByQueryConfig config)
{
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
this.query = query;
this.cursor = cursor;
this.metricsBuffer = metricsBuffer;
this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows();
unprocessedKeys = null;
delegate = Collections.emptyIterator();
List<DimensionSpec> dimensionSpecs = query.getDimensions();
dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
for (final DimensionSpec dimSpec : dimensionSpecs) {
if (dimSpec.getOutputType() != ValueType.STRING) {
throw new UnsupportedOperationException(
"GroupBy v1 only supports dimensions with an outputType of STRING."
);
}
final DimensionSelector selector = cursor.getColumnSelectorFactory().makeDimensionSelector(dimSpec);
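        // Grouping keys are built from dictionary ids, so this engine requires selectors with a known cardinality
        // (i.e. a dictionary) in order to map ids back to values when building result rows.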
if (selector.getValueCardinality() == DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
throw new UnsupportedOperationException(
"GroupBy v1 does not support dimension selectors with unknown cardinality.");
}
dimensions.add(selector);
dimNames.add(dimSpec.getOutputName());
}
List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
aggregators = new BufferAggregator[aggregatorSpecs.size()];
metricNames = new String[aggregatorSpecs.size()];
sizesRequired = new int[aggregatorSpecs.size()];
for (int i = 0; i < aggregatorSpecs.size(); ++i) {
AggregatorFactory aggregatorSpec = aggregatorSpecs.get(i);
aggregators[i] = aggregatorSpec.factorizeBuffered(cursor.getColumnSelectorFactory());
metricNames[i] = aggregatorSpec.getName();
sizesRequired[i] = aggregatorSpec.getMaxIntermediateSizeWithNulls();
}
}
@Override
public boolean hasNext()
{
return delegate.hasNext() || !cursor.isDone();
}
@Override
public Row next()
{
if (delegate.hasNext()) {
return delegate.next();
}
if (unprocessedKeys == null && cursor.isDone()) {
throw new NoSuchElementException();
}
final PositionMaintainer positionMaintainer = new PositionMaintainer(0, sizesRequired, metricsBuffer.remaining());
final RowUpdater rowUpdater = new RowUpdater(metricsBuffer, aggregators, positionMaintainer);
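      // Grouping keys that did not fit during the previous pass are aggregated first, against the fresh positions.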
if (unprocessedKeys != null) {
for (ByteBuffer key : unprocessedKeys) {
final List<ByteBuffer> unprocUnproc = rowUpdater.updateValues(key, ImmutableList.of());
if (unprocUnproc != null) {
throw new ISE("Not enough memory to process the request.");
}
}
cursor.advance();
}
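      // Accumulate cursor rows until the cursor is exhausted, maxIntermediateRows is reached, or the buffer is full.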
while (!cursor.isDone() && rowUpdater.getNumRows() < maxIntermediateRows) {
ByteBuffer key = ByteBuffer.allocate(dimensions.size() * Integer.BYTES);
unprocessedKeys = rowUpdater.updateValues(key, dimensions);
if (unprocessedKeys != null) {
break;
}
cursor.advance();
}
if (rowUpdater.getPositions().isEmpty() && unprocessedKeys != null) {
throw new ISE(
"Not enough memory to process even a single item. Required [%,d] memory, but only have[%,d]",
positionMaintainer.getIncrement(), metricsBuffer.remaining()
);
}
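      // Convert the accumulated (key, buffer position) entries into MapBasedRows: dictionary ids are decoded back
      // into dimension values, and aggregator state is read out of the metrics buffer.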
delegate = FunctionalIterator
.create(rowUpdater.getPositions().entrySet().iterator())
.transform(
new Function<Map.Entry<ByteBuffer, Integer>, Row>()
{
private final DateTime timestamp = cursor.getTime();
private final int[] increments = positionMaintainer.getIncrements();
@Override
public Row apply(@Nullable Map.Entry<ByteBuffer, Integer> input)
{
Map<String, Object> theEvent = Maps.newLinkedHashMap();
ByteBuffer keyBuffer = input.getKey().duplicate();
for (int i = 0; i < dimensions.size(); ++i) {
final DimensionSelector dimSelector = dimensions.get(i);
final int dimVal = keyBuffer.getInt();
if (MISSING_VALUE != dimVal) {
theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal));
}
}
int position = input.getValue();
for (int i = 0; i < aggregators.length; ++i) {
theEvent.put(metricNames[i], aggregators[i].get(metricsBuffer, position));
position += increments[i];
}
for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
theEvent.put(postAggregator.getName(), postAggregator.compute(theEvent));
}
return new MapBasedRow(timestamp, theEvent);
}
}
);
return delegate.next();
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{
      // Release any resources held by the buffer aggregators.
for (BufferAggregator agg : aggregators) {
agg.close();
}
}
}
}