| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.benchmark.query; |
| |
| import com.fasterxml.jackson.databind.InjectableValues; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.dataformat.smile.SmileFactory; |
| import com.google.common.base.Supplier; |
| import com.google.common.base.Suppliers; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import org.apache.druid.collections.BlockingPool; |
| import org.apache.druid.collections.DefaultBlockingPool; |
| import org.apache.druid.collections.NonBlockingPool; |
| import org.apache.druid.collections.StupidPool; |
| import org.apache.druid.common.config.NullHandling; |
| import org.apache.druid.data.input.InputRow; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.FileUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.guava.Sequence; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.druid.math.expr.ExprMacroTable; |
| import org.apache.druid.offheap.OffheapBufferGenerator; |
| import org.apache.druid.query.DruidProcessingConfig; |
| import org.apache.druid.query.FinalizeResultsQueryRunner; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryPlus; |
| import org.apache.druid.query.QueryRunner; |
| import org.apache.druid.query.QueryRunnerFactory; |
| import org.apache.druid.query.QueryToolChest; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.query.aggregation.CountAggregatorFactory; |
| import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory; |
| import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.LongSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; |
| import org.apache.druid.query.context.ResponseContext; |
| import org.apache.druid.query.dimension.DefaultDimensionSpec; |
| import org.apache.druid.query.expression.TestExprMacroTable; |
| import org.apache.druid.query.filter.BoundDimFilter; |
| import org.apache.druid.query.groupby.GroupByQuery; |
| import org.apache.druid.query.groupby.GroupByQueryConfig; |
| import org.apache.druid.query.groupby.GroupByQueryEngine; |
| import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; |
| import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; |
| import org.apache.druid.query.groupby.ResultRow; |
| import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; |
| import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; |
| import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; |
| import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; |
| import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; |
| import org.apache.druid.query.ordering.StringComparators; |
| import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; |
| import org.apache.druid.query.spec.QuerySegmentSpec; |
| import org.apache.druid.segment.IncrementalIndexSegment; |
| import org.apache.druid.segment.IndexIO; |
| import org.apache.druid.segment.IndexMergerV9; |
| import org.apache.druid.segment.IndexSpec; |
| import org.apache.druid.segment.QueryableIndex; |
| import org.apache.druid.segment.QueryableIndexSegment; |
| import org.apache.druid.segment.column.ColumnConfig; |
| import org.apache.druid.segment.column.ValueType; |
| import org.apache.druid.segment.generator.DataGenerator; |
| import org.apache.druid.segment.generator.GeneratorBasicSchemas; |
| import org.apache.druid.segment.generator.GeneratorSchemaInfo; |
| import org.apache.druid.segment.incremental.IncrementalIndex; |
| import org.apache.druid.segment.incremental.IncrementalIndexSchema; |
| import org.apache.druid.segment.serde.ComplexMetrics; |
| import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; |
| import org.apache.druid.timeline.SegmentId; |
| import org.openjdk.jmh.annotations.Benchmark; |
| import org.openjdk.jmh.annotations.BenchmarkMode; |
| import org.openjdk.jmh.annotations.Fork; |
| import org.openjdk.jmh.annotations.Level; |
| import org.openjdk.jmh.annotations.Measurement; |
| import org.openjdk.jmh.annotations.Mode; |
| import org.openjdk.jmh.annotations.OutputTimeUnit; |
| import org.openjdk.jmh.annotations.Param; |
| import org.openjdk.jmh.annotations.Scope; |
| import org.openjdk.jmh.annotations.Setup; |
| import org.openjdk.jmh.annotations.State; |
| import org.openjdk.jmh.annotations.TearDown; |
| import org.openjdk.jmh.annotations.Warmup; |
| import org.openjdk.jmh.infra.Blackhole; |
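// Imports for the optional JMH Runner entry point sketched at the bottom of this class.
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;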
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
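/**
 * Benchmark for Druid's native groupBy query over generated segments: sweeps the "v1" and "v2" execution
 * strategies, vectorized vs. non-vectorized execution, processing-thread counts, query granularities, and a
 * handful of schema/query combinations.
 */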
| @State(Scope.Benchmark) |
| @Fork(value = 2) |
| @Warmup(iterations = 10) |
| @Measurement(iterations = 25) |
| public class GroupByBenchmark |
| { |
| @Param({"4"}) |
| private int numSegments; |
| |
| @Param({"2", "4"}) |
| private int numProcessingThreads; |
| |
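  // Initial hash bucket count for the buffer grouper; a non-positive value (such as -1) defers to the
  // grouper's default sizing.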
| @Param({"-1"}) |
| private int initialBuckets; |
| |
| @Param({"100000"}) |
| private int rowsPerSegment; |
| |
| @Param({"basic.A", "basic.nested"}) |
| private String schemaAndQuery; |
| |
| @Param({"v1", "v2"}) |
| private String defaultStrategy; |
| |
| @Param({"all", "day"}) |
| private String queryGranularity; |
| |
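  // Value for the "vectorize" query context key: "force" fails fast if the query cannot be vectorized,
  // "false" always runs the non-vectorized engine.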
| @Param({"force", "false"}) |
| private String vectorize; |
| |
| private static final Logger log = new Logger(GroupByBenchmark.class); |
| private static final int RNG_SEED = 9999; |
| private static final IndexMergerV9 INDEX_MERGER_V9; |
| private static final IndexIO INDEX_IO; |
| public static final ObjectMapper JSON_MAPPER; |
| |
| static { |
| NullHandling.initializeForTests(); |
| } |
| |
| private File tmpDir; |
| private IncrementalIndex anIncrementalIndex; |
| private List<QueryableIndex> queryableIndexes; |
| |
| private QueryRunnerFactory<ResultRow, GroupByQuery> factory; |
| |
| private GeneratorSchemaInfo schemaInfo; |
| private GroupByQuery query; |
| |
| private ExecutorService executorService; |
| |
| static { |
| JSON_MAPPER = new DefaultObjectMapper(); |
| INDEX_IO = new IndexIO( |
| JSON_MAPPER.setInjectableValues( |
| new InjectableValues.Std() |
| .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) |
| .addValue(ObjectMapper.class.getName(), JSON_MAPPER) |
| ), |
| new ColumnConfig() |
| { |
| @Override |
| public int columnCacheSizeBytes() |
| { |
| return 0; |
| } |
| } |
| ); |
| INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); |
| } |
| |
| private static final Map<String, Map<String, GroupByQuery>> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); |
| |
| private void setupQueries() |
| { |
| // queries for the basic schema |
| Map<String, GroupByQuery> basicQueries = new LinkedHashMap<>(); |
| GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic"); |
| |
| { // basic.A |
| QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); |
| List<AggregatorFactory> queryAggs = new ArrayList<>(); |
| queryAggs.add(new CountAggregatorFactory("cnt")); |
| queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimSequential", null), new DefaultDimensionSpec("dimZipf", null)) |
| .setAggregatorSpecs(queryAggs) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| basicQueries.put("A", queryA); |
| } |
| |
| { // basic.sorted |
| QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); |
| List<AggregatorFactory> queryAggs = new ArrayList<>(); |
| queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimSequential", null), new DefaultDimensionSpec("dimZipf", null)) |
| .setAggregatorSpecs(queryAggs) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setLimitSpec( |
| new DefaultLimitSpec( |
| Collections.singletonList( |
| new OrderByColumnSpec( |
| "sumLongSequential", |
| OrderByColumnSpec.Direction.DESCENDING, |
| StringComparators.NUMERIC |
| ) |
| ), |
| 100 |
| ) |
| ) |
| .build(); |
| |
| basicQueries.put("sorted", queryA); |
| } |
| |
| { // basic.nested |
| QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval())); |
| List<AggregatorFactory> queryAggs = new ArrayList<>(); |
| queryAggs.add(new LongSumAggregatorFactory( |
| "sumLongSequential", |
| "sumLongSequential" |
| )); |
| |
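      // The inner query groups on two dimensions at DAY granularity; the outer query then re-groups the inner
      // results on a single dimension at WEEK granularity.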
| GroupByQuery subqueryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimSequential", null), new DefaultDimensionSpec("dimZipf", null)) |
| .setAggregatorSpecs(queryAggs) |
| .setGranularity(Granularities.DAY) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource(subqueryA) |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimSequential", null)) |
| .setAggregatorSpecs(queryAggs) |
| .setGranularity(Granularities.WEEK) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| basicQueries.put("nested", queryA); |
| } |
| |
| { // basic.filter |
| final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( |
| Collections.singletonList(basicSchema.getDataInterval()) |
| ); |
      // Use multiple aggregators to see how the number of aggregators impacts query performance
| List<AggregatorFactory> queryAggs = ImmutableList.of( |
| new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"), |
| new LongSumAggregatorFactory("rows", "rows"), |
| new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"), |
| new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf") |
| ); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimUniform", null)) |
| .setAggregatorSpecs(queryAggs) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setDimFilter(new BoundDimFilter("dimUniform", "0", "100", true, true, null, null, null)) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| basicQueries.put("filter", queryA); |
| } |
| |
| { // basic.singleZipf |
| final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( |
| Collections.singletonList(basicSchema.getDataInterval()) |
| ); |
      // Use multiple aggregators to see how the number of aggregators impacts query performance
| List<AggregatorFactory> queryAggs = ImmutableList.of( |
| new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"), |
| new LongSumAggregatorFactory("rows", "rows"), |
| new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"), |
| new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf") |
| ); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimZipf", null)) |
| .setAggregatorSpecs(queryAggs) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| basicQueries.put("singleZipf", queryA); |
| } |
| SCHEMA_QUERY_MAP.put("basic", basicQueries); |
| |
    // simple one-column schema, for testing the performance difference between grouping on numeric values as
    // strings and directly as longs
| Map<String, GroupByQuery> simpleQueries = new LinkedHashMap<>(); |
| GeneratorSchemaInfo simpleSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("simple"); |
| |
| { // simple.A |
| QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(simpleSchema.getDataInterval())); |
| List<AggregatorFactory> queryAggs = new ArrayList<>(); |
| queryAggs.add(new LongSumAggregatorFactory( |
| "rows", |
| "rows" |
| )); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimSequential", "dimSequential", ValueType.STRING)) |
| .setAggregatorSpecs( |
| queryAggs |
| ) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| simpleQueries.put("A", queryA); |
| } |
| SCHEMA_QUERY_MAP.put("simple", simpleQueries); |
| |
| |
| Map<String, GroupByQuery> simpleLongQueries = new LinkedHashMap<>(); |
| GeneratorSchemaInfo simpleLongSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("simpleLong"); |
| { // simpleLong.A |
| QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(simpleLongSchema.getDataInterval())); |
| List<AggregatorFactory> queryAggs = new ArrayList<>(); |
| queryAggs.add(new LongSumAggregatorFactory( |
| "rows", |
| "rows" |
| )); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimSequential", "dimSequential", ValueType.LONG)) |
| .setAggregatorSpecs( |
| queryAggs |
| ) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| simpleLongQueries.put("A", queryA); |
| } |
| SCHEMA_QUERY_MAP.put("simpleLong", simpleLongQueries); |
| |
| |
| Map<String, GroupByQuery> simpleFloatQueries = new LinkedHashMap<>(); |
| GeneratorSchemaInfo simpleFloatSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("simpleFloat"); |
| { // simpleFloat.A |
| QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(simpleFloatSchema.getDataInterval())); |
| List<AggregatorFactory> queryAggs = new ArrayList<>(); |
| queryAggs.add(new LongSumAggregatorFactory( |
| "rows", |
| "rows" |
| )); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("dimSequential", "dimSequential", ValueType.FLOAT)) |
| .setAggregatorSpecs(queryAggs) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| simpleFloatQueries.put("A", queryA); |
| } |
| SCHEMA_QUERY_MAP.put("simpleFloat", simpleFloatQueries); |
| |
    // schema containing null values, for testing group-by performance when the grouping dimension and the
    // aggregated column contain nulls
| Map<String, GroupByQuery> nullQueries = new LinkedHashMap<>(); |
| GeneratorSchemaInfo nullSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("nulls"); |
| |
    { // nulls.A
| QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(nullSchema.getDataInterval())); |
| List<AggregatorFactory> queryAggs = new ArrayList<>(); |
| queryAggs.add(new DoubleSumAggregatorFactory( |
| "doubleSum", |
| "doubleZipf" |
| )); |
| GroupByQuery queryA = GroupByQuery |
| .builder() |
| .setDataSource("blah") |
| .setQuerySegmentSpec(intervalSpec) |
| .setDimensions(new DefaultDimensionSpec("stringZipf", "stringZipf", ValueType.STRING)) |
| .setAggregatorSpecs( |
| queryAggs |
| ) |
| .setGranularity(Granularity.fromString(queryGranularity)) |
| .setContext(ImmutableMap.of("vectorize", vectorize)) |
| .build(); |
| |
| nullQueries.put("A", queryA); |
| } |
| SCHEMA_QUERY_MAP.put("nulls", nullQueries); |
| } |
| |
| @Setup(Level.Trial) |
| public void setup() throws IOException |
| { |
| log.info("SETUP CALLED AT " + +System.currentTimeMillis()); |
| |
| ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde()); |
| |
| executorService = Execs.multiThreaded(numProcessingThreads, "GroupByThreadPool[%d]"); |
| |
| setupQueries(); |
| |
| String[] schemaQuery = schemaAndQuery.split("\\."); |
| String schemaName = schemaQuery[0]; |
| String queryName = schemaQuery[1]; |
| |
| schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName); |
| query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName); |
| |
| final DataGenerator dataGenerator = new DataGenerator( |
| schemaInfo.getColumnSchemas(), |
| RNG_SEED + 1, |
| schemaInfo.getDataInterval(), |
| rowsPerSegment |
| ); |
| |
| tmpDir = FileUtils.createTempDir(); |
| log.info("Using temp dir: %s", tmpDir.getAbsolutePath()); |
| |
| // queryableIndexes -> numSegments worth of on-disk segments |
| // anIncrementalIndex -> the last incremental index |
| anIncrementalIndex = null; |
| queryableIndexes = new ArrayList<>(numSegments); |
| |
| for (int i = 0; i < numSegments; i++) { |
| log.info("Generating rows for segment %d/%d", i + 1, numSegments); |
| |
| final IncrementalIndex index = makeIncIndex(schemaInfo.isWithRollup()); |
| |
| for (int j = 0; j < rowsPerSegment; j++) { |
| final InputRow row = dataGenerator.nextRow(); |
| if (j % 20000 == 0) { |
| log.info("%,d/%,d rows generated.", i * rowsPerSegment + j, rowsPerSegment * numSegments); |
| } |
| index.add(row); |
| } |
| |
| log.info( |
| "%,d/%,d rows generated, persisting segment %d/%d.", |
| (i + 1) * rowsPerSegment, |
| rowsPerSegment * numSegments, |
| i + 1, |
| numSegments |
| ); |
| |
| final File file = INDEX_MERGER_V9.persist( |
| index, |
| new File(tmpDir, String.valueOf(i)), |
| new IndexSpec(), |
| null |
| ); |
| |
| queryableIndexes.add(INDEX_IO.loadIndex(file)); |
| |
| if (i == numSegments - 1) { |
| anIncrementalIndex = index; |
| } else { |
| index.close(); |
| } |
| } |
| |
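    // Off-heap "compute" buffers used by the group-by engines; generously sized (250MB each) so the benchmark
    // is not constrained by buffer capacity.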
| NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>( |
| "GroupByBenchmark-computeBufferPool", |
| new OffheapBufferGenerator("compute", 250_000_000), |
| 0, |
| Integer.MAX_VALUE |
| ); |
| |
| // limit of 2 is required since we simulate both historical merge and broker merge in the same process |
| BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>( |
| new OffheapBufferGenerator("merge", 250_000_000), |
| 2 |
| ); |
| final GroupByQueryConfig config = new GroupByQueryConfig() |
| { |
| @Override |
| public String getDefaultStrategy() |
| { |
| return defaultStrategy; |
| } |
| |
| @Override |
| public int getBufferGrouperInitialBuckets() |
| { |
| return initialBuckets; |
| } |
| |
| @Override |
| public long getMaxOnDiskStorage() |
| { |
| return 1_000_000_000L; |
| } |
| }; |
| config.setSingleThreaded(false); |
| config.setMaxIntermediateRows(Integer.MAX_VALUE); |
| config.setMaxResults(Integer.MAX_VALUE); |
| |
| DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig() |
| { |
| @Override |
| public int getNumThreads() |
| { |
| // Used by "v2" strategy for concurrencyHint |
| return numProcessingThreads; |
| } |
| |
| @Override |
| public String getFormatString() |
| { |
| return null; |
| } |
| }; |
| |
| final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config); |
| final GroupByStrategySelector strategySelector = new GroupByStrategySelector( |
| configSupplier, |
| new GroupByStrategyV1( |
| configSupplier, |
| new GroupByQueryEngine(configSupplier, bufferPool), |
| QueryBenchmarkUtil.NOOP_QUERYWATCHER, |
| bufferPool |
| ), |
| new GroupByStrategyV2( |
| druidProcessingConfig, |
| configSupplier, |
| bufferPool, |
| mergePool, |
| new ObjectMapper(new SmileFactory()), |
| QueryBenchmarkUtil.NOOP_QUERYWATCHER |
| ) |
| ); |
| |
| factory = new GroupByQueryRunnerFactory( |
| strategySelector, |
| new GroupByQueryQueryToolChest(strategySelector) |
| ); |
| } |
| |
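  // Builds an on-heap incremental index sized to hold exactly one segment's worth of generated rows.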
| private IncrementalIndex makeIncIndex(boolean withRollup) |
| { |
| return new IncrementalIndex.Builder() |
| .setIndexSchema( |
| new IncrementalIndexSchema.Builder() |
| .withDimensionsSpec(schemaInfo.getDimensionsSpec()) |
| .withMetrics(schemaInfo.getAggsArray()) |
| .withRollup(withRollup) |
| .build() |
| ) |
| .setConcurrentEventAdd(true) |
| .setMaxRowCount(rowsPerSegment) |
| .buildOnheap(); |
| } |
| |
| @TearDown(Level.Trial) |
| public void tearDown() |
| { |
| try { |
| if (anIncrementalIndex != null) { |
| anIncrementalIndex.close(); |
| } |
| |
| if (queryableIndexes != null) { |
| for (QueryableIndex index : queryableIndexes) { |
| index.close(); |
| } |
| } |
| |
| if (tmpDir != null) { |
| FileUtils.deleteDirectory(tmpDir); |
| } |
| } |
| catch (IOException e) { |
| log.warn(e, "Failed to tear down, temp dir was: %s", tmpDir); |
| throw new RuntimeException(e); |
| } |
| } |
| |
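  // Wraps a per-segment runner with the toolchest's pre-merge decoration, merge, and finalize steps,
  // approximating the full stack a data node applies when serving a query.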
| private static <T> Sequence<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query) |
| { |
| QueryToolChest toolChest = factory.getToolchest(); |
| QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>( |
| toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)), |
| toolChest |
| ); |
| |
| return theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); |
| } |
| |
| @Benchmark |
| @BenchmarkMode(Mode.AverageTime) |
| @OutputTimeUnit(TimeUnit.MICROSECONDS) |
| public void querySingleIncrementalIndex(Blackhole blackhole) |
| { |
| QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner( |
| factory, |
| SegmentId.dummy("incIndex"), |
| new IncrementalIndexSegment(anIncrementalIndex, SegmentId.dummy("incIndex")) |
| ); |
| |
| final Sequence<ResultRow> results = GroupByBenchmark.runQuery(factory, runner, query); |
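    // Iterate the whole result sequence but keep only the last row, consuming every result without
    // materializing a list.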
| final ResultRow lastRow = results.accumulate( |
| null, |
| (accumulated, in) -> in |
| ); |
| |
| blackhole.consume(lastRow); |
| } |
| |
| @Benchmark |
| @BenchmarkMode(Mode.AverageTime) |
| @OutputTimeUnit(TimeUnit.MICROSECONDS) |
| public void querySingleQueryableIndex(Blackhole blackhole) |
| { |
| QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner( |
| factory, |
| SegmentId.dummy("qIndex"), |
| new QueryableIndexSegment(queryableIndexes.get(0), SegmentId.dummy("qIndex")) |
| ); |
| |
| final Sequence<ResultRow> results = GroupByBenchmark.runQuery(factory, runner, query); |
| final ResultRow lastRow = results.accumulate( |
| null, |
| (accumulated, in) -> in |
| ); |
| |
| blackhole.consume(lastRow); |
| } |
| |
| @Benchmark |
| @BenchmarkMode(Mode.AverageTime) |
| @OutputTimeUnit(TimeUnit.MICROSECONDS) |
| public void queryMultiQueryableIndexX(Blackhole blackhole) |
| { |
| QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest(); |
| QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>( |
| toolChest.mergeResults( |
| factory.mergeRunners(executorService, makeMultiRunners()) |
| ), |
| (QueryToolChest) toolChest |
| ); |
| |
| Sequence<ResultRow> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); |
| List<ResultRow> results = queryResult.toList(); |
| blackhole.consume(results); |
| } |
| |
| @Benchmark |
| @BenchmarkMode(Mode.AverageTime) |
| @OutputTimeUnit(TimeUnit.MICROSECONDS) |
| public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole) |
| { |
| QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest(); |
| QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>( |
| toolChest.mergeResults( |
| factory.mergeRunners(executorService, makeMultiRunners()) |
| ), |
| (QueryToolChest) toolChest |
| ); |
| |
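    // Cap the buffer grouper at 4,000 entries so aggregation overflows to disk (the config above allows up
    // to 1GB of on-disk storage).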
| final GroupByQuery spillingQuery = query.withOverriddenContext( |
| ImmutableMap.of("bufferGrouperMaxSize", 4000) |
| ); |
| Sequence<ResultRow> queryResult = theRunner.run(QueryPlus.wrap(spillingQuery), ResponseContext.createEmpty()); |
| List<ResultRow> results = queryResult.toList(); |
| blackhole.consume(results); |
| } |
| |
| @Benchmark |
| @BenchmarkMode(Mode.AverageTime) |
| @OutputTimeUnit(TimeUnit.MICROSECONDS) |
| public void queryMultiQueryableIndexWithSerde(Blackhole blackhole) |
| { |
| QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest(); |
| //noinspection unchecked |
| QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>( |
| toolChest.mergeResults( |
| new SerializingQueryRunner<>( |
| new DefaultObjectMapper(new SmileFactory()), |
| ResultRow.class, |
| toolChest.mergeResults( |
| factory.mergeRunners(executorService, makeMultiRunners()) |
| ) |
| ) |
| ), |
| (QueryToolChest) toolChest |
| ); |
| |
| Sequence<ResultRow> queryResult = theRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); |
| List<ResultRow> results = queryResult.toList(); |
| blackhole.consume(results); |
| } |
| |
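  // Builds one pre-merge-decorated runner per on-disk segment so mergeRunners() can exercise the
  // multi-segment merge path.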
| private List<QueryRunner<ResultRow>> makeMultiRunners() |
| { |
| List<QueryRunner<ResultRow>> runners = new ArrayList<>(); |
| for (int i = 0; i < numSegments; i++) { |
| String segmentName = "qIndex " + i; |
| QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner( |
| factory, |
| SegmentId.dummy(segmentName), |
| new QueryableIndexSegment(queryableIndexes.get(i), SegmentId.dummy(segmentName)) |
| ); |
| runners.add(factory.getToolchest().preMergeQueryDecoration(runner)); |
| } |
| return runners; |
| } |
| } |