/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.scan;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.collections.StableLimitingSorter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
{
private final ScanQueryQueryToolChest toolChest;
private final ScanQueryEngine engine;
private final ScanQueryConfig scanQueryConfig;

  @Inject
public ScanQueryRunnerFactory(
ScanQueryQueryToolChest toolChest,
ScanQueryEngine engine,
ScanQueryConfig scanQueryConfig
)
{
this.toolChest = toolChest;
this.engine = engine;
this.scanQueryConfig = scanQueryConfig;
}

  @Override
public QueryRunner<ScanResultValue> createRunner(Segment segment)
{
return new ScanQueryRunner(engine, segment);
}
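
  /**
   * Merges the per-segment runners into a single runner. One of three strategies is chosen based on the query:
   * a plain concatenation when no time ordering is requested, an in-memory priority-queue sort when the row limit
   * fits within maxRowsQueuedForOrdering, or an n-way merge of per-interval runner groups otherwise.
   */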
@Override
public QueryRunner<ScanResultValue> mergeRunners(
final ExecutorService queryExecutor,
final Iterable<QueryRunner<ScanResultValue>> queryRunners
)
{
    // Merging runs lazily in a single thread (the Jetty request thread) rather than on a processing thread
return (queryPlus, responseContext) -> {
ScanQuery query = (ScanQuery) queryPlus.getQuery();
      // Note: this variable takes effect only when the query context has a timeout.
      // See the comment on ResponseContext.Key.TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(ResponseContext.Key.TIMEOUT_AT, timeoutAt);
if (query.getOrder().equals(ScanQuery.Order.NONE)) {
// Use normal strategy
Sequence<ScanResultValue> returnedRows = Sequences.concat(
Sequences.map(
Sequences.simple(queryRunners),
input -> input.run(queryPlus, responseContext)
)
);
if (query.getScanRowsLimit() <= Integer.MAX_VALUE) {
return returnedRows.limit(Math.toIntExact(query.getScanRowsLimit()));
} else {
return returnedRows;
}
} else {
List<Interval> intervalsOrdered = getIntervalsFromSpecificQuerySpec(query.getQuerySegmentSpec());
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
intervalsOrdered = Lists.reverse(intervalsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
? scanQueryConfig.getMaxRowsQueuedForOrdering()
: query.getMaxRowsQueuedForOrdering());
if (query.getScanRowsLimit() <= maxRowsQueuedForOrdering) {
// Use priority queue strategy
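          // The sort materializes up to `limit` rows in a size-bounded sorter (StableLimitingSorter), which is
          // safe here because the limit is known to be at most maxRowsQueuedForOrdering.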
try {
return stableLimitingSort(
Sequences.concat(Sequences.map(
Sequences.simple(queryRunnersOrdered),
input -> input.run(queryPlus, responseContext)
)),
query,
intervalsOrdered
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
} else {
// Use n-way merge strategy
List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
if (intervalsOrdered.size() == queryRunnersOrdered.size()) {
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
}
} else if (queryRunners instanceof SinkQueryRunners) {
((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator()
.forEachRemaining(intervalsAndRunnersOrdered::add);
} else {
throw new ISE("Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!");
}
          // Group the list of pairs by interval. The LinkedHashMap maps each interval to the query runners for
          // all partitions of that interval, preserving insertion order.
LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
intervalsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy(
x -> x.lhs,
LinkedHashMap::new,
Collectors.toList()
));
          // Find the segment with the largest number of partitions. This is compared against the
          // maxSegmentPartitionsOrderedInMemory limit to determine whether the query risks consuming too much memory.
          int maxNumPartitionsInSegment =
              partitionsGroupedByInterval.values()
                                         .stream()
                                         .mapToInt(List::size)
                                         .max()
                                         .getAsInt();
int segmentPartitionLimit = query.getMaxSegmentPartitionsOrderedInMemory() == null
? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
: query.getMaxSegmentPartitionsOrderedInMemory();
if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
            // Use n-way merge strategy.
            // Create a list of runner groups, where each group corresponds to one interval; the intervals should
            // not overlap. A list of lists is used so it can be turned into a sequence of sequences; the
            // LinkedHashMap cannot be converted to a sequence directly because Map is not Iterable.
List<List<QueryRunner<ScanResultValue>>> groupedRunners =
partitionsGroupedByInterval.entrySet()
.stream()
.map(entry -> entry.getValue()
.stream()
.map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
.collect(Collectors.toList()))
.collect(Collectors.toList());
return nWayMergeAndLimit(groupedRunners, queryPlus, responseContext);
}
throw new UOE(
"Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+ " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
+ " %,d partitions or lower the row limit below %,d.",
maxNumPartitionsInSegment,
query.getScanRowsLimit(),
scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
scanQueryConfig.getMaxRowsQueuedForOrdering()
);
}
}
};
}

  /**
   * Returns a sorted and limited copy of the provided {@code inputSequence}. Materializes the full sequence
   * in memory before returning it. Memory usage is bounded by the row limit of the {@code scanQuery}.
   */
@VisibleForTesting
Sequence<ScanResultValue> stableLimitingSort(
Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery,
List<Interval> intervalsOrdered
) throws IOException
{
Comparator<ScanResultValue> comparator = scanQuery.getResultOrdering();
if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
throw new UOE(
"Limit of %,d rows not supported for priority queue strategy of time-ordering scan results",
scanQuery.getScanRowsLimit()
);
}
    // Converting the limit from long to int cannot overflow here: the guard above rejects any limit
    // greater than Integer.MAX_VALUE.
int limit = Math.toIntExact(scanQuery.getScanRowsLimit());
final StableLimitingSorter<ScanResultValue> sorter = new StableLimitingSorter<>(comparator, limit);
Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
try {
boolean doneScanning = yielder.isDone();
// We need to scan limit elements and anything else in the last segment
int numRowsScanned = 0;
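      // Interval containing the row at the limit; once it is known, scanning stops as soon as a row falls
      // outside of it.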
Interval finalInterval = null;
while (!doneScanning) {
ScanResultValue next = yielder.get();
List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
for (ScanResultValue srv : singleEventScanResultValues) {
numRowsScanned++;
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format
sorter.add(srv);
// Finish scanning the interval containing the limit row
if (numRowsScanned > limit && finalInterval == null) {
long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
for (Interval interval : intervalsOrdered) {
if (interval.contains(timestampOfLimitRow)) {
finalInterval = interval;
}
}
if (finalInterval == null) {
throw new ISE("Row came from an unscanned interval");
}
}
}
yielder = yielder.next(null);
doneScanning = yielder.isDone() ||
(finalInterval != null &&
!finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat())));
}
final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
Iterators.addAll(sortedElements, sorter.drain());
return Sequences.simple(sortedElements);
}
finally {
yielder.close();
}
}
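
  /**
   * Extracts the interval of each segment descriptor from a specific segment spec, preserving the spec's ordering
   * so the intervals can be matched 1:1 with their query runners.
   */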
@VisibleForTesting
List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec)
{
// Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
// segment descriptors need to be present for a 1:1 matching of intervals with query runners.
// The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
// the 1:1 relationship between intervals and query runners.
List<Interval> descriptorsOrdered;
if (spec instanceof MultipleSpecificSegmentSpec) {
// Ascending time order for both descriptors and query runners by default
descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors()
.stream()
.map(SegmentDescriptor::getInterval)
.collect(Collectors.toList());
} else if (spec instanceof SpecificSegmentSpec) {
descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor().getInterval());
} else {
throw new UOE(
"Time-ordering on scan queries is only supported for queries with segment specs "
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
spec.getClass().getSimpleName()
);
}
return descriptorsOrdered;
}
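
  /**
   * Merges the results of each runner group (one group of partition runners per interval) into a single
   * time-ordered sequence, then applies the query's row limit if one is set.
   */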
@VisibleForTesting
Sequence<ScanResultValue> nWayMergeAndLimit(
List<List<QueryRunner<ScanResultValue>>> groupedRunners,
QueryPlus<ScanResultValue> queryPlus,
ResponseContext responseContext
)
{
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
Sequence<ScanResultValue> resultSequence =
Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
queryPlus.getQuery().getResultOrdering()
)
)
);
long limit = ((ScanQuery) (queryPlus.getQuery())).getScanRowsLimit();
if (limit == Long.MAX_VALUE) {
return resultSequence;
}
return resultSequence.limit(limit);
}

  @Override
public QueryToolChest<ScanResultValue, ScanQuery> getToolchest()
{
return toolChest;
}
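
  /**
   * Runs the scan engine over a single segment, filling in a default timeout when the response context does
   * not already carry one (as happens in unit tests).
   */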
private static class ScanQueryRunner implements QueryRunner<ScanResultValue>
{
private final ScanQueryEngine engine;
private final Segment segment;

    public ScanQueryRunner(ScanQueryEngine engine, Segment segment)
{
this.engine = engine;
this.segment = segment;
}

    @Override
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext)
{
Query<ScanResultValue> query = queryPlus.getQuery();
if (!(query instanceof ScanQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
}
      // The timeout may be absent or zero when the runner is invoked directly, as happens in unit tests
final Number timeoutAt = (Number) responseContext.get(ResponseContext.Key.TIMEOUT_AT);
if (timeoutAt == null || timeoutAt.longValue() == 0L) {
responseContext.put(ResponseContext.Key.TIMEOUT_AT, JodaUtils.MAX_INSTANT);
}
return engine.process((ScanQuery) query, segment, responseContext);
}
}
}