blob: bf550312b22e661c7b1bb3a51c4cf44a63c0ba37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles.sql;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToCDFPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToHistogramPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToQuantilePostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToQuantilesPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
{
private static final DruidOperatorTable OPERATOR_TABLE = new DruidOperatorTable(
ImmutableSet.of(
new DoublesSketchApproxQuantileSqlAggregator(),
new DoublesSketchObjectSqlAggregator()
),
ImmutableSet.of(
new DoublesSketchQuantileOperatorConversion(),
new DoublesSketchQuantilesOperatorConversion(),
new DoublesSketchToHistogramOperatorConversion(),
new DoublesSketchRankOperatorConversion(),
new DoublesSketchCDFOperatorConversion(),
new DoublesSketchSummaryOperatorConversion()
)
);
@Override
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker() throws IOException
{
DoublesSketchModule.registerSerde();
for (Module mod : new DoublesSketchModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod);
}
final QueryableIndex index =
IndexBuilder.create(CalciteTests.getJsonMapper())
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new DoublesSketchAggregatorFactory(
"qsketch_m1",
"m1",
128
)
)
.withRollup(false)
.build()
)
.rows(CalciteTests.ROWS1)
.buildMMappedIndex();
return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE1)
.interval(index.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build(),
index
);
}
@Override
public DruidOperatorTable createOperatorTable()
{
return OPERATOR_TABLE;
}
@Test
public void testQuantileOnFloatAndLongs() throws Exception
{
testQuery(
"SELECT\n"
+ "APPROX_QUANTILE_DS(m1, 0.01),\n"
+ "APPROX_QUANTILE_DS(m1, 0.5, 64),\n"
+ "APPROX_QUANTILE_DS(m1, 0.98, 256),\n"
+ "APPROX_QUANTILE_DS(m1, 0.99),\n"
+ "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n"
+ "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
+ "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
+ "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n"
+ "APPROX_QUANTILE_DS(cnt, 0.5)\n"
+ "FROM foo",
Collections.singletonList(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"(\"m1\" * 2)",
ValueType.FLOAT,
TestExprMacroTable.INSTANCE
)
)
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
new DoublesSketchAggregatorFactory("a1:agg", "m1", 64),
new DoublesSketchAggregatorFactory("a2:agg", "m1", 256),
new DoublesSketchAggregatorFactory("a4:agg", "v0", null),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a5:agg", "m1", null),
new SelectorDimFilter("dim1", "abc", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a6:agg", "m1", null),
new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
),
new DoublesSketchAggregatorFactory("a8:agg", "cnt", null)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f),
new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f),
new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f),
new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f),
new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
1.0,
4.0,
6.0,
6.0,
12.0,
6.0,
5.0,
6.0,
1.0
}
)
);
}
@Test
public void testQuantileOnComplexColumn() throws Exception
{
testQuery(
"SELECT\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n"
+ "FROM foo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null),
new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64),
new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null),
new SelectorDimFilter("dim1", "abc", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null),
new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f),
new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f),
new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
1.0,
4.0,
6.0,
6.0,
6.0,
5.0,
6.0
}
)
);
}
@Test
public void testQuantileOnCastedString() throws Exception
{
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(
new Object[]{
0.0,
1.0,
10.1,
10.1,
20.2,
0.0,
10.1,
0.0
}
);
} else {
expectedResults = ImmutableList.of(
new Object[]{
1.0,
2.0,
10.1,
10.1,
20.2,
Double.NaN,
10.1,
Double.NaN
}
);
}
testQuery(
"SELECT\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.01),\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.5, 64),\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.98, 256),\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.99),\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE) * 2, 0.97),\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.99) FILTER(WHERE dim2 = 'abc'),\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.999) FILTER(WHERE dim2 <> 'abc'),\n"
+ "APPROX_QUANTILE_DS(CAST(dim1 as DOUBLE), 0.999) FILTER(WHERE dim2 = 'abc')\n"
+ "FROM foo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"CAST(\"dim1\", 'DOUBLE')",
ValueType.FLOAT,
TestExprMacroTable.INSTANCE
),
new ExpressionVirtualColumn(
"v1",
"(CAST(\"dim1\", 'DOUBLE') * 2)",
ValueType.FLOAT,
TestExprMacroTable.INSTANCE
)
)
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "v0", 128),
new DoublesSketchAggregatorFactory("a1:agg", "v0", 64),
new DoublesSketchAggregatorFactory("a2:agg", "v0", 256),
new DoublesSketchAggregatorFactory("a4:agg", "v1", 128),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a5:agg", "v0", 128),
new SelectorDimFilter("dim2", "abc", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a6:agg", "v0", 128),
new NotDimFilter(new SelectorDimFilter("dim2", "abc", null))
)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f),
new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f),
new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f),
new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testQuantileOnInnerQuery() throws Exception
{
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(new Object[]{7.0, 11.0});
} else {
expectedResults = ImmutableList.of(new Object[]{5.25, 8.0});
}
testQuery(
"SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n"
+ "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)",
Collections.singletonList(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setDimensions(new DefaultDimensionSpec("dim2", "d0"))
.setAggregatorSpecs(
ImmutableList.of(
new DoubleSumAggregatorFactory("a0", "m1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
new DoubleSumAggregatorFactory("_a0:sum", "a0"),
new CountAggregatorFactory("_a0:count"),
new DoublesSketchAggregatorFactory(
"_a1:agg",
"a0",
null
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new ArithmeticPostAggregator(
"_a0",
"quotient",
ImmutableList.of(
new FieldAccessPostAggregator(null, "_a0:sum"),
new FieldAccessPostAggregator(null, "_a0:count")
)
),
new DoublesSketchToQuantilePostAggregator(
"_a1",
makeFieldAccessPostAgg("_a1:agg"),
0.98f
)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testQuantileOnInnerQuantileQuery() throws Exception
{
ImmutableList.Builder<Object[]> builder = ImmutableList.builder();
builder.add(new Object[]{"", 1.0});
builder.add(new Object[]{"1", 4.0});
builder.add(new Object[]{"10.1", 2.0});
builder.add(new Object[]{"2", 3.0});
builder.add(new Object[]{"abc", 6.0});
builder.add(new Object[]{"def", 5.0});
final List<Object[]> expectedResults = builder.build();
testQuery(
"SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
+ "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1",
Collections.singletonList(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new DoublesSketchToQuantilePostAggregator(
"a0",
makeFieldAccessPostAgg("a0:agg"),
0.5f
)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
.setAggregatorSpecs(
new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new DoublesSketchToQuantilePostAggregator(
"_a0",
makeFieldAccessPostAgg("_a0:agg"),
0.5f
)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testDoublesSketchPostAggs() throws Exception
{
testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n"
+ " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n"
+ " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n"
+ " ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n"
+ " DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n"
+ " DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
+ " DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n"
+ " DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
+ " -- The nonvectorized query uses a regular Aggregator, and the vectorized query uses a buffer-based\n"
+ " -- VectorAggregator. The buffer-based aggregators return HeapCompactDoublesSketch instead of\n"
+ " -- HeapUpdateDoublesSketch since they must make a copy out of the buffer before returning something.\n"
+ " -- Use REPLACE to normalize summaries.\n"
+ " REPLACE("
+ " REPLACE("
+ " DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt)),"
+ " 'HeapCompactDoublesSketch',"
+ " 'HeapUpdateDoublesSketch'"
+ " ),"
+ " 'Combined Buffer Capacity : 6',"
+ " 'Combined Buffer Capacity : 8'"
+ " )\n"
+ "FROM foo",
Collections.singletonList(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"(\"cnt\" + 123)",
ValueType.FLOAT,
TestExprMacroTable.INSTANCE
)
)
.aggregators(ImmutableList.of(
new LongSumAggregatorFactory("a0", "cnt"),
new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128),
new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128),
new DoublesSketchAggregatorFactory("a3:agg", "v0", 128)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator(
"a1",
makeFieldAccessPostAgg("a1:agg"),
0.5f
),
new ExpressionPostAggregator(
"p0",
"(\"a1\" + 1)",
null,
TestExprMacroTable.INSTANCE
),
new DoublesSketchToQuantilePostAggregator(
"p2",
new FieldAccessPostAggregator(
"p1",
"a2:agg"
),
0.5f
),
new ExpressionPostAggregator(
"p3",
"(p2 + 1000)",
null,
TestExprMacroTable.INSTANCE
),
new DoublesSketchToQuantilePostAggregator(
"p5",
new FieldAccessPostAggregator(
"p4",
"a3:agg"
),
0.5f
),
new ExpressionPostAggregator(
"p6",
"(p5 + 1000)",
null,
TestExprMacroTable.INSTANCE
),
new DoublesSketchToQuantilePostAggregator(
"p8",
new FieldAccessPostAggregator(
"p7",
"a2:agg"
),
0.5f
),
new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE),
new DoublesSketchToQuantilesPostAggregator(
"p11",
new FieldAccessPostAggregator(
"p10",
"a2:agg"
),
new double[]{0.5d, 0.8d}
),
new DoublesSketchToHistogramPostAggregator(
"p13",
new FieldAccessPostAggregator(
"p12",
"a2:agg"
),
new double[]{0.2d, 0.6d},
null
),
new DoublesSketchToRankPostAggregator(
"p15",
new FieldAccessPostAggregator(
"p14",
"a2:agg"
),
3.0d
),
new DoublesSketchToCDFPostAggregator(
"p17",
new FieldAccessPostAggregator(
"p16",
"a2:agg"
),
new double[]{0.2d, 0.6d}
),
new DoublesSketchToStringPostAggregator(
"p19",
new FieldAccessPostAggregator(
"p18",
"a2:agg"
)
),
new ExpressionPostAggregator(
"p20",
"replace(replace(p19,'HeapCompactDoublesSketch','HeapUpdateDoublesSketch'),"
+ "'Combined Buffer Capacity : 6',"
+ "'Combined Buffer Capacity : 8')",
null,
ExprMacroTable.nil()
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
6L,
2.0d,
1001.0d,
1124.0d,
1.0d,
"[1.0,1.0]",
"[0.0,0.0,6.0]",
1.0d,
"[0.0,0.0,1.0]",
"\n"
+ "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n"
+ " Empty : false\n"
+ " Direct, Capacity bytes : false, \n"
+ " Estimation Mode : false\n"
+ " K : 128\n"
+ " N : 6\n"
+ " Levels (Needed, Total, Valid): 0, 0, 0\n"
+ " Level Bit Pattern : 0\n"
+ " BaseBufferCount : 6\n"
+ " Combined Buffer Capacity : 8\n"
+ " Retained Items : 6\n"
+ " Compact Storage Bytes : 80\n"
+ " Updatable Storage Bytes : 96\n"
+ " Normalized Rank Error : 1.406%\n"
+ " Normalized Rank Error (PMF) : 1.711%\n"
+ " Min Value : 1.000000e+00\n"
+ " Max Value : 1.000000e+00\n"
+ "### END SKETCH SUMMARY\n"
}
)
);
}
@Test
public void testDoublesSketchPostAggsPostSort() throws Exception
{
testQuery(
"SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from ("
+ "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10"
+ ")",
Collections.singletonList(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
)
)
.postAggregators(
ImmutableList.of(
new FieldAccessPostAggregator("p0", "a0:agg"),
new DoublesSketchToQuantilePostAggregator(
"p2",
new FieldAccessPostAggregator("p1", "a0:agg"),
0.5
),
new DoublesSketchToQuantilePostAggregator(
"s1",
new FieldAccessPostAggregator("s0", "p0"),
0.5
),
new DoublesSketchToQuantilePostAggregator(
"s3",
new FieldAccessPostAggregator("s2", "p0"),
0.9800000190734863
)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
4.0d,
6.0d
}
)
);
}
@Test
public void testEmptyTimeseriesResults() throws Exception
{
testQuery(
"SELECT\n"
+ "APPROX_QUANTILE_DS(m1, 0.01),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n"
+ "DS_QUANTILES_SKETCH(m1),\n"
+ "DS_QUANTILES_SKETCH(qsketch_m1)\n"
+ "FROM foo WHERE dim2 = 0",
Collections.singletonList(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null),
new DoublesSketchAggregatorFactory("a2:agg", "m1", null),
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
Double.NaN,
Double.NaN,
"0",
"0"
}
)
);
}
@Test
public void testGroupByAggregatorDefaultValues() throws Exception
{
testQuery(
"SELECT\n"
+ "dim2,\n"
+ "APPROX_QUANTILE_DS(m1, 0.01) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.01) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ "DS_QUANTILES_SKETCH(m1) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ "DS_QUANTILES_SKETCH(qsketch_m1) FILTER(WHERE dim1 = 'nonexistent')\n"
+ "FROM foo WHERE dim2 = 'a' GROUP BY dim2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ValueType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ValueType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a2:agg", "m1", null),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null),
selector("dim1", "nonexistent", null)
)
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
"a",
Double.NaN,
Double.NaN,
"0",
"0"
}
)
);
}
@Test
public void testFailWithSmallMaxStreamLength() throws Exception
{
final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
context.put(
DoublesSketchApproxQuantileSqlAggregator.CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH,
1
);
testQueryThrows(
"SELECT\n"
+ "APPROX_QUANTILE_DS(m1, 0.01),\n"
+ "APPROX_QUANTILE_DS(cnt, 0.5)\n"
+ "FROM foo",
context,
Collections.singletonList(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null, 1L),
new DoublesSketchAggregatorFactory("a1:agg", "cnt", null, 1L)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f)
)
.context(context)
.build()
),
expectedException -> {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch");
}
);
}
private static PostAggregator makeFieldAccessPostAgg(String name)
{
return new FieldAccessPostAggregator(name, name);
}
}