blob: 24e3b6b9726783b26c53011103b48b5448227c9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.filter.sql;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.avatica.SqlType;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.expressions.BloomFilterExprMacro;
import org.apache.druid.query.filter.BloomDimFilter;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.filter.BloomKFilterHolder;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.http.SqlParameter;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
{
/**
 * Supplies the operator table for these tests.
 *
 * Registers a {@link BloomFilterSerializersModule} on the mapper returned by
 * {@code CalciteTests.getJsonMapper()}, then returns a table built from an empty first set and a
 * second set containing only a {@link BloomFilterOperatorConversion}.
 */
@Override
public DruidOperatorTable createOperatorTable()
{
  // The serializers module is registered before the table is built, as a side effect of this call.
  CalciteTests.getJsonMapper().registerModule(new BloomFilterSerializersModule());

  final BloomFilterOperatorConversion bloomConversion = new BloomFilterOperatorConversion();
  return new DruidOperatorTable(ImmutableSet.of(), ImmutableSet.of(bloomConversion));
}
/**
 * Supplies the expression macro table for these tests.
 *
 * The table holds, in order: one injector-provided instance per class in
 * {@code ExpressionModule.EXPR_MACROS}, an injector-provided {@link LookupExprMacro}, and a freshly
 * constructed {@link BloomFilterExprMacro}.
 */
@Override
public ExprMacroTable createMacroTable()
{
  final List<ExprMacroTable.ExprMacro> macros = new ArrayList<>();

  // Standard expression macros, obtained from the shared test injector.
  for (Class<? extends ExprMacroTable.ExprMacro> macroClass : ExpressionModule.EXPR_MACROS) {
    final ExprMacroTable.ExprMacro macro = CalciteTests.INJECTOR.getInstance(macroClass);
    macros.add(macro);
  }

  final LookupExprMacro lookupMacro = CalciteTests.INJECTOR.getInstance(LookupExprMacro.class);
  macros.add(lookupMacro);

  // The bloom filter macro is the one under test here and is constructed directly.
  macros.add(new BloomFilterExprMacro());

  return new ExprMacroTable(macros);
}
/**
 * {@code bloom_filter_test} applied directly to {@code dim1} with a base64 literal is expected to
 * plan to a timeseries query filtered by a {@link BloomDimFilter} on {@code dim1}, returning a
 * count of 1.
 */
@Test
public void testBloomFilter() throws Exception
{
  final BloomKFilter bloom = new BloomKFilter(1500);
  bloom.addString("def");

  // The SQL function receives the serialized filter as a base64 string literal.
  final String encodedBloom = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(bloom)
  );
  final String sql = StringUtils.format(
      "SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, '%s')",
      encodedBloom
  );
  final BloomDimFilter expectedFilter = new BloomDimFilter(
      "dim1",
      BloomKFilterHolder.fromBloomKFilter(bloom),
      null
  );

  testQuery(
      sql,
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .filters(expectedFilter)
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{1L}
      )
  );
}
/**
 * Wrapping {@code bloom_filter_test} in {@code nullif(...) is null} is expected to plan to an
 * {@link ExpressionDimFilter}, so the bloom filter check runs as a Druid expression rather than as
 * a {@link BloomDimFilter}. The expected count is 5.
 */
