| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.query; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.guava.MergeSequence; |
| import org.apache.druid.java.util.common.guava.Sequence; |
| import org.apache.druid.java.util.common.guava.Sequences; |
| import org.apache.druid.js.JavaScriptConfig; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.query.aggregation.CountAggregatorFactory; |
| import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; |
| import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory; |
| import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory; |
| import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; |
| import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory; |
| import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; |
| import org.apache.druid.query.aggregation.LongMinAggregatorFactory; |
| import org.apache.druid.query.aggregation.LongSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; |
| import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; |
| import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; |
| import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; |
| import org.apache.druid.query.aggregation.post.ConstantPostAggregator; |
| import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; |
| import org.apache.druid.query.context.ResponseContext; |
| import org.apache.druid.query.dimension.DefaultDimensionSpec; |
| import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; |
| import org.apache.druid.query.spec.QuerySegmentSpec; |
| import org.apache.druid.query.spec.SpecificSegmentSpec; |
| import org.apache.druid.query.timeseries.TimeseriesQueryEngine; |
| import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; |
| import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; |
| import org.apache.druid.segment.IncrementalIndexSegment; |
| import org.apache.druid.segment.QueryableIndex; |
| import org.apache.druid.segment.QueryableIndexSegment; |
| import org.apache.druid.segment.ReferenceCountingSegment; |
| import org.apache.druid.segment.Segment; |
| import org.apache.druid.segment.TestIndex; |
| import org.apache.druid.segment.incremental.IncrementalIndex; |
| import org.apache.druid.timeline.SegmentId; |
| import org.apache.druid.timeline.TimelineObjectHolder; |
| import org.apache.druid.timeline.VersionedIntervalTimeline; |
| import org.joda.time.DateTime; |
| import org.joda.time.Interval; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| /** |
| * |
| */ |
| public class QueryRunnerTestHelper |
| { |
| |
| public static final QueryWatcher NOOP_QUERYWATCHER = (query, future) -> { |
| }; |
| |
| public static final String DATA_SOURCE = "testing"; |
| public static final Interval FULL_ON_INTERVAL = Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"); |
| public static final SegmentId SEGMENT_ID = SegmentId.of(DATA_SOURCE, FULL_ON_INTERVAL, "dummy_version", 0); |
| public static final UnionDataSource UNION_DATA_SOURCE = new UnionDataSource( |
| Stream.of(DATA_SOURCE, DATA_SOURCE, DATA_SOURCE, DATA_SOURCE) |
| .map(TableDataSource::new) |
| .collect(Collectors.toList()) |
| ); |
| |
| public static final Granularity DAY_GRAN = Granularities.DAY; |
| public static final Granularity ALL_GRAN = Granularities.ALL; |
| public static final Granularity MONTH_GRAN = Granularities.MONTH; |
| public static final String TIME_DIMENSION = "__time"; |
| public static final String MARKET_DIMENSION = "market"; |
| public static final String QUALITY_DIMENSION = "quality"; |
| public static final String PLACEMENT_DIMENSION = "placement"; |
| public static final String PLACEMENTISH_DIMENSION = "placementish"; |
| public static final String PARTIAL_NULL_DIMENSION = "partial_null_column"; |
| |
| public static final List<String> DIMENSIONS = Lists.newArrayList( |
| MARKET_DIMENSION, |
| QUALITY_DIMENSION, |
| PLACEMENT_DIMENSION, |
| PLACEMENTISH_DIMENSION |
| ); |
| public static final String INDEX_METRIC = "index"; |
| public static final String UNIQUE_METRIC = "uniques"; |
| public static final String ADD_ROWS_INDEX_CONSTANT_METRIC = "addRowsIndexConstant"; |
| public static final String LONG_MIN_INDEX_METRIC = "longMinIndex"; |
| public static final String LONG_MAX_INDEX_METRIC = "longMaxIndex"; |
| public static final String DOUBLE_MIN_INDEX_METRIC = "doubleMinIndex"; |
| public static final String DOUBLE_MAX_INDEX_METRIC = "doubleMaxIndex"; |
| public static final String FLOAT_MIN_INDEX_METRIC = "floatMinIndex"; |
| public static final String FLOAT_MAX_INDEX_METRIC = "floatMaxIndex"; |
| public static String dependentPostAggMetric = "dependentPostAgg"; |
| public static final CountAggregatorFactory ROWS_COUNT = new CountAggregatorFactory("rows"); |
| public static final LongSumAggregatorFactory INDEX_LONG_SUM = new LongSumAggregatorFactory("index", INDEX_METRIC); |
| public static final LongSumAggregatorFactory TIME_LONG_SUM = new LongSumAggregatorFactory("sumtime", TIME_DIMENSION); |
| public static final DoubleSumAggregatorFactory INDEX_DOUBLE_SUM = new DoubleSumAggregatorFactory( |
| "index", |
| INDEX_METRIC |
| ); |
| public static final LongMinAggregatorFactory INDEX_LONG_MIN = new LongMinAggregatorFactory(LONG_MIN_INDEX_METRIC, INDEX_METRIC); |
| public static final LongMaxAggregatorFactory INDEX_LONG_MAX = new LongMaxAggregatorFactory(LONG_MAX_INDEX_METRIC, INDEX_METRIC); |
| public static final DoubleMinAggregatorFactory INDEX_DOUBLE_MIN = new DoubleMinAggregatorFactory(DOUBLE_MIN_INDEX_METRIC, INDEX_METRIC); |
| public static final DoubleMaxAggregatorFactory INDEX_DOUBLE_MAX = new DoubleMaxAggregatorFactory(DOUBLE_MAX_INDEX_METRIC, INDEX_METRIC); |
| public static final FloatMinAggregatorFactory INDEX_FLOAT_MIN = new FloatMinAggregatorFactory(FLOAT_MIN_INDEX_METRIC, INDEX_METRIC); |
| public static final FloatMaxAggregatorFactory INDEX_FLOAT_MAX = new FloatMaxAggregatorFactory(FLOAT_MAX_INDEX_METRIC, INDEX_METRIC); |
| public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }"; |
| public static final String JS_RESET_0 = "function reset() { return 0; }"; |
| public static final JavaScriptAggregatorFactory JS_INDEX_SUM_IF_PLACEMENTISH_A = new JavaScriptAggregatorFactory( |
| "nindex", |
| Arrays.asList("placementish", "index"), |
| "function aggregate(current, a, b) { if ((Array.isArray(a) && a.indexOf('a') > -1) || a === 'a') { return current + b; } else { return current; } }", |
| JS_RESET_0, |
| JS_COMBINE_A_PLUS_B, |
| JavaScriptConfig.getEnabledInstance() |
| ); |
| public static final JavaScriptAggregatorFactory JS_COUNT_IF_TIME_GREATER_THAN = new JavaScriptAggregatorFactory( |
| "ntimestamps", |
| Collections.singletonList("__time"), |
| "function aggregate(current, t) { if (t > " + |
| DateTimes.of("2011-04-01T12:00:00Z").getMillis() + |
| ") { return current + 1; } else { return current; } }", |
| JS_RESET_0, |
| JS_COMBINE_A_PLUS_B, |
| JavaScriptConfig.getEnabledInstance() |
| ); |
| public static final JavaScriptAggregatorFactory JS_PLACEMENTISH_COUNT = new JavaScriptAggregatorFactory( |
| "pishcount", |
| Arrays.asList("placementish", "index"), |
| "function aggregate(current, a) { if (Array.isArray(a)) { return current + a.length; } else if (typeof a === 'string') { return current + 1; } else { return current; } }", |
| JS_RESET_0, |
| JS_COMBINE_A_PLUS_B, |
| JavaScriptConfig.getEnabledInstance() |
| ); |
| public static final HyperUniquesAggregatorFactory QUALITY_UNIQUES = new HyperUniquesAggregatorFactory( |
| "uniques", |
| "quality_uniques" |
| ); |
| public static final HyperUniquesAggregatorFactory QUALITY_UNIQUES_ROUNDED = new HyperUniquesAggregatorFactory( |
| "uniques", |
| "quality_uniques", |
| false, |
| true |
| ); |
| public static final CardinalityAggregatorFactory QUALITY_CARDINALITY = new CardinalityAggregatorFactory( |
| "cardinality", |
| Collections.singletonList(new DefaultDimensionSpec("quality", "quality")), |
| false |
| ); |
| public static final ConstantPostAggregator CONSTANT = new ConstantPostAggregator("const", 1L); |
| public static final FieldAccessPostAggregator ROWS_POST_AGG = new FieldAccessPostAggregator("rows", "rows"); |
| public static final FieldAccessPostAggregator INDEX_POST_AGG = new FieldAccessPostAggregator("index", "index"); |
| public static final ArithmeticPostAggregator ADD_ROWS_INDEX_CONSTANT = new ArithmeticPostAggregator( |
| ADD_ROWS_INDEX_CONSTANT_METRIC, |
| "+", |
| Lists.newArrayList(CONSTANT, ROWS_POST_AGG, INDEX_POST_AGG) |
| ); |
| // dependent on AddRowsIndexContact postAgg |
| public static final ArithmeticPostAggregator DEPENDENT_POST_AGG = new ArithmeticPostAggregator( |
| dependentPostAggMetric, |
| "+", |
| Lists.newArrayList( |
| CONSTANT, |
| new FieldAccessPostAggregator(ADD_ROWS_INDEX_CONSTANT_METRIC, ADD_ROWS_INDEX_CONSTANT_METRIC), |
| new FieldAccessPostAggregator("rows", "rows") |
| ) |
| ); |
| |
| public static final String HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC = "hyperUniqueFinalizingPostAggMetric"; |
| public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator( |
| HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC, |
| "+", |
| Lists.newArrayList( |
| new HyperUniqueFinalizingPostAggregator(UNIQUE_METRIC, UNIQUE_METRIC), |
| new ConstantPostAggregator(null, 1) |
| ) |
| ); |
| |
| public static final List<AggregatorFactory> COMMON_DOUBLE_AGGREGATORS = Arrays.asList( |
| ROWS_COUNT, |
| INDEX_DOUBLE_SUM, |
| QUALITY_UNIQUES |
| ); |
| |
| public static final List<AggregatorFactory> COMMON_FLOAT_AGGREGATORS = Arrays.asList( |
| new FloatSumAggregatorFactory("index", "indexFloat"), |
| new CountAggregatorFactory("rows"), |
| new HyperUniquesAggregatorFactory( |
| "uniques", |
| "quality_uniques" |
| ) |
| ); |
| |
| public static final double UNIQUES_9 = 9.019833517963864; |
| public static final double UNIQUES_2 = 2.000977198748901d; |
| public static final double UNIQUES_1 = 1.0002442201269182d; |
| |
| public static final String[] EXPECTED_FULL_ON_INDEX_VALUES = new String[]{ |
| "4500.0", "6077.949111938477", "4922.488838195801", "5726.140853881836", "4698.468170166016", |
| "4651.030891418457", "4398.145851135254", "4596.068244934082", "4434.630561828613", "0.0", |
| "6162.801361083984", "5590.292701721191", "4994.298484802246", "5179.679672241211", "6288.556800842285", |
| "6025.663551330566", "5772.855537414551", "5346.517524719238", "5497.331253051758", "5909.684387207031", |
| "5862.711364746094", "5958.373008728027", "5224.882194519043", "5456.789611816406", "5456.095397949219", |
| "4642.481948852539", "5023.572692871094", "5155.821723937988", "5350.3723220825195", "5236.997489929199", |
| "4910.097717285156", "4507.608840942383", "4659.80500793457", "5354.878845214844", "4945.796455383301", |
| "6459.080368041992", "4390.493583679199", "6545.758262634277", "6922.801231384277", "6023.452911376953", |
| "6812.107475280762", "6368.713348388672", "6381.748748779297", "5631.245086669922", "4976.192253112793", |
| "6541.463027954102", "5983.8513107299805", "5967.189498901367", "5567.139289855957", "4863.5944747924805", |
| "4681.164360046387", "6122.321441650391", "5410.308860778809", "4846.676376342773", "5333.872688293457", |
| "5013.053741455078", "4836.85563659668", "5264.486434936523", "4581.821243286133", "4680.233596801758", |
| "4771.363662719727", "5038.354717254639", "4816.808464050293", "4684.095504760742", "5023.663467407227", |
| "5889.72257232666", "4984.973915100098", "5664.220512390137", "5572.653915405273", "5537.123138427734", |
| "5980.422874450684", "6243.834693908691", "5372.147285461426", "5690.728981018066", "5827.796455383301", |
| "6141.0769119262695", "6082.3237228393555", "5678.771339416504", "6814.467971801758", "6626.151596069336", |
| "5833.2095947265625", "4679.222328186035", "5367.9403076171875", "5410.445640563965", "5689.197135925293", |
| "5240.5018310546875", "4790.912239074707", "4992.670921325684", "4796.888023376465", "5479.439590454102", |
| "5506.567192077637", "4743.144546508789", "4913.282669067383", "4723.869743347168" |
| }; |
| |
| public static final String[] EXPECTED_FULL_ON_INDEX_VALUES_DESC; |
| |
| static { |
| List<String> list = new ArrayList<>(Arrays.asList(EXPECTED_FULL_ON_INDEX_VALUES)); |
| Collections.reverse(list); |
| EXPECTED_FULL_ON_INDEX_VALUES_DESC = list.toArray(new String[0]); |
| } |
| |
| public static final DateTime EARLIEST = DateTimes.of("2011-01-12"); |
| public static final DateTime LAST = DateTimes.of("2011-04-15"); |
| |
| public static final DateTime SKIPPED_DAY = DateTimes.of("2011-01-21T00:00:00.000Z"); |
| |
| public static final QuerySegmentSpec FIRST_TO_THIRD = new MultipleIntervalSegmentSpec( |
| Collections.singletonList(Intervals.of("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")) |
| ); |
| public static final QuerySegmentSpec SECOND_ONLY = new MultipleIntervalSegmentSpec( |
| Collections.singletonList(Intervals.of("2011-04-02T00:00:00.000Z/P1D")) |
| ); |
| |
| public static final QuerySegmentSpec FULL_ON_INTERVAL_SPEC = new MultipleIntervalSegmentSpec( |
| Collections.singletonList(FULL_ON_INTERVAL) |
| ); |
| public static final QuerySegmentSpec EMPTY_INTERVAL = new MultipleIntervalSegmentSpec( |
| Collections.singletonList(Intervals.of("2020-04-02T00:00:00.000Z/P1D")) |
| ); |
| |
| public static Iterable<Object[]> transformToConstructionFeeder(Iterable<?> in) |
| { |
| return Iterables.transform(in, (Function<Object, Object[]>) input -> new Object[]{input}); |
| } |
| |
| // simple cartesian iterable |
| public static Iterable<Object[]> cartesian(final Iterable... iterables) |
| { |
| return new Iterable<Object[]>() |
| { |
| |
| @Override |
| public Iterator<Object[]> iterator() |
| { |
| return new Iterator<Object[]>() |
| { |
| private final Iterator[] iterators = new Iterator[iterables.length]; |
| private final Object[] cached = new Object[iterables.length]; |
| |
| @Override |
| public boolean hasNext() |
| { |
| return hasNext(0); |
| } |
| |
| private boolean hasNext(int index) |
| { |
| if (iterators[index] == null) { |
| iterators[index] = iterables[index].iterator(); |
| } |
| for (; hasMore(index); cached[index] = null) { |
| if (index == iterables.length - 1 || hasNext(index + 1)) { |
| return true; |
| } |
| } |
| iterators[index] = null; |
| return false; |
| } |
| |
| private boolean hasMore(int index) |
| { |
| if (cached[index] == null && iterators[index].hasNext()) { |
| cached[index] = iterators[index].next(); |
| } |
| return cached[index] != null; |
| } |
| |
| @Override |
| public Object[] next() |
| { |
| Object[] result = Arrays.copyOf(cached, cached.length); |
| cached[cached.length - 1] = null; |
| return result; |
| } |
| |
| @Override |
| public void remove() |
| { |
| throw new UnsupportedOperationException("remove"); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** |
| * Check if a QueryRunner returned by {@link #makeQueryRunners(QueryRunnerFactory)} is vectorizable. |
| */ |
| public static boolean isTestRunnerVectorizable(QueryRunner runner) |
| { |
| final String runnerName = runner.toString(); |
| return !("rtIndex".equals(runnerName) || "noRollupRtIndex".equals(runnerName)); |
| } |
| |
| public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunners( |
| QueryRunnerFactory<T, QueryType> factory |
| ) |
| { |
| final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); |
| final IncrementalIndex noRollupRtIndex = TestIndex.getNoRollupIncrementalTestIndex(); |
| final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); |
| final QueryableIndex noRollupMMappedTestIndex = TestIndex.getNoRollupMMappedTestIndex(); |
| final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); |
| return ImmutableList.of( |
| makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, SEGMENT_ID), ("rtIndex")), |
| makeQueryRunner(factory, new IncrementalIndexSegment(noRollupRtIndex, SEGMENT_ID), "noRollupRtIndex"), |
| makeQueryRunner(factory, new QueryableIndexSegment(mMappedTestIndex, SEGMENT_ID), "mMappedTestIndex"), |
| makeQueryRunner( |
| factory, |
| new QueryableIndexSegment(noRollupMMappedTestIndex, SEGMENT_ID), |
| "noRollupMMappedTestIndex" |
| ), |
| makeQueryRunner(factory, new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID), "mergedRealtimeIndex") |
| ); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static Collection<?> makeUnionQueryRunners(QueryRunnerFactory factory) |
| { |
| final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); |
| final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); |
| final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); |
| |
| return Arrays.asList( |
| makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, SEGMENT_ID), "rtIndex"), |
| makeUnionQueryRunner(factory, new QueryableIndexSegment(mMappedTestIndex, SEGMENT_ID), "mMappedTestIndex"), |
| makeUnionQueryRunner( |
| factory, |
| new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID), |
| "mergedRealtimeIndex" |
| ) |
| ); |
| } |
| |
| public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner( |
| QueryRunnerFactory<T, QueryType> factory, |
| String resourceFileName, |
| final String runnerName |
| ) |
| { |
| return makeQueryRunner( |
| factory, |
| SEGMENT_ID, |
| new IncrementalIndexSegment(TestIndex.makeRealtimeIndex(resourceFileName), SEGMENT_ID), |
| runnerName |
| ); |
| } |
| |
| public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner( |
| QueryRunnerFactory<T, QueryType> factory, |
| Segment adapter, |
| final String runnerName |
| ) |
| { |
| return makeQueryRunner(factory, SEGMENT_ID, adapter, runnerName); |
| } |
| |
| public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner( |
| QueryRunnerFactory<T, QueryType> factory, |
| SegmentId segmentId, |
| Segment adapter, |
| final String runnerName |
| ) |
| { |
| return new FinalizeResultsQueryRunner<T>( |
| new BySegmentQueryRunner<>(segmentId, adapter.getDataInterval().getStart(), factory.createRunner(adapter)), |
| (QueryToolChest<T, Query<T>>) factory.getToolchest() |
| ) |
| { |
| @Override |
| public String toString() |
| { |
| return runnerName; |
| } |
| }; |
| } |
| |
| public static <T> QueryRunner<T> makeUnionQueryRunner( |
| QueryRunnerFactory<T, Query<T>> factory, |
| Segment adapter, |
| final String runnerName |
| ) |
| { |
| BySegmentQueryRunner<T> bySegmentQueryRunner = |
| new BySegmentQueryRunner<>(SEGMENT_ID, adapter.getDataInterval().getStart(), factory.createRunner(adapter)); |
| final QueryRunner<T> runner = new FluentQueryRunnerBuilder<T>(factory.getToolchest()) |
| .create(new UnionQueryRunner<>(bySegmentQueryRunner)) |
| .mergeResults() |
| .applyPostMergeDecoration(); |
| |
| return new QueryRunner<T>() |
| { |
| @Override |
| public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) |
| { |
| return runner.run(queryPlus, responseContext); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return runnerName; |
| } |
| }; |
| } |
| |
| public static <T> QueryRunner<T> makeFilteringQueryRunner( |
| final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline, |
| final QueryRunnerFactory<T, Query<T>> factory |
| ) |
| { |
| final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest(); |
| return new FluentQueryRunnerBuilder<T>(toolChest) |
| .create( |
| new QueryRunner<T>() |
| { |
| @Override |
| public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) |
| { |
| Query<T> query = queryPlus.getQuery(); |
| List<TimelineObjectHolder> segments = new ArrayList<>(); |
| for (Interval interval : query.getIntervals()) { |
| segments.addAll(timeline.lookup(interval)); |
| } |
| List<Sequence<T>> sequences = new ArrayList<>(); |
| for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : toolChest.filterSegments( |
| query, |
| segments |
| )) { |
| Segment segment = holder.getObject().getChunk(0).getObject(); |
| QueryPlus queryPlusRunning = queryPlus.withQuery( |
| queryPlus.getQuery().withQuerySegmentSpec( |
| new SpecificSegmentSpec( |
| new SegmentDescriptor( |
| holder.getInterval(), |
| holder.getVersion(), |
| 0 |
| ) |
| ) |
| ) |
| ); |
| sequences.add(factory.createRunner(segment).run(queryPlusRunning, responseContext)); |
| } |
| return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences)); |
| } |
| } |
| ) |
| .applyPreMergeDecoration() |
| .mergeResults() |
| .applyPostMergeDecoration(); |
| } |
| |
| public static Map<String, Object> of(Object... keyvalues) |
| { |
| ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); |
| for (int i = 0; i < keyvalues.length; i += 2) { |
| builder.put(String.valueOf(keyvalues[i]), keyvalues[i + 1]); |
| } |
| return builder.build(); |
| } |
| |
| public static TimeseriesQueryRunnerFactory newTimeseriesQueryRunnerFactory() |
| { |
| return new TimeseriesQueryRunnerFactory( |
| new TimeseriesQueryQueryToolChest(), |
| new TimeseriesQueryEngine(), |
| QueryRunnerTestHelper.NOOP_QUERYWATCHER |
| ); |
| } |
| |
| public static Map<String, Object> orderedMap(Object... keyValues) |
| { |
| LinkedHashMap<String, Object> map = new LinkedHashMap<>(); |
| for (int i = 0; i < keyValues.length; i += 2) { |
| map.put(keyValues[i].toString(), keyValues[i + 1]); |
| } |
| return map; |
| } |
| } |