/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby;

import com.google.common.collect.Lists;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.incremental.AppendableIndexBuilder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class GroupByQueryHelper
{
public static final String CTX_KEY_SORT_RESULTS = "sortResults";
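
  /**
   * Creates an {@link IncrementalIndex} configured for the given query, paired with an {@link Accumulator}
   * that feeds rows into it. When {@code subquery} is null, the rows being accumulated are per-segment
   * partial results, so the combining form of each aggregator is used; otherwise the query's aggregators
   * are used as-is.
   *
   * <p>Illustrative use, mirroring {@link #makeIncrementalIndex} (the {@code rows} sequence here is a
   * hypothetical stand-in for the caller's data):
   * <pre>{@code
   * Pair<IncrementalIndex, Accumulator<IncrementalIndex, ResultRow>> pair =
   *     GroupByQueryHelper.createIndexAccumulatorPair(query, null, config, bufferPool);
   * IncrementalIndex index = rows.accumulate(pair.lhs, pair.rhs);
   * }</pre>
   */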
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
@Nullable final GroupByQuery subquery,
final GroupByQueryConfig config,
      final NonBlockingPool<ByteBuffer> bufferPool
)
{
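    // Apply any overrides from the query context on top of the service-level groupBy config.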
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
final Granularity gran = query.getGranularity();
final DateTime timeStart = query.getIntervals().get(0).getStart();
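
    // No subquery means the input rows are per-segment partial results that still need to be combined,
    // hence the combining aggregator factories below.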
final boolean combine = subquery == null;
    DateTime granTimeStart = timeStart;
    if (!Granularities.ALL.equals(gran)) {
      granTimeStart = gran.bucketStart(timeStart);
    }
    final List<AggregatorFactory> aggs;

    if (combine) {
      aggs = Lists.transform(query.getAggregatorSpecs(), AggregatorFactory::getCombiningFactory);
    } else {
      aggs = query.getAggregatorSpecs();
    }
    final List<String> dimensions = Lists.transform(query.getDimensions(), DimensionSpec::getOutputName);
final IncrementalIndex index;
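
    // The "sortResults" context flag (default true) controls whether the index keeps its facts sorted
    // (see setSortFacts below).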
final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);
// All groupBy dimensions are strings, for now.
final List<DimensionSchema> dimensionSchemas = new ArrayList<>();
for (DimensionSpec dimension : query.getDimensions()) {
dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName()));
}
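
    // Timestamps are truncated to the query granularity; granTimeStart (computed above) is the index's
    // minimum allowed timestamp.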
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(dimensionSchemas, null, null))
.withMetrics(aggs.toArray(new AggregatorFactory[0]))
.withQueryGranularity(gran)
.withMinTimestamp(granTimeStart.getMillis())
.build();
final AppendableIndexBuilder indexBuilder;
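    // The "useOffheap" context flag selects an off-heap incremental index backed by the buffer pool;
    // otherwise the index lives on-heap.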
if (query.getContextValue("useOffheap", false)) {
indexBuilder = new OffheapIncrementalIndex.Builder()
.setBufferPool(bufferPool);
} else {
indexBuilder = new OnheapIncrementalIndex.Builder();
}
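
    // Accumulating more than maxResults rows triggers IndexSizeExceededException, surfaced below as a
    // ResourceLimitExceededException.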
index = indexBuilder
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.build();
    final Accumulator<IncrementalIndex, T> accumulator = (accumulated, in) -> {
      // Normalize the input to a MapBasedRow; a ResultRow is positional, so it needs the matching query's
      // signature to be turned back into a map.
      final MapBasedRow mapBasedRow;

      if (in instanceof MapBasedRow) {
        mapBasedRow = (MapBasedRow) in;
      } else if (in instanceof ResultRow) {
        final ResultRow row = (ResultRow) in;
        mapBasedRow = row.toMapBasedRow(combine ? query : subquery);
      } else {
        throw new ISE("Unable to accumulate something of type [%s]", in.getClass());
      }

      try {
        accumulated.add(
            new MapBasedInputRow(
                mapBasedRow.getTimestamp(),
                dimensions,
                mapBasedRow.getEvent()
            )
        );
      }
      catch (IndexSizeExceededException e) {
        // The index hit its maxResults row limit; report it as a query resource limit violation.
        throw new ResourceLimitExceededException(e.getMessage());
      }

      return accumulated;
    };
return new Pair<>(index, accumulator);
}
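
  /**
   * Creates a queue plus an {@link Accumulator} that offers each result to it, rejecting nulls. Used for
   * gathering bySegment results.
   */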
  public static <T> Pair<Queue<T>, Accumulator<Queue<T>, T>> createBySegmentAccumulatorPair()
  {
    // The parallel query runner has multiple threads adding to this queue concurrently, so it must be a
    // concurrent implementation.
    final Queue<T> init = new ConcurrentLinkedQueue<>();

    final Accumulator<Queue<T>, T> accumulator = (accumulated, in) -> {
      if (in == null) {
        throw new ISE("Cannot have null result");
      }

      accumulated.offer(in);
      return accumulated;
    };

    return new Pair<>(init, accumulator);
  }

  /**
   * Builds an {@link IncrementalIndex} from a sequence of {@link ResultRow}s via
   * {@link #createIndexAccumulatorPair}. Used by GroupByStrategyV1.
   */
public static IncrementalIndex makeIncrementalIndex(
GroupByQuery query,
@Nullable GroupByQuery subquery,
GroupByQueryConfig config,
NonBlockingPool<ByteBuffer> bufferPool,
Sequence<ResultRow> rows
)
{
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, ResultRow>> indexAccumulatorPair =
GroupByQueryHelper.createIndexAccumulatorPair(query, subquery, config, bufferPool);
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}

  /**
   * Applies the query's post-aggregators to the contents of {@code index} and maps each row back to a
   * {@link ResultRow}. Used by GroupByStrategyV1.
   */
public static Sequence<ResultRow> postAggregate(final GroupByQuery query, IncrementalIndex<?> index)
{
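    // Each row from the index gets the query's post-aggregators applied, then is mapped back to a
    // positional ResultRow.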
return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs(), query.isDescending())),
input -> {
final ResultRow resultRow = toResultRow(query, input);
if (query.getResultRowHasTimestamp()) {
resultRow.set(0, query.getGranularity().toDateTime(resultRow.getLong(0)).getMillis());
}
return resultRow;
}
);
}
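
  /**
   * Copies a {@link Row} into a positional {@link ResultRow} matching the query's result signature:
   * optional timestamp first, then dimensions, aggregators, and post-aggregators, in that order.
   */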
public static ResultRow toResultRow(final GroupByQuery query, final Row row)
{
final ResultRow resultRow = ResultRow.create(query.getResultRowSizeWithPostAggregators());
int i = 0;
if (query.getResultRowHasTimestamp()) {
resultRow.set(i++, row.getTimestampFromEpoch());
}
for (DimensionSpec dimensionSpec : query.getDimensions()) {
resultRow.set(i++, row.getRaw(dimensionSpec.getOutputName()));
}
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
resultRow.set(i++, row.getRaw(aggregatorFactory.getName()));
}
for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
resultRow.set(i++, row.getRaw(postAggregator.getName()));
}
return resultRow;
}
}