@Test
public void testBloomFilterExprFilter() throws Exception
{
  final BloomKFilter bloom = new BloomKFilter(1500);
  bloom.addString("a-foo");
  bloom.addString("-foo");
  if (!NullHandling.replaceWithDefault()) {
    // Only when nulls are not replaced with defaults is a null entry also added to the filter.
    bloom.addBytes(null, 0, 0);
  }

  final String encodedBloom = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(bloom)
  );

  // Trick the planner into producing an expression filter, so that the bloom filter Druid
  // expression itself is what gets exercised.
  final String sql = StringUtils.format(
      "SELECT COUNT(*) FROM druid.foo WHERE nullif(bloom_filter_test(concat(dim2, '-foo'), '%s'), 1) is null",
      encodedBloom
  );
  // The encoded filter appears twice in the expected native expression.
  final String expectedExpression = StringUtils.format(
      "case_searched(bloom_filter_test(concat(\"dim2\",'-foo'),'%s'),1,isnull(bloom_filter_test(concat(\"dim2\",'-foo'),'%s')))",
      encodedBloom,
      encodedBloom
  );

  testQuery(
      sql,
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .filters(
                    new ExpressionDimFilter(expectedExpression, null, createMacroTable())
                )
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{5L}
      )
  );
}
/**
 * {@code bloom_filter_test} over a string expression ({@code concat(dim1, '-foo')}) is expected to
 * plan to a STRING expression virtual column {@code v0} with a {@link BloomDimFilter} on that
 * virtual column, returning a count of 1.
 */
@Test
public void testBloomFilterVirtualColumn() throws Exception
{
  final BloomKFilter bloom = new BloomKFilter(1500);
  bloom.addString("def-foo");

  final String encodedBloom = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(bloom)
  );
  final String sql = StringUtils.format(
      "SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(concat(dim1, '-foo'), '%s')",
      encodedBloom
  );
  // The filter targets the virtual column, not the underlying dimension.
  final BloomDimFilter expectedFilter = new BloomDimFilter(
      "v0",
      BloomKFilterHolder.fromBloomKFilter(bloom),
      null
  );

  testQuery(
      sql,
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .virtualColumns(expressionVirtualColumn("v0", "concat(\"dim1\",'-foo')", ValueType.STRING))
                .filters(expectedFilter)
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{1L}
      )
  );
}
/**
 * {@code bloom_filter_test} over a numeric expression ({@code 2 * CAST(dim1 AS float)}) is expected
 * to plan to an expression virtual column {@code v0} with a {@link BloomDimFilter} on it, returning
 * a count of 1. The bloom filter is populated with the float value {@code 20.2f}.
 */
@Test
public void testBloomFilterVirtualColumnNumber() throws Exception
{
  final BloomKFilter bloom = new BloomKFilter(1500);
  bloom.addFloat(20.2f);

  final String encodedBloom = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(bloom)
  );
  final String sql = StringUtils.format(
      "SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(2 * CAST(dim1 AS float), '%s')",
      encodedBloom
  );
  final BloomDimFilter expectedFilter = new BloomDimFilter(
      "v0",
      BloomKFilterHolder.fromBloomKFilter(bloom),
      null
  );

  testQuery(
      sql,
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .virtualColumns(
                    // The SQL casts to float; the expected native expression casts to 'DOUBLE'
                    // while the virtual column's output type is FLOAT.
                    expressionVirtualColumn("v0", "(2 * CAST(\"dim1\", 'DOUBLE'))", ValueType.FLOAT)
                )
                .filters(expectedFilter)
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{1L}
      )
  );
}
/**
 * Two {@code bloom_filter_test} calls joined with OR, one on {@code dim1} and one on {@code dim2},
 * are expected to plan to an {@link OrDimFilter} of two {@link BloomDimFilter}s, returning a count
 * of 2.
 */
@Test
public void testBloomFilters() throws Exception
{
  final BloomKFilter firstBloom = new BloomKFilter(1500);
  firstBloom.addString("def");
  final BloomKFilter secondBloom = new BloomKFilter(1500);
  // NOTE(review): "abc" is added to the FIRST filter, so the second filter stays empty. This may
  // have been intended as an add to the second filter -- confirm. The expected count of 2 below
  // reflects the current behavior, so changing this line likely requires updating that count too.
  firstBloom.addString("abc");

  final String firstEncoded = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(firstBloom)
  );
  final String secondEncoded = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(secondBloom)
  );
  final String sql = StringUtils.format(
      "SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, '%s') OR bloom_filter_test(dim2, '%s')",
      firstEncoded,
      secondEncoded
  );

  final BloomDimFilter dim1Filter = new BloomDimFilter(
      "dim1",
      BloomKFilterHolder.fromBloomKFilter(firstBloom),
      null
  );
  final BloomDimFilter dim2Filter = new BloomDimFilter(
      "dim2",
      BloomKFilterHolder.fromBloomKFilter(secondBloom),
      null
  );

  testQuery(
      sql,
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .filters(new OrDimFilter(dim1Filter, dim2Filter))
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{2L}
      )
  );
}
/**
 * Same shape as the small literal test, but with a bloom filter sized for 5,000,000 entries that
 * is embedded in the SQL text as a string literal. Ignored by default; it exists to be compared
 * against {@code testBloomFilterBigParameter}, which passes the same filter as a dynamic parameter.
 */
@Ignore("this test is really slow and is intended to use for comparisons with testBloomFilterBigParameter")
@Test
public void testBloomFilterBigNoParam() throws Exception
{
  final BloomKFilter bigBloom = new BloomKFilter(5_000_000);
  bigBloom.addString("def");

  final String encodedBloom = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(bigBloom)
  );
  // The whole encoded filter is inlined into the SQL string.
  final String sql = StringUtils.format(
      "SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, '%s')",
      encodedBloom
  );
  final BloomDimFilter expectedFilter = new BloomDimFilter(
      "dim1",
      BloomKFilterHolder.fromBloomKFilter(bigBloom),
      null
  );

  testQuery(
      sql,
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .filters(expectedFilter)
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{1L}
      )
  );
}
/**
 * Counterpart of {@code testBloomFilterBigNoParam}: the same bloom filter, sized for 5,000,000
 * entries, is supplied as a VARCHAR dynamic parameter ({@code ?}) instead of being inlined into
 * the SQL text. Ignored by default; intended for comparison runs only.
 */
@Ignore("this test is for comparison with testBloomFilterBigNoParam")
@Test
public void testBloomFilterBigParameter() throws Exception
{
  final BloomKFilter bigBloom = new BloomKFilter(5_000_000);
  bigBloom.addString("def");

  final String encodedBloom = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(bigBloom)
  );
  final BloomDimFilter expectedFilter = new BloomDimFilter(
      "dim1",
      BloomKFilterHolder.fromBloomKFilter(bigBloom),
      null
  );

  testQuery(
      "SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, ?)",
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .filters(expectedFilter)
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{1L}
      ),
      // The encoded filter is bound to the single '?' placeholder.
      ImmutableList.of(new SqlParameter(SqlType.VARCHAR, encodedBloom))
  );
}
/**
 * Both arguments of {@code bloom_filter_test} are supplied as dynamic parameters: the tested value
 * is {@code NullHandling.defaultStringValue()} and the filter contains a null entry. The expected
 * native query has no filter at all and the expected count is 6.
 */
@Test
public void testBloomFilterNullParameter() throws Exception
{
  final BloomKFilter nullBloom = new BloomKFilter(1500);
  nullBloom.addBytes(null, 0, 0);

  final String encodedBloom = StringUtils.encodeBase64String(
      BloomFilterSerializersModule.bloomKFilterToBytes(nullBloom)
  );

  // The bloom filter expression is evaluated and optimized away at planning time: the value
  // parameter is null, and null matches the filter supplied through the other parameter.
  testQuery(
      "SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(?, ?)",
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE1)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .aggregators(aggregators(new CountAggregatorFactory("a0")))
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(
          new Object[]{6L}
      ),
      // The Druid expression language has no empty strings, because an empty value is coerced
      // into null when parsed.
      ImmutableList.of(
          new SqlParameter(SqlType.VARCHAR, NullHandling.defaultStringValue()),
          new SqlParameter(SqlType.VARCHAR, encodedBloom)
      )
  );
}
}