/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.scan;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Tests the order in which Scan query results come back.
*
* Ensures that we have run-to-run stability of result order, which is important for offset-based pagination.
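 *
 * For example, a client paginating with {@code offset = 0, limit = 10} followed by {@code offset = 10, limit = 10}
 * would skip or duplicate rows if the result order changed between those two queries.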
*/
@RunWith(Parameterized.class)
public class ScanQueryResultOrderingTest
{
private static final String DATASOURCE = "datasource";
private static final String ID_COLUMN = "id";
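
  // Adapts Object[] rows to columns: element 0 holds the row timestamp, element 1 holds the "id" value.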
private static final RowAdapter<Object[]> ROW_ADAPTER = columnName -> {
if (ID_COLUMN.equals(columnName)) {
return row -> row[1];
} else if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
return row -> ((DateTime) row[0]).getMillis();
} else {
return row -> null;
}
};
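
  // The segments expose the __time column plus a long-typed "id" column.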
private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
.addTimeColumn()
.add(ID_COLUMN, ColumnType.LONG)
.build();
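
  // Three row-based segments: two partitions of 2000-01-01 plus one segment for 2000-01-02. Rows within each
  // segment are in ascending time order, but the ids within each hour are deliberately unsorted.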
private static final List<RowBasedSegment<Object[]>> SEGMENTS = ImmutableList.of(
new RowBasedSegment<>(
SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 0),
Sequences.simple(
ImmutableList.of(
new Object[]{DateTimes.of("2000T01"), 101},
new Object[]{DateTimes.of("2000T01"), 80},
new Object[]{DateTimes.of("2000T01"), 232},
new Object[]{DateTimes.of("2000T01"), 12},
new Object[]{DateTimes.of("2000T02"), 808},
new Object[]{DateTimes.of("2000T02"), 411},
new Object[]{DateTimes.of("2000T02"), 383},
new Object[]{DateTimes.of("2000T05"), 22}
)
),
ROW_ADAPTER,
ROW_SIGNATURE
),
new RowBasedSegment<>(
SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 1),
Sequences.simple(
ImmutableList.of(
new Object[]{DateTimes.of("2000T01"), 333},
new Object[]{DateTimes.of("2000T01"), 222},
new Object[]{DateTimes.of("2000T01"), 444},
new Object[]{DateTimes.of("2000T01"), 111},
new Object[]{DateTimes.of("2000T03"), 555},
new Object[]{DateTimes.of("2000T03"), 999},
new Object[]{DateTimes.of("2000T03"), 888},
new Object[]{DateTimes.of("2000T05"), 777}
)
),
ROW_ADAPTER,
ROW_SIGNATURE
),
new RowBasedSegment<>(
SegmentId.of(DATASOURCE, Intervals.of("2000-01-02/P1D"), "1", 0),
Sequences.simple(
ImmutableList.of(
new Object[]{DateTimes.of("2000-01-02T00"), 7},
new Object[]{DateTimes.of("2000-01-02T02"), 9},
new Object[]{DateTimes.of("2000-01-02T03"), 8}
)
),
ROW_ADAPTER,
ROW_SIGNATURE
)
  );

private final List<Integer> segmentToServerMap;
private final int limit;
private final int batchSize;
private final int maxRowsQueuedForOrdering;
private ScanQueryRunnerFactory queryRunnerFactory;
  private List<QueryRunner<ScanResultValue>> segmentRunners;

@Parameterized.Parameters(name = "Segment-to-server map[{0}], limit[{1}], batchSize[{2}], maxRowsQueuedForOrdering[{3}]")
public static Iterable<Object[]> constructorFeeder()
{
    // Set the number of servers equal to the number of segments, then try all possible distributions of
    // segments to servers.
final int numServers = SEGMENTS.size();
final Set<List<Integer>> segmentToServerMaps = Sets.cartesianProduct(
IntStream.range(0, SEGMENTS.size())
.mapToObj(i -> IntStream.range(0, numServers).boxed().collect(Collectors.toSet()))
.collect(Collectors.toList())
);
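
    // With 3 segments and 3 servers, this cartesian product yields 3^3 = 27 segment-to-server assignments.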
// Try every limit up to one past the total number of rows.
final Set<Integer> limits = new TreeSet<>();
final int totalNumRows = SEGMENTS.stream().mapToInt(s -> s.asStorageAdapter().getNumRows()).sum();
for (int i = 0; i <= totalNumRows + 1; i++) {
limits.add(i);
}
// Try various batch sizes.
final Set<Integer> batchSizes = ImmutableSortedSet.of(1, 2, 100);
final Set<Integer> maxRowsQueuedForOrderings = ImmutableSortedSet.of(1, 7, 100000);
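
    // Small maxRowsQueuedForOrdering values should push time-ordered queries onto the n-way merge path, while
    // the large value leaves room for the in-memory priority-queue sort, so both strategies get exercised.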
return Sets.cartesianProduct(
segmentToServerMaps,
limits,
batchSizes,
maxRowsQueuedForOrderings
).stream().map(args -> args.toArray(new Object[0])).collect(Collectors.toList());
  }

public ScanQueryResultOrderingTest(
final List<Integer> segmentToServerMap,
final int limit,
final int batchSize,
final int maxRowsQueuedForOrdering
)
{
this.segmentToServerMap = segmentToServerMap;
this.limit = limit;
this.batchSize = batchSize;
this.maxRowsQueuedForOrdering = maxRowsQueuedForOrdering;
  }

@Before
public void setUp()
{
queryRunnerFactory = new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory()
),
new ScanQueryEngine(),
new ScanQueryConfig()
);
segmentRunners = SEGMENTS.stream().map(queryRunnerFactory::createRunner).collect(Collectors.toList());
}
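
  // With Order.NONE, rows stream back in segment order and, within each segment, in insertion order.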
@Test
public void testOrderNone()
{
assertResultsEquals(
Druids.newScanQueryBuilder()
              .dataSource(DATASOURCE)
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
.columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
.order(ScanQuery.Order.NONE)
.build(),
ImmutableList.of(
101,
80,
232,
12,
808,
411,
383,
22,
333,
222,
444,
111,
555,
999,
888,
777,
7,
9,
8
)
);
}
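
  // Ascending time order merges rows across segments by timestamp; ties on timestamp come back in a stable
  // per-segment order, which is exactly the stability this test locks down.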
@Test
public void testOrderTimeAscending()
{
assertResultsEquals(
Druids.newScanQueryBuilder()
              .dataSource(DATASOURCE)
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
.columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
.order(ScanQuery.Order.ASCENDING)
.build(),
ImmutableList.of(
101,
80,
232,
12,
333,
222,
444,
111,
808,
411,
383,
555,
999,
888,
22,
777,
7,
9,
8
)
);
}
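
  // Descending time order yields the exact reverse of the ascending result, including within equal timestamps.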
@Test
public void testOrderTimeDescending()
{
assertResultsEquals(
Druids.newScanQueryBuilder()
              .dataSource(DATASOURCE)
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
.columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
.order(ScanQuery.Order.DESCENDING)
.build(),
ImmutableList.of(
8,
9,
7,
777,
22,
888,
999,
555,
383,
411,
808,
111,
444,
222,
333,
12,
232,
80,
101
)
);
}
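
  // Distributes the segments across simulated Historical servers per segmentToServerMap, merges the results
  // through a simulated Broker, and verifies the ids come back in the expected order after applying the limit.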
private void assertResultsEquals(final ScanQuery query, final List<Integer> expectedResults)
{
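    // Group the per-segment runners into simulated servers according to segmentToServerMap.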
final List<List<Pair<SegmentId, QueryRunner<ScanResultValue>>>> serverRunners = new ArrayList<>();
for (int i = 0; i <= segmentToServerMap.stream().max(Comparator.naturalOrder()).orElse(0); i++) {
serverRunners.add(new ArrayList<>());
}
for (int segmentNumber = 0; segmentNumber < segmentToServerMap.size(); segmentNumber++) {
final SegmentId segmentId = SEGMENTS.get(segmentNumber).getId();
final int serverNumber = segmentToServerMap.get(segmentNumber);
serverRunners.get(serverNumber).add(Pair.of(segmentId, segmentRunners.get(segmentNumber)));
}
// Simulates what the Historical servers would do.
final List<QueryRunner<ScanResultValue>> mergedServerRunners =
serverRunners.stream()
.filter(runners -> !runners.isEmpty())
.map(
runners ->
queryRunnerFactory.getToolchest().mergeResults(
new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
final QueryPlus<ScanResultValue> queryPlus,
final ResponseContext responseContext
)
{
return queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
runners.stream().map(p -> p.rhs).collect(Collectors.toList())
).run(
queryPlus.withQuery(
queryPlus.getQuery()
.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
runners.stream()
.map(p -> p.lhs.toDescriptor())
.collect(Collectors.toList())
)
)
),
responseContext
);
}
}
)
)
.collect(Collectors.toList());
// Simulates what the Broker would do.
final QueryRunner<ScanResultValue> brokerRunner = queryRunnerFactory.getToolchest().mergeResults(
(queryPlus, responseContext) -> {
final List<Sequence<ScanResultValue>> sequences =
mergedServerRunners.stream()
.map(runner -> runner.run(queryPlus.withoutThreadUnsafeState()))
.collect(Collectors.toList());
return new MergeSequence<>(
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(sequences)
);
}
);
// Finally: run the query.
final List<Integer> results = runQuery(
(ScanQuery) Druids.ScanQueryBuilder.copy(query)
.limit(limit)
.batchSize(batchSize)
.build()
.withOverriddenContext(
ImmutableMap.of(
ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING,
maxRowsQueuedForOrdering
)
),
brokerRunner
);
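    // A Scan query limit of 0 means "no limit", hence the Long.MAX_VALUE substitution.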
Assert.assertEquals(
expectedResults.stream().limit(limit == 0 ? Long.MAX_VALUE : limit).collect(Collectors.toList()),
results
);
}
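
  // Runs the query through the simulated broker and pulls the "id" column (index 1) out of each result row.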
private List<Integer> runQuery(final ScanQuery query, final QueryRunner<ScanResultValue> brokerRunner)
{
final List<Object[]> results = queryRunnerFactory.getToolchest().resultsAsArrays(
query,
brokerRunner.run(QueryPlus.wrap(query))
).toList();
return results.stream().mapToInt(row -> (int) row[1]).boxed().collect(Collectors.toList());
}
}