| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.query.aggregation.datasketches.quantiles.sql; |
| |
| import com.fasterxml.jackson.databind.Module; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import org.apache.druid.common.config.NullHandling; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.math.expr.ExprMacroTable; |
| import org.apache.druid.query.Druids; |
| import org.apache.druid.query.QueryDataSource; |
| import org.apache.druid.query.aggregation.CountAggregatorFactory; |
| import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.FilteredAggregatorFactory; |
| import org.apache.druid.query.aggregation.LongSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.PostAggregator; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToCDFPostAggregator; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToHistogramPostAggregator; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToQuantilePostAggregator; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToQuantilesPostAggregator; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator; |
| import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator; |
| import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; |
| import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; |
| import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; |
| import org.apache.druid.query.dimension.DefaultDimensionSpec; |
| import org.apache.druid.query.expression.TestExprMacroTable; |
| import org.apache.druid.query.filter.NotDimFilter; |
| import org.apache.druid.query.filter.SelectorDimFilter; |
| import org.apache.druid.query.groupby.GroupByQuery; |
| import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; |
| import org.apache.druid.segment.IndexBuilder; |
| import org.apache.druid.segment.QueryableIndex; |
| import org.apache.druid.segment.column.ValueType; |
| import org.apache.druid.segment.incremental.IncrementalIndexSchema; |
| import org.apache.druid.segment.virtual.ExpressionVirtualColumn; |
| import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; |
| import org.apache.druid.server.security.AuthenticationResult; |
| import org.apache.druid.sql.SqlLifecycle; |
| import org.apache.druid.sql.calcite.BaseCalciteQueryTest; |
| import org.apache.druid.sql.calcite.filtration.Filtration; |
| import org.apache.druid.sql.calcite.planner.DruidOperatorTable; |
| import org.apache.druid.sql.calcite.planner.PlannerConfig; |
| import org.apache.druid.sql.calcite.planner.PlannerContext; |
| import org.apache.druid.sql.calcite.util.CalciteTests; |
| import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; |
| import org.apache.druid.sql.http.SqlParameter; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.partition.LinearShardSpec; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| |
| public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest |
| { |
| private static final DruidOperatorTable OPERATOR_TABLE = new DruidOperatorTable( |
| ImmutableSet.of( |
| new DoublesSketchApproxQuantileSqlAggregator(), |
| new DoublesSketchObjectSqlAggregator() |
| ), |
| ImmutableSet.of( |
| new DoublesSketchQuantileOperatorConversion(), |
| new DoublesSketchQuantilesOperatorConversion(), |
| new DoublesSketchToHistogramOperatorConversion(), |
| new DoublesSketchRankOperatorConversion(), |
| new DoublesSketchCDFOperatorConversion(), |
| new DoublesSketchSummaryOperatorConversion() |
| ) |
| ); |
| |
| @Override |
| public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker() throws IOException |
| { |
| DoublesSketchModule.registerSerde(); |
| for (Module mod : new DoublesSketchModule().getJacksonModules()) { |
| CalciteTests.getJsonMapper().registerModule(mod); |
| } |
| |
| final QueryableIndex index = |
| IndexBuilder.create(CalciteTests.getJsonMapper()) |
| .tmpDir(temporaryFolder.newFolder()) |
| .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) |
| .schema( |
| new IncrementalIndexSchema.Builder() |
| .withMetrics( |
| new CountAggregatorFactory("cnt"), |
| new DoubleSumAggregatorFactory("m1", "m1"), |
| new DoublesSketchAggregatorFactory( |
| "qsketch_m1", |
| "m1", |
| 128 |
| ) |
| ) |
| .withRollup(false) |
| .build() |
| ) |
| .rows(CalciteTests.ROWS1) |
| .buildMMappedIndex(); |
| |
| return new SpecificSegmentsQuerySegmentWalker(conglomerate).add( |
| DataSegment.builder() |
| .dataSource(CalciteTests.DATASOURCE1) |
| .interval(index.getDataInterval()) |
| .version("1") |
| .shardSpec(new LinearShardSpec(0)) |
| .size(0) |
| .build(), |
| index |
| ); |
| } |
| |
| @Override |
| public List<Object[]> getResults( |
| final PlannerConfig plannerConfig, |
| final Map<String, Object> queryContext, |
| final List<SqlParameter> parameters, |
| final String sql, |
| final AuthenticationResult authenticationResult |
| ) throws Exception |
| { |
| return getResults( |
| plannerConfig, |
| queryContext, |
| parameters, |
| sql, |
| authenticationResult, |
| OPERATOR_TABLE, |
| CalciteTests.createExprMacroTable(), |
| CalciteTests.TEST_AUTHORIZER_MAPPER, |
| CalciteTests.getJsonMapper() |
| ); |
| } |
| |
| private SqlLifecycle getSqlLifecycle() |
| { |
| return getSqlLifecycleFactory( |
| BaseCalciteQueryTest.PLANNER_CONFIG_DEFAULT, |
| OPERATOR_TABLE, |
| CalciteTests.createExprMacroTable(), |
| CalciteTests.TEST_AUTHORIZER_MAPPER, |
| CalciteTests.getJsonMapper() |
| ).factorize(); |
| } |
| |
| @Test |
| public void testQuantileOnFloatAndLongs() throws Exception |
| { |
| testQuery( |
| "SELECT\n" |
| + "APPROX_QUANTILE_DS(m1, 0.01),\n" |
| + "APPROX_QUANTILE_DS(m1, 0.5, 64),\n" |
| + "APPROX_QUANTILE_DS(m1, 0.98, 256),\n" |
| + "APPROX_QUANTILE_DS(m1, 0.99),\n" |
| + "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n" |
| + "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n" |
| + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n" |
| + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n" |
| + "APPROX_QUANTILE_DS(cnt, 0.5)\n" |
| + "FROM foo", |
| Collections.singletonList( |
| Druids.newTimeseriesQueryBuilder() |
| .dataSource(CalciteTests.DATASOURCE1) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .granularity(Granularities.ALL) |
| .virtualColumns( |
| new ExpressionVirtualColumn( |
| "v0", |
| "(\"m1\" * 2)", |
| ValueType.FLOAT, |
| TestExprMacroTable.INSTANCE |
| ) |
| ) |
| .aggregators(ImmutableList.of( |
| new DoublesSketchAggregatorFactory("a0:agg", "m1", null), |
| new DoublesSketchAggregatorFactory("a1:agg", "m1", 64), |
| new DoublesSketchAggregatorFactory("a2:agg", "m1", 256), |
| new DoublesSketchAggregatorFactory("a4:agg", "v0", null), |
| new FilteredAggregatorFactory( |
| new DoublesSketchAggregatorFactory("a5:agg", "m1", null), |
| new SelectorDimFilter("dim1", "abc", null) |
| ), |
| new FilteredAggregatorFactory( |
| new DoublesSketchAggregatorFactory("a6:agg", "m1", null), |
| new NotDimFilter(new SelectorDimFilter("dim1", "abc", null)) |
| ), |
| new DoublesSketchAggregatorFactory("a8:agg", "cnt", null) |
| )) |
| .postAggregators( |
| new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), |
| new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), |
| new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), |
| new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), |
| new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f), |
| new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f), |
| new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f), |
| new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f), |
| new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f) |
| ) |
| .context(TIMESERIES_CONTEXT_DEFAULT) |
| .build() |
| ), |
| ImmutableList.of( |
| new Object[]{ |
| 1.0, |
| 4.0, |
| 6.0, |
| 6.0, |
| 12.0, |
| 6.0, |
| 5.0, |
| 6.0, |
| 1.0 |
| } |
| ) |
| ); |
| } |
| |
| @Test |
| public void testQuantileOnComplexColumn() throws Exception |
| { |
| testQuery( |
| "SELECT\n" |
| + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n" |
| + "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n" |
| + "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n" |
| + "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n" |
| + "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n" |
| + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n" |
| + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n" |
| + "FROM foo", |
| ImmutableList.of( |
| Druids.newTimeseriesQueryBuilder() |
| .dataSource(CalciteTests.DATASOURCE1) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .granularity(Granularities.ALL) |
| .aggregators(ImmutableList.of( |
| new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null), |
| new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64), |
| new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256), |
| new FilteredAggregatorFactory( |
| new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null), |
| new SelectorDimFilter("dim1", "abc", null) |
| ), |
| new FilteredAggregatorFactory( |
| new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null), |
| new NotDimFilter(new SelectorDimFilter("dim1", "abc", null)) |
| ) |
| )) |
| .postAggregators( |
| new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), |
| new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), |
| new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), |
| new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), |
| new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f), |
| new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f), |
| new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f) |
| ) |
| .context(TIMESERIES_CONTEXT_DEFAULT) |
| .build() |
| ), |
| ImmutableList.of( |
| new Object[]{ |
| 1.0, |
| 4.0, |
| 6.0, |
| 6.0, |
| 6.0, |
| 5.0, |
| 6.0 |
| } |
| ) |
| ); |
| } |
| |
| @Test |
| public void testQuantileOnCastedString() throws Exception |
| { |
| final List<Object[]> expectedResults; |
| if (NullHandling.replaceWithDefault()) { |
| expectedResults = ImmutableList.of( |
| new Object[]{ |
| 0.0, |
| 1.0, |
| 10.1, |
| 10.1, |
| 20.2, |
| 0.0, |
| 10.1, |
| 0.0 |
| } |
| ); |
| } else { |
| expectedResults = ImmutableList.of( |
| new Object[]{ |
| 1.0, |
| 2.0, |
| 10.1, |
| 10.1, |
| 20.2, |
| Double.NaN, |
| 10.1, |
| Double.NaN |
| } |
| ); |
| } |
| |
| testQuery( |
| "SELECT\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.01),\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.5, 64),\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.98, 256),\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.99),\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE) * 2, 0.97),\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.99) FILTER(WHERE dim2 = 'abc'),\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.999) FILTER(WHERE dim2 <> 'abc'),\n" |
| + "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.999) FILTER(WHERE dim2 = 'abc')\n" |
| + "FROM foo", |
| ImmutableList.of( |
| Druids.newTimeseriesQueryBuilder() |
| .dataSource(CalciteTests.DATASOURCE1) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .granularity(Granularities.ALL) |
| .virtualColumns( |
| new ExpressionVirtualColumn( |
| "v0", |
| "CAST(\"dim1\", 'DOUBLE')", |
| ValueType.FLOAT, |
| TestExprMacroTable.INSTANCE |
| ), |
| new ExpressionVirtualColumn( |
| "v1", |
| "(CAST(\"dim1\", 'DOUBLE') * 2)", |
| ValueType.FLOAT, |
| TestExprMacroTable.INSTANCE |
| ) |
| ) |
| .aggregators(ImmutableList.of( |
| new DoublesSketchAggregatorFactory("a0:agg", "v0", 128), |
| new DoublesSketchAggregatorFactory("a1:agg", "v0", 64), |
| new DoublesSketchAggregatorFactory("a2:agg", "v0", 256), |
| new DoublesSketchAggregatorFactory("a4:agg", "v1", 128), |
| new FilteredAggregatorFactory( |
| new DoublesSketchAggregatorFactory("a5:agg", "v0", 128), |
| new SelectorDimFilter("dim2", "abc", null) |
| ), |
| new FilteredAggregatorFactory( |
| new DoublesSketchAggregatorFactory("a6:agg", "v0", 128), |
| new NotDimFilter(new SelectorDimFilter("dim2", "abc", null)) |
| ) |
| )) |
| .postAggregators( |
| new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), |
| new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f), |
| new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f), |
| new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f), |
| new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f), |
| new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f), |
| new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f), |
| new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f) |
| ) |
| .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) |
| .build() |
| ), |
| expectedResults |
| ); |
| } |
| |
| @Test |
| public void testQuantileOnInnerQuery() throws Exception |
| { |
| final List<Object[]> expectedResults; |
| if (NullHandling.replaceWithDefault()) { |
| expectedResults = ImmutableList.of(new Object[]{7.0, 11.0}); |
| } else { |
| expectedResults = ImmutableList.of(new Object[]{5.25, 8.0}); |
| } |
| |
| testQuery( |
| "SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n" |
| + "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)", |
| Collections.singletonList( |
| GroupByQuery.builder() |
| .setDataSource( |
| new QueryDataSource( |
| GroupByQuery.builder() |
| .setDataSource(CalciteTests.DATASOURCE1) |
| .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .setGranularity(Granularities.ALL) |
| .setDimensions(new DefaultDimensionSpec("dim2", "d0")) |
| .setAggregatorSpecs( |
| ImmutableList.of( |
| new DoubleSumAggregatorFactory("a0", "m1") |
| ) |
| ) |
| .setContext(QUERY_CONTEXT_DEFAULT) |
| .build() |
| ) |
| ) |
| .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .setGranularity(Granularities.ALL) |
| .setAggregatorSpecs( |
| new DoubleSumAggregatorFactory("_a0:sum", "a0"), |
| new CountAggregatorFactory("_a0:count"), |
| new DoublesSketchAggregatorFactory( |
| "_a1:agg", |
| "a0", |
| null |
| ) |
| ) |
| .setPostAggregatorSpecs( |
| ImmutableList.of( |
| new ArithmeticPostAggregator( |
| "_a0", |
| "quotient", |
| ImmutableList.of( |
| new FieldAccessPostAggregator(null, "_a0:sum"), |
| new FieldAccessPostAggregator(null, "_a0:count") |
| ) |
| ), |
| new DoublesSketchToQuantilePostAggregator( |
| "_a1", |
| makeFieldAccessPostAgg("_a1:agg"), |
| 0.98f |
| ) |
| ) |
| ) |
| .setContext(QUERY_CONTEXT_DEFAULT) |
| .build() |
| ), |
| expectedResults |
| ); |
| } |
| |
| @Test |
| public void testQuantileOnInnerQuantileQuery() throws Exception |
| { |
| ImmutableList.Builder<Object[]> builder = ImmutableList.builder(); |
| builder.add(new Object[]{"", 1.0}); |
| builder.add(new Object[]{"1", 4.0}); |
| builder.add(new Object[]{"10.1", 2.0}); |
| builder.add(new Object[]{"2", 3.0}); |
| builder.add(new Object[]{"abc", 6.0}); |
| builder.add(new Object[]{"def", 5.0}); |
| final List<Object[]> expectedResults = builder.build(); |
| |
| testQuery( |
| "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n" |
| + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1", |
| Collections.singletonList( |
| GroupByQuery.builder() |
| .setDataSource( |
| new QueryDataSource( |
| GroupByQuery.builder() |
| .setDataSource(CalciteTests.DATASOURCE1) |
| .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .setGranularity(Granularities.ALL) |
| .setDimensions( |
| new DefaultDimensionSpec("dim1", "d0"), |
| new DefaultDimensionSpec("dim2", "d1") |
| ) |
| .setAggregatorSpecs( |
| ImmutableList.of( |
| new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) |
| ) |
| ) |
| .setPostAggregatorSpecs( |
| ImmutableList.of( |
| new DoublesSketchToQuantilePostAggregator( |
| "a0", |
| makeFieldAccessPostAgg("a0:agg"), |
| 0.5f |
| ) |
| ) |
| ) |
| .setContext(QUERY_CONTEXT_DEFAULT) |
| .build() |
| ) |
| ) |
| .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .setGranularity(Granularities.ALL) |
| .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING)) |
| .setAggregatorSpecs( |
| new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128) |
| ) |
| .setPostAggregatorSpecs( |
| ImmutableList.of( |
| new DoublesSketchToQuantilePostAggregator( |
| "_a0", |
| makeFieldAccessPostAgg("_a0:agg"), |
| 0.5f |
| ) |
| ) |
| ) |
| .setContext(QUERY_CONTEXT_DEFAULT) |
| .build() |
| ), |
| expectedResults |
| ); |
| } |
| |
| @Test |
| public void testDoublesSketchPostAggs() throws Exception |
| { |
| testQuery( |
| "SELECT\n" |
| + " SUM(cnt),\n" |
| + " APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n" |
| + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n" |
| + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n" |
| + " ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n" |
| + " DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n" |
| + " DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n" |
| + " DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n" |
| + " DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n" |
| + " -- The nonvectorized query uses a regular Aggregator, and the vectorized query uses a buffer-based\n" |
| + " -- VectorAggregator. The buffer-based aggregators return HeapCompactDoublesSketch instead of\n" |
| + " -- HeapUpdateDoublesSketch since they must make a copy out of the buffer before returning something.\n" |
| + " -- Use REPLACE to normalize summaries.\n" |
| + " REPLACE(" |
| + " REPLACE(" |
| + " DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt))," |
| + " 'HeapCompactDoublesSketch'," |
| + " 'HeapUpdateDoublesSketch'" |
| + " )," |
| + " 'Combined Buffer Capacity : 6'," |
| + " 'Combined Buffer Capacity : 8'" |
| + " )\n" |
| + "FROM foo", |
| Collections.singletonList( |
| Druids.newTimeseriesQueryBuilder() |
| .dataSource(CalciteTests.DATASOURCE1) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .granularity(Granularities.ALL) |
| .virtualColumns( |
| new ExpressionVirtualColumn( |
| "v0", |
| "(\"cnt\" + 123)", |
| ValueType.FLOAT, |
| TestExprMacroTable.INSTANCE |
| ) |
| ) |
| .aggregators(ImmutableList.of( |
| new LongSumAggregatorFactory("a0", "cnt"), |
| new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128), |
| new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128), |
| new DoublesSketchAggregatorFactory("a3:agg", "v0", 128) |
| )) |
| .postAggregators( |
| new DoublesSketchToQuantilePostAggregator( |
| "a1", |
| makeFieldAccessPostAgg("a1:agg"), |
| 0.5f |
| ), |
| new ExpressionPostAggregator( |
| "p0", |
| "(\"a1\" + 1)", |
| null, |
| TestExprMacroTable.INSTANCE |
| ), |
| new DoublesSketchToQuantilePostAggregator( |
| "p2", |
| new FieldAccessPostAggregator( |
| "p1", |
| "a2:agg" |
| ), |
| 0.5f |
| ), |
| new ExpressionPostAggregator( |
| "p3", |
| "(p2 + 1000)", |
| null, |
| TestExprMacroTable.INSTANCE |
| ), |
| new DoublesSketchToQuantilePostAggregator( |
| "p5", |
| new FieldAccessPostAggregator( |
| "p4", |
| "a3:agg" |
| ), |
| 0.5f |
| ), |
| new ExpressionPostAggregator( |
| "p6", |
| "(p5 + 1000)", |
| null, |
| TestExprMacroTable.INSTANCE |
| ), |
| new DoublesSketchToQuantilePostAggregator( |
| "p8", |
| new FieldAccessPostAggregator( |
| "p7", |
| "a2:agg" |
| ), |
| 0.5f |
| ), |
| new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE), |
| new DoublesSketchToQuantilesPostAggregator( |
| "p11", |
| new FieldAccessPostAggregator( |
| "p10", |
| "a2:agg" |
| ), |
| new double[]{0.5d, 0.8d} |
| ), |
| new DoublesSketchToHistogramPostAggregator( |
| "p13", |
| new FieldAccessPostAggregator( |
| "p12", |
| "a2:agg" |
| ), |
| new double[]{0.2d, 0.6d}, |
| null |
| ), |
| new DoublesSketchToRankPostAggregator( |
| "p15", |
| new FieldAccessPostAggregator( |
| "p14", |
| "a2:agg" |
| ), |
| 3.0d |
| ), |
| new DoublesSketchToCDFPostAggregator( |
| "p17", |
| new FieldAccessPostAggregator( |
| "p16", |
| "a2:agg" |
| ), |
| new double[]{0.2d, 0.6d} |
| ), |
| new DoublesSketchToStringPostAggregator( |
| "p19", |
| new FieldAccessPostAggregator( |
| "p18", |
| "a2:agg" |
| ) |
| ), |
| new ExpressionPostAggregator( |
| "p20", |
| "replace(replace(p19,'HeapCompactDoublesSketch','HeapUpdateDoublesSketch')," |
| + "'Combined Buffer Capacity : 6'," |
| + "'Combined Buffer Capacity : 8')", |
| null, |
| ExprMacroTable.nil() |
| ) |
| ) |
| .context(TIMESERIES_CONTEXT_DEFAULT) |
| .build() |
| ), |
| ImmutableList.of( |
| new Object[]{ |
| 6L, |
| 2.0d, |
| 1001.0d, |
| 1124.0d, |
| 1.0d, |
| "[1.0,1.0]", |
| "[0.0,0.0,6.0]", |
| 1.0d, |
| "[0.0,0.0,1.0]", |
| "\n" |
| + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n" |
| + " Empty : false\n" |
| + " Direct, Capacity bytes : false, \n" |
| + " Estimation Mode : false\n" |
| + " K : 128\n" |
| + " N : 6\n" |
| + " Levels (Needed, Total, Valid): 0, 0, 0\n" |
| + " Level Bit Pattern : 0\n" |
| + " BaseBufferCount : 6\n" |
| + " Combined Buffer Capacity : 8\n" |
| + " Retained Items : 6\n" |
| + " Compact Storage Bytes : 80\n" |
| + " Updatable Storage Bytes : 96\n" |
| + " Normalized Rank Error : 1.406%\n" |
| + " Normalized Rank Error (PMF) : 1.711%\n" |
| + " Min Value : 1.000000e+00\n" |
| + " Max Value : 1.000000e+00\n" |
| + "### END SKETCH SUMMARY\n" |
| } |
| ) |
| ); |
| } |
| |
| @Test |
| public void testDoublesSketchPostAggsPostSort() throws Exception |
| { |
| testQuery( |
| "SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from (" |
| + "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10" |
| + ")", |
| Collections.singletonList( |
| Druids.newTimeseriesQueryBuilder() |
| .dataSource(CalciteTests.DATASOURCE1) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) |
| .granularity(Granularities.ALL) |
| .aggregators( |
| ImmutableList.of( |
| new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) |
| ) |
| ) |
| .postAggregators( |
| ImmutableList.of( |
| new FieldAccessPostAggregator("p0", "a0:agg"), |
| new DoublesSketchToQuantilePostAggregator( |
| "p2", |
| new FieldAccessPostAggregator("p1", "a0:agg"), |
| 0.5 |
| ), |
| new DoublesSketchToQuantilePostAggregator( |
| "s1", |
| new FieldAccessPostAggregator("s0", "p0"), |
| 0.5 |
| ), |
| new DoublesSketchToQuantilePostAggregator( |
| "s3", |
| new FieldAccessPostAggregator("s2", "p0"), |
| 0.9800000190734863 |
| ) |
| ) |
| ) |
| .context(TIMESERIES_CONTEXT_DEFAULT) |
| .build() |
| ), |
| ImmutableList.of( |
| new Object[]{ |
| 4.0d, |
| 6.0d |
| } |
| ) |
| ); |
| } |
| |
| private static PostAggregator makeFieldAccessPostAgg(String name) |
| { |
| return new FieldAccessPostAggregator(name, name); |
| } |
| } |