/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.extraction.RegexDimExtractionFn;
import org.apache.druid.query.filter.JavaScriptDimFilter;
import org.apache.druid.query.groupby.having.GreaterThanHavingSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategy;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
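/**
 * Tests for the groupBy "force push down" feature
 * ({@link GroupByQueryConfig#CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY}): a nested groupBy query is
 * rewritten and executed in full on each segment (simulating historicals), and the partial results
 * are merged by a simulated broker.
 */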
public class NestedQueryPushDownTest
{
private static final IndexIO INDEX_IO;
private static final IndexMergerV9 INDEX_MERGER_V9;
public static final ObjectMapper JSON_MAPPER;
private File tmpDir;
private QueryRunnerFactory<ResultRow, GroupByQuery> groupByFactory;
private QueryRunnerFactory<ResultRow, GroupByQuery> groupByFactory2;
private List<IncrementalIndex> incrementalIndices = new ArrayList<>();
private List<QueryableIndex> groupByIndices = new ArrayList<>();
private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
JSON_MAPPER.setInjectableValues(
new InjectableValues.Std().addValue(
ExprMacroTable.class,
ExprMacroTable.nil()
)
);
INDEX_IO = new IndexIO(
JSON_MAPPER,
() -> 0
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}
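// Creates an on-heap incremental index; metA and metB are ingested as long dimensions (not rollup
// metrics) and are aggregated at query time instead.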
private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dimA"),
new StringDimensionSchema("dimB"),
new LongDimensionSchema("metA"),
new LongDimensionSchema("metB")
),
null,
null
))
.build()
)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.buildOnheap();
}
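// Builds two segments: index A (all rows "sweet") and index B (a mix of "sweet" and "sour" rows),
// persists them to disk, and loads them back as QueryableIndexes.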
@Before
public void setup() throws Exception
{
tmpDir = FileUtils.createTempDir();
InputRow row;
List<String> dimNames = Arrays.asList("dimA", "metA", "dimB", "metB");
Map<String, Object> event;
final IncrementalIndex indexA = makeIncIndex();
incrementalIndices.add(indexA);
event = new HashMap<>();
event.put("dimA", "pomegranate");
event.put("metA", 1000L);
event.put("dimB", "sweet");
event.put("metB", 10L);
row = new MapBasedInputRow(1505260888888L, dimNames, event);
indexA.add(row);
event = new HashMap<>();
event.put("dimA", "mango");
event.put("metA", 1000L);
event.put("dimB", "sweet");
event.put("metB", 20L);
row = new MapBasedInputRow(1505260800000L, dimNames, event);
indexA.add(row);
event = new HashMap<>();
event.put("dimA", "pomegranate");
event.put("metA", 1000L);
event.put("dimB", "sweet");
event.put("metB", 10L);
row = new MapBasedInputRow(1505264400000L, dimNames, event);
indexA.add(row);
event = new HashMap<>();
event.put("dimA", "mango");
event.put("metA", 1000L);
event.put("dimB", "sweet");
event.put("metB", 20L);
row = new MapBasedInputRow(1505264400400L, dimNames, event);
indexA.add(row);
final File fileA = INDEX_MERGER_V9.persist(
indexA,
new File(tmpDir, "A"),
new IndexSpec(),
null
);
QueryableIndex qindexA = INDEX_IO.loadIndex(fileA);
final IncrementalIndex indexB = makeIncIndex();
incrementalIndices.add(indexB);
event = new HashMap<>();
event.put("dimA", "pomegranate");
event.put("metA", 1000L);
event.put("dimB", "sweet");
event.put("metB", 10L);
row = new MapBasedInputRow(1505260800000L, dimNames, event);
indexB.add(row);
event = new HashMap<>();
event.put("dimA", "mango");
event.put("metA", 1000L);
event.put("dimB", "sweet");
event.put("metB", 20L);
row = new MapBasedInputRow(1505260800000L, dimNames, event);
indexB.add(row);
event = new HashMap<>();
event.put("dimA", "pomegranate");
event.put("metA", 1000L);
event.put("dimB", "sour");
event.put("metB", 10L);
row = new MapBasedInputRow(1505264400000L, dimNames, event);
indexB.add(row);
event = new HashMap<>();
event.put("dimA", "mango");
event.put("metA", 1000L);
event.put("dimB", "sour");
event.put("metB", 20L);
row = new MapBasedInputRow(1505264400000L, dimNames, event);
indexB.add(row);
final File fileB = INDEX_MERGER_V9.persist(
indexB,
new File(tmpDir, "B"),
new IndexSpec(),
null
);
QueryableIndex qindexB = INDEX_IO.loadIndex(fileB);
groupByIndices = Arrays.asList(qindexA, qindexB);
setupGroupByFactory();
}
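// Creates two groupBy factories, one per simulated historical, sharing the compute buffer pool but
// each with its own merge pool.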
private void setupGroupByFactory()
{
executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]");
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
Integer.MAX_VALUE
);
// a merge pool with multiple buffers is required since we simulate a historical running the nested query and a broker doing the final merge in the same process
BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
10
);
// second merge pool for the second groupBy factory, which simulates another historical
BlockingPool<ByteBuffer> mergePool2 = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
10
);
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return "v2";
}
@Override
public int getBufferGrouperInitialBuckets()
{
return -1;
}
@Override
public long getMaxOnDiskStorage()
{
return 1_000_000_000L;
}
};
config.setSingleThreaded(false);
config.setMaxIntermediateRows(Integer.MAX_VALUE);
config.setMaxResults(Integer.MAX_VALUE);
DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
{
@Override
public int getNumThreads()
{
// Used by "v2" strategy for concurrencyHint
return 2;
}
@Override
public String getFormatString()
{
return null;
}
};
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
NOOP_QUERYWATCHER,
bufferPool
),
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
);
final GroupByStrategySelector strategySelector2 = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
NOOP_QUERYWATCHER,
bufferPool
),
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool2,
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
);
groupByFactory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(strategySelector)
);
groupByFactory2 = new GroupByQueryRunnerFactory(
strategySelector2,
new GroupByQueryQueryToolChest(strategySelector2)
);
}
@After
public void tearDown() throws Exception
{
for (IncrementalIndex incrementalIndex : incrementalIndices) {
incrementalIndex.close();
}
for (QueryableIndex queryableIndex : groupByIndices) {
queryableIndex.close();
}
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
}
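// The outer query re-groups the inner (dimA, dimB) aggregation by dimB alone; each segment computes
// the full nested query and the broker merge combines the partial sums.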
@Test
public void testSimpleDoubleAggregation()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setQuerySegmentSpec(intervalSpec)
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(query)
.setQuerySegmentSpec(intervalSpec)
.setDimensions(new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(new LongSumAggregatorFactory("totalSum", "metASum"))
.setContext(
ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
)
)
.setGranularity(Granularities.ALL)
.build();
Sequence<ResultRow> queryResult = runNestedQueryWithForcePushDown(nestedQuery);
List<ResultRow> results = queryResult.toList();
ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"dimB", "sour",
"totalSum", 2000L
);
ResultRow expectedRow1 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"dimB", "sweet",
"totalSum", 6000L
);
Assert.assertEquals(2, results.size());
Assert.assertEquals(expectedRow0, results.get(0));
Assert.assertEquals(expectedRow1, results.get(1));
}
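// Verifies that push down resolves dimensions through two renames: dimB -> newDimB in the inner
// query and newDimB -> renamedDimB in the outer query.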
@Test
public void testNestedQueryWithRenamedDimensions()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setQuerySegmentSpec(intervalSpec)
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "newDimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(query)
.setQuerySegmentSpec(intervalSpec)
.setDimensions(new DefaultDimensionSpec("newDimB", "renamedDimB"))
.setAggregatorSpecs(new LongMaxAggregatorFactory("maxBSum", "metBSum"))
.setContext(
ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
)
)
.setGranularity(Granularities.ALL)
.build();
Sequence<ResultRow> queryResult = runNestedQueryWithForcePushDown(nestedQuery);
List<ResultRow> results = queryResult.toList();
ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"renamedDimB", "sour",
"maxBSum", 20L
);
ResultRow expectedRow1 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"renamedDimB", "sweet",
"maxBSum", 60L
);
Assert.assertEquals(2, results.size());
Assert.assertEquals(expectedRow0, results.get(0));
Assert.assertEquals(expectedRow1, results.get(1));
}
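// The inner filter keeps only "mango" rows while the outer filter keeps only "pomegranate", so no
// rows survive both filters.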
@Test
public void testDimensionFilterOnOuterAndInnerQueries()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.setDimFilter(new JavaScriptDimFilter(
"dimA",
"function(dim){ return dim == 'mango' }",
null,
JavaScriptConfig.getEnabledInstance()
))
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(query)
.setDimensions(new DefaultDimensionSpec("dimA", "newDimA"))
.setAggregatorSpecs(new LongSumAggregatorFactory("finalSum", "metASum"))
.setContext(
ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
)
)
.setGranularity(Granularities.ALL)
.setDimFilter(new JavaScriptDimFilter(
"dimA",
"function(dim){ return dim == 'pomegranate' }",
null,
JavaScriptConfig.getEnabledInstance()
))
.setQuerySegmentSpec(intervalSpec)
.build();
Sequence<ResultRow> queryResult = runNestedQueryWithForcePushDown(nestedQuery);
List<ResultRow> results = queryResult.toList();
Assert.assertEquals(0, results.size());
}
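// Only the outer query filters (dimA == "mango"); the filter is applied to the pushed-down results.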
@Test
public void testDimensionFilterOnOuterQuery()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(query)
.setDimensions(new DefaultDimensionSpec("dimA", "newDimA"))
.setAggregatorSpecs(new LongSumAggregatorFactory("finalSum", "metASum"))
.setContext(
ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
)
)
.setGranularity(Granularities.ALL)
.setDimFilter(new JavaScriptDimFilter(
"dimA",
"function(dim){ return dim == 'mango' }",
null,
JavaScriptConfig.getEnabledInstance()
))
.setQuerySegmentSpec(intervalSpec)
.build();
ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"finalSum", 4000L,
"newDimA", "mango"
);
Sequence<ResultRow> queryResult = runNestedQueryWithForcePushDown(nestedQuery);
List<ResultRow> results = queryResult.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(expectedRow0, results.get(0));
}
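// Only the inner query filters; the outer aggregation over the filtered rows yields the same
// "mango" total as filtering on the outer query.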
@Test
public void testDimensionFilterOnInnerQuery()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.setDimFilter(new JavaScriptDimFilter(
"dimA",
"function(dim){ return dim == 'mango' }",
null,
JavaScriptConfig.getEnabledInstance()
))
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(query)
.setDimensions(new DefaultDimensionSpec("dimA", "newDimA"))
.setAggregatorSpecs(new LongSumAggregatorFactory("finalSum", "metASum"))
.setContext(
ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
)
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.build();
ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"finalSum", 4000L,
"newDimA", "mango"
);
Sequence<ResultRow> queryResult = runNestedQueryWithForcePushDown(nestedQuery);
List<ResultRow> results = queryResult.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(expectedRow0, results.get(0));
}
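// The outer query applies a regex extraction fn: "pomegranate" matches "^(p)" and extracts to "p",
// while non-matching values ("mango") map to the replacement string.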
@Test
public void testSubqueryWithExtractionFnInOuterQuery()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(query)
.setDimensions(new ExtractionDimensionSpec(
"dimA",
"extractedDimA",
new RegexDimExtractionFn("^(p)", true, "replacement")
))
.setAggregatorSpecs(new LongSumAggregatorFactory("finalSum", "metASum"))
.setContext(
ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
)
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.build();
ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"finalSum", 4000L,
"extractedDimA", "p"
);
ResultRow expectedRow1 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"finalSum", 4000L,
"extractedDimA", "replacement"
);
Sequence<ResultRow> queryResult = runNestedQueryWithForcePushDown(nestedQuery);
List<ResultRow> results = queryResult.toList();
Assert.assertEquals(2, results.size());
Assert.assertEquals(expectedRow0, results.get(0));
Assert.assertEquals(expectedRow1, results.get(1));
}
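// A having spec on the outer query filters the merged results; only dimB == "sweet" has a finalSum
// above 70.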
@Test
public void testHavingClauseInNestedPushDownQuery()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
GroupByQuery innerQuery = GroupByQuery
.builder()
.setDataSource("blah")
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(innerQuery)
.setDimensions(new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(new LongSumAggregatorFactory("finalSum", "metBSum"))
.setHavingSpec(new GreaterThanHavingSpec("finalSum", 70L))
.setContext(
ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true
)
)
.setGranularity(Granularities.ALL)
.setQuerySegmentSpec(intervalSpec)
.build();
ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
nestedQuery,
"2017-07-14T02:40:00.000Z",
"dimB", "sweet",
"finalSum", 90L
);
Sequence<ResultRow> queryResult = runNestedQueryWithForcePushDown(nestedQuery);
List<ResultRow> results = queryResult.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(expectedRow0, results.get(0));
}
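// Simulates distributed execution of a force-push-down query: each segment runner plays a
// historical that executes the full nested query locally, and strategy.mergeResults plays the
// broker-side merge of the outer query over the two partial result sequences.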
private Sequence<ResultRow> runNestedQueryWithForcePushDown(GroupByQuery nestedQuery)
{
ResponseContext context = ResponseContext.createEmpty();
QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
GroupByQuery pushDownQuery = nestedQuery;
QueryRunner<ResultRow> segment1Runner = new FinalizeResultsQueryRunner<ResultRow>(
toolChest.mergeResults(
groupByFactory.mergeRunners(executorService, getQueryRunnerForSegment1())
),
(QueryToolChest) toolChest
);
QueryRunner<ResultRow> segment2Runner = new FinalizeResultsQueryRunner<ResultRow>(
toolChest.mergeResults(
groupByFactory2.mergeRunners(executorService, getQueryRunnerForSegment2())
),
(QueryToolChest) toolChest
);
QueryRunner<ResultRow> queryRunnerForSegments = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
(queryPlus, responseContext) -> Sequences
.simple(
ImmutableList.of(
Sequences.map(
segment1Runner.run(queryPlus, responseContext),
toolChest.makePreComputeManipulatorFn(
(GroupByQuery) queryPlus.getQuery(),
MetricManipulatorFns.deserializing()
)
),
Sequences.map(
segment2Runner.run(queryPlus, responseContext),
toolChest.makePreComputeManipulatorFn(
(GroupByQuery) queryPlus.getQuery(),
MetricManipulatorFns.deserializing()
)
)
)
)
.flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering())
),
(QueryToolChest) toolChest
);
GroupByStrategy strategy = ((GroupByQueryRunnerFactory) groupByFactory).getStrategySelector()
.strategize(nestedQuery);
// The simulated historicals execute the query with the force-push-down flag disabled, so they do not attempt to push down again
GroupByQuery queryWithPushDownDisabled = pushDownQuery.withOverriddenContext(ImmutableMap.of(
GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY,
false
));
Sequence<ResultRow> pushDownQueryResults = strategy.mergeResults(
queryRunnerForSegments,
queryWithPushDownDisabled,
context
);
return toolChest.mergeResults((queryPlus, responseContext) -> pushDownQueryResults)
.run(QueryPlus.wrap(nestedQuery), context);
}
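// Verifies that rewriteNestedQueryForPushDown rewrites the outer query so that its dimension reads
// the renamed output column and its aggregator keeps the outer output name.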
@Test
public void testQueryRewriteForPushDown()
{
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.utc(1500000000000L, 1600000000000L))
);
String outputNameB = "dimBOutput";
String outputNameAgg = "totalSum";
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("blah")
.setQuerySegmentSpec(intervalSpec)
.setDimensions(new DefaultDimensionSpec("dimA", "dimA"), new DefaultDimensionSpec("dimB", "dimB"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("metASum", "metA"),
new LongSumAggregatorFactory("metBSum", "metB")
)
.setGranularity(Granularities.ALL)
.build();
GroupByQuery nestedQuery = GroupByQuery
.builder()
.setDataSource(query)
.setQuerySegmentSpec(intervalSpec)
.setDimensions(new DefaultDimensionSpec("dimB", outputNameB))
.setAggregatorSpecs(new LongSumAggregatorFactory(outputNameAgg, "metASum"))
.setContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, true))
.setGranularity(Granularities.ALL)
.build();
QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
GroupByQuery rewrittenQuery = ((GroupByQueryQueryToolChest) toolChest).rewriteNestedQueryForPushDown(nestedQuery);
Assert.assertEquals(outputNameB, rewrittenQuery.getDimensions().get(0).getDimension());
Assert.assertEquals(outputNameAgg, rewrittenQuery.getAggregatorSpecs().get(0).getName());
}
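// Wraps a per-segment runner with by-segment and finalizing decorations, mirroring how a historical
// serves a single segment.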
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
SegmentId segmentId,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<>(
new BySegmentQueryRunner<>(segmentId, adapter.getDataInterval().getStart(), factory.createRunner(adapter)),
(QueryToolChest<T, Query<T>>) factory.getToolchest()
);
}
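// Runner list for the first simulated historical, serving segment A.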
private List<QueryRunner<ResultRow>> getQueryRunnerForSegment1()
{
List<QueryRunner<ResultRow>> runners = new ArrayList<>();
QueryableIndex index = groupByIndices.get(0);
QueryRunner<ResultRow> runner = makeQueryRunnerForSegment(
groupByFactory,
SegmentId.dummy(index.toString()),
new QueryableIndexSegment(index, SegmentId.dummy(index.toString()))
);
runners.add(groupByFactory.getToolchest().preMergeQueryDecoration(runner));
return runners;
}
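// Runner list for the second simulated historical, serving segment B.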
private List<QueryRunner<ResultRow>> getQueryRunnerForSegment2()
{
List<QueryRunner<ResultRow>> runners = new ArrayList<>();
QueryableIndex index2 = groupByIndices.get(1);
QueryRunner<ResultRow> runner = makeQueryRunnerForSegment(
groupByFactory2,
SegmentId.dummy(index2.toString()),
new QueryableIndexSegment(index2, SegmentId.dummy(index2.toString()))
);
runners.add(groupByFactory2.getToolchest().preMergeQueryDecoration(runner));
return runners;
}
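// Supplier backing the compute and merge pools: allocates direct ByteBuffers on demand and logs
// each allocation.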
private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferGenerator.class);
private final String description;
private final int computationBufferSize;
private final AtomicLong count = new AtomicLong(0);
public OffheapBufferGenerator(String description, int computationBufferSize)
{
this.description = description;
this.computationBufferSize = computationBufferSize;
}
@Override
public ByteBuffer get()
{
log.info(
"Allocating new %s buffer[%,d] of size[%,d]",
description,
count.getAndIncrement(),
computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
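// Same decoration chain as makeQueryRunner above, used by the per-segment runner lists.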
private static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunnerForSegment(
QueryRunnerFactory<T, QueryType> factory,
SegmentId segmentId,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<>(
new BySegmentQueryRunner<>(segmentId, adapter.getDataInterval().getStart(), factory.createRunner(adapter)),
(QueryToolChest<T, Query<T>>) factory.getToolchest()
);
}
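// A QueryWatcher that ignores registered queries; sufficient for these tests.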
public static final QueryWatcher NOOP_QUERYWATCHER = (query, future) -> {};
}