blob: 66a00784c8b1de6335a9ae6fac1b3046fab07479 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.druid;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPredicateList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.rules.AggregateExtractProjectRule;
import org.apache.calcite.rel.rules.AggregateFilterTransposeRule;
import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
import org.apache.calcite.rel.rules.SortProjectTransposeRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexExecutor;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexSimplify;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.joda.time.Interval;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Rules and relational operators for {@link DruidQuery}.
*/
public class DruidRules {
private DruidRules() {}

/** Planner tracer shared by the Druid rules for debug logging. */
protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();

/** Rule to push a {@link org.apache.calcite.rel.core.Filter}
 * into a {@link DruidQuery}. */
public static final DruidFilterRule FILTER = DruidFilterRule.Config.DEFAULT.toRule();

/** Rule to push a {@link org.apache.calcite.rel.core.Project}
 * into a {@link DruidQuery}. */
public static final DruidProjectRule PROJECT = DruidProjectRule.Config.DEFAULT.toRule();

/** Rule to push an {@link org.apache.calcite.rel.core.Aggregate}
 * into a {@link DruidQuery}. */
public static final DruidAggregateRule AGGREGATE = DruidAggregateRule.Config.DEFAULT.toRule();

/** Rule to push an {@link org.apache.calcite.rel.core.Aggregate} together
 * with its input {@link org.apache.calcite.rel.core.Project}
 * into a {@link DruidQuery}. */
public static final DruidAggregateProjectRule AGGREGATE_PROJECT =
DruidAggregateProjectRule.Config.DEFAULT
.toRule();

/** Rule to push a {@link org.apache.calcite.rel.core.Sort}
 * into a {@link DruidQuery}. */
public static final DruidSortRule SORT = DruidSortRule.Config.DEFAULT.toRule();

/** Rule to push an {@link org.apache.calcite.rel.core.Sort} through a
 * {@link org.apache.calcite.rel.core.Project}. Useful to transform
 * to complex Druid queries. */
public static final SortProjectTransposeRule SORT_PROJECT_TRANSPOSE =
(SortProjectTransposeRule) SortProjectTransposeRule.Config.DEFAULT
.withOperandFor(Sort.class, Project.class, DruidQuery.class)
.withDescription("DruidSortProjectTransposeRule")
.toRule();

/** Rule to push a {@link org.apache.calcite.rel.core.Project}
 * past a {@link org.apache.calcite.rel.core.Filter}
 * when {@code Filter} is on top of a {@link DruidQuery}. */
public static final ProjectFilterTransposeRule PROJECT_FILTER_TRANSPOSE =
(ProjectFilterTransposeRule) ProjectFilterTransposeRule.Config.DEFAULT
.withOperandFor(Project.class, Filter.class, DruidQuery.class)
.withDescription("DruidProjectFilterTransposeRule")
.toRule();

/** Rule to push a {@link org.apache.calcite.rel.core.Filter}
 * past a {@link org.apache.calcite.rel.core.Project}
 * when {@code Project} is on top of a {@link DruidQuery}. */
public static final FilterProjectTransposeRule FILTER_PROJECT_TRANSPOSE =
(FilterProjectTransposeRule) FilterProjectTransposeRule.Config.DEFAULT
.withOperandFor(Filter.class, Project.class, DruidQuery.class)
.withCopyFilter(true)
.withCopyProject(true)
.withDescription("DruidFilterProjectTransposeRule")
.toRule();

/** Rule to push an {@link org.apache.calcite.rel.core.Aggregate}
 * past a {@link org.apache.calcite.rel.core.Filter}
 * when {@code Filter} is on top of a {@link DruidQuery}. */
public static final AggregateFilterTransposeRule AGGREGATE_FILTER_TRANSPOSE =
(AggregateFilterTransposeRule) AggregateFilterTransposeRule.Config.DEFAULT
.withOperandFor(Aggregate.class, Filter.class, DruidQuery.class)
.withDescription("DruidAggregateFilterTransposeRule")
.toRule();

/** Rule to push an {@link org.apache.calcite.rel.core.Filter}
 * past an {@link org.apache.calcite.rel.core.Aggregate}
 * when {@code Aggregate} is on top of a {@link DruidQuery}. */
public static final FilterAggregateTransposeRule FILTER_AGGREGATE_TRANSPOSE =
(FilterAggregateTransposeRule) FilterAggregateTransposeRule.Config.DEFAULT
.withOperandFor(Filter.class, Aggregate.class, DruidQuery.class)
.withDescription("DruidFilterAggregateTransposeRule")
.toRule();

/** Rule to push a {@link org.apache.calcite.rel.core.Project}
 * into a {@link DruidQuery} as Druid post-aggregations. */
public static final DruidPostAggregationProjectRule POST_AGGREGATION_PROJECT =
DruidPostAggregationProjectRule.Config.DEFAULT.toRule();

/** Rule to extract a {@link org.apache.calcite.rel.core.Project} from
 * {@link org.apache.calcite.rel.core.Aggregate} on top of
 * {@link org.apache.calcite.adapter.druid.DruidQuery} based on the fields
 * used in the aggregate. */
public static final AggregateExtractProjectRule PROJECT_EXTRACT_RULE =
(AggregateExtractProjectRule) AggregateExtractProjectRule.Config.DEFAULT
.withOperandFor(Aggregate.class, DruidQuery.class)
.withDescription("DruidAggregateExtractProjectRule")
.toRule();

/** Rule to push a HAVING-style {@link org.apache.calcite.rel.core.Filter}
 * (a filter above an aggregate) into a {@link DruidQuery}. */
public static final DruidHavingFilterRule DRUID_HAVING_FILTER_RULE =
DruidHavingFilterRule.Config.DEFAULT
.toRule();

/** All Druid rules, in the order in which they should be registered with
 * the planner. */
public static final List<RelOptRule> RULES =
ImmutableList.of(FILTER,
PROJECT_FILTER_TRANSPOSE,
AGGREGATE_FILTER_TRANSPOSE,
AGGREGATE_PROJECT,
PROJECT_EXTRACT_RULE,
PROJECT,
POST_AGGREGATION_PROJECT,
AGGREGATE,
FILTER_AGGREGATE_TRANSPOSE,
FILTER_PROJECT_TRANSPOSE,
SORT,
SORT_PROJECT_TRANSPOSE,
DRUID_HAVING_FILTER_RULE);
/**
* Rule to push a {@link org.apache.calcite.rel.core.Filter} into a
* {@link DruidQuery}.
*/
public static class DruidFilterRule
    extends RelRule<DruidFilterRule.Config> {
  /** Creates a DruidFilterRule. */
  protected DruidFilterRule(Config config) {
    super(config);
  }

  @Override public void onMatch(RelOptRuleCall call) {
    final Filter filter = call.rel(0);
    final DruidQuery query = call.rel(1);
    final RelOptCluster cluster = filter.getCluster();
    final RelBuilder relBuilder = call.builder();
    final RexBuilder rexBuilder = cluster.getRexBuilder();
    // Only fire if appending a Filter ('f') to the query's operator
    // signature still describes a valid Druid query shape.
    if (!DruidQuery.isValidSignature(query.signature() + 'f')) {
      return;
    }
    final List<RexNode> validPreds = new ArrayList<>();
    final List<RexNode> nonValidPreds = new ArrayList<>();
    final RexExecutor executor =
        Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR);
    final RelOptPredicateList predicates =
        call.getMetadataQuery().getPulledUpPredicates(filter.getInput());
    final RexSimplify simplify =
        new RexSimplify(rexBuilder, predicates, executor);
    // Simplify the condition first; UNKNOWN may be treated as FALSE because
    // the expression is used as a filter condition.
    final RexNode cond =
        simplify.simplifyUnknownAsFalse(filter.getCondition());
    // Classify each conjunct by whether it can be expressed as a Druid
    // JSON filter.
    for (RexNode e : RelOptUtil.conjunctions(cond)) {
      DruidJsonFilter druidJsonFilter =
          DruidJsonFilter.toDruidFilters(e, filter.getInput().getRowType(),
              query, rexBuilder);
      if (druidJsonFilter != null) {
        validPreds.add(e);
      } else {
        nonValidPreds.add(e);
      }
    }

    // Timestamp: locate the timestamp column so that conjuncts on it can be
    // converted into Druid query intervals.
    int timestampFieldIdx =
        query.getRowType().getFieldNames()
            .indexOf(query.druidTable.timestampFieldName);
    RelNode newDruidQuery = query;
    final Triple<List<RexNode>, List<RexNode>, List<RexNode>> triple =
        splitFilters(rexBuilder, query, validPreds, nonValidPreds, timestampFieldIdx);
    if (triple.getLeft().isEmpty() && triple.getMiddle().isEmpty()) {
      // Nothing can be pushed into Druid; leave the plan unchanged.
      return;
    }
    // Predicates that cannot be pushed remain as a Calcite Filter above the
    // Druid query.
    final List<RexNode> residualPreds = new ArrayList<>(triple.getRight());
    List<Interval> intervals = null;
    if (!triple.getLeft().isEmpty()) {
      final String timeZone = cluster.getPlanner().getContext()
          .unwrap(CalciteConnectionConfig.class).timeZone();
      assert timeZone != null;
      intervals = DruidDateTimeUtils.createInterval(
          RexUtil.composeConjunction(rexBuilder, triple.getLeft()));
      if (intervals == null || intervals.isEmpty()) {
        // Case we have a filter with extract that can not be written as interval push down
        triple.getMiddle().addAll(triple.getLeft());
      }
    }

    if (!triple.getMiddle().isEmpty()) {
      // Push the Druid-expressible predicates into the query as a Filter node.
      final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels),
          RexUtil.composeConjunction(rexBuilder, triple.getMiddle()));
      newDruidQuery = DruidQuery.extendQuery(query, newFilter);
    }
    if (intervals != null && !intervals.isEmpty()) {
      // Narrow the query's time intervals using the timestamp predicates.
      newDruidQuery = DruidQuery.extendQuery((DruidQuery) newDruidQuery, intervals);
    }
    if (!residualPreds.isEmpty()) {
      // Re-apply the non-pushable predicates on top of the Druid query.
      newDruidQuery = relBuilder
          .push(newDruidQuery)
          .filter(residualPreds)
          .build();
    }
    call.transformTo(newDruidQuery);
  }

  /**
   * Given a list of conditions that contain Druid valid operations and
   * a list that contains those that contain any non-supported operation,
   * it outputs a triple with three different categories:
   * 1-l) condition filters on the timestamp column,
   * 2-m) condition filters that can be pushed to Druid,
   * 3-r) condition filters that cannot be pushed to Druid.
   */
  private static Triple<List<RexNode>, List<RexNode>, List<RexNode>> splitFilters(
      final RexBuilder rexBuilder, final DruidQuery input, final List<RexNode> validPreds,
      final List<RexNode> nonValidPreds, final int timestampFieldIdx) {
    final List<RexNode> timeRangeNodes = new ArrayList<>();
    final List<RexNode> pushableNodes = new ArrayList<>();
    final List<RexNode> nonPushableNodes = new ArrayList<>(nonValidPreds);
    // Number of columns with the dimensions and timestamp
    for (RexNode conj : validPreds) {
      // A conjunct is a time-range candidate only if the timestamp column is
      // the sole input it references.
      final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
      conj.accept(visitor);
      if (visitor.inputPosReferenced.contains(timestampFieldIdx)
          && visitor.inputPosReferenced.size() == 1) {
        timeRangeNodes.add(conj);
      } else {
        pushableNodes.add(conj);
      }
    }
    return ImmutableTriple.of(timeRangeNodes, pushableNodes, nonPushableNodes);
  }

  /** Rule configuration. */
  public interface Config extends RelRule.Config {
    Config DEFAULT = EMPTY
        .withOperandSupplier(b0 ->
            b0.operand(Filter.class).oneInput(b1 ->
                b1.operand(DruidQuery.class).noInputs()))
        .as(DruidFilterRule.Config.class);

    @Override default DruidFilterRule toRule() {
      return new DruidFilterRule(this);
    }
  }
}
/** Rule to Push a Having {@link Filter} into a {@link DruidQuery}. */
public static class DruidHavingFilterRule
    extends RelRule<DruidHavingFilterRule.Config> {
  /** Creates a DruidHavingFilterRule. */
  protected DruidHavingFilterRule(Config config) {
    super(config);
  }

  @Override public void onMatch(RelOptRuleCall call) {
    final Filter havingFilter = call.rel(0);
    final DruidQuery druidQuery = call.rel(1);
    final RexBuilder rexBuilder = havingFilter.getCluster().getRexBuilder();
    // Appending 'h' (having) must keep the Druid operator signature valid.
    if (!DruidQuery.isValidSignature(druidQuery.signature() + 'h')) {
      return;
    }
    final RexNode condition = havingFilter.getCondition();
    // The whole condition must translate to a Druid JSON filter; otherwise
    // leave the Filter where it is.
    if (DruidJsonFilter.toDruidFilters(condition,
        druidQuery.getTopNode().getRowType(), druidQuery, rexBuilder) == null) {
      return;
    }
    final RelNode pushedFilter = havingFilter
        .copy(havingFilter.getTraitSet(), Util.last(druidQuery.rels), condition);
    call.transformTo(DruidQuery.extendQuery(druidQuery, pushedFilter));
  }

  /** Rule configuration. */
  public interface Config extends RelRule.Config {
    Config DEFAULT = EMPTY
        .withOperandSupplier(b0 ->
            b0.operand(Filter.class).oneInput(b1 ->
                b1.operand(DruidQuery.class).noInputs()))
        .as(DruidHavingFilterRule.Config.class);

    @Override default DruidHavingFilterRule toRule() {
      return new DruidHavingFilterRule(this);
    }
  }
}
/**
* Rule to push a {@link org.apache.calcite.rel.core.Project} into a
* {@link DruidQuery}.
*/
public static class DruidProjectRule
    extends RelRule<DruidProjectRule.Config> {
  /** Creates a DruidProjectRule. */
  protected DruidProjectRule(Config config) {
    super(config);
  }

  @Override public void onMatch(RelOptRuleCall call) {
    final Project project = call.rel(0);
    final DruidQuery query = call.rel(1);
    final RelOptCluster cluster = project.getCluster();
    final RexBuilder rexBuilder = cluster.getRexBuilder();
    // Only fire if appending a Project ('p') to the query's operator
    // signature still describes a valid Druid query shape.
    if (!DruidQuery.isValidSignature(query.signature() + 'p')) {
      return;
    }

    if (DruidQuery.computeProjectAsScan(project, query.getTable().getRowType(), query)
        != null) {
      // All expressions can be pushed to Druid in their entirety.
      final RelNode newProject = project.copy(project.getTraitSet(),
          ImmutableList.of(Util.last(query.rels)));
      RelNode newNode = DruidQuery.extendQuery(query, newProject);
      call.transformTo(newNode);
      return;
    }
    // Otherwise, try to split the Project into a pushable lower Project
    // (plain input references) and an upper Project kept outside Druid.
    final Pair<List<RexNode>, List<RexNode>> pair =
        splitProjects(rexBuilder, query, project.getProjects());
    if (pair == null) {
      // We can't push anything useful to Druid.
      return;
    }
    final List<RexNode> above = pair.left;
    final List<RexNode> below = pair.right;
    final RelDataTypeFactory.Builder builder =
        cluster.getTypeFactory().builder();
    final RelNode input = Util.last(query.rels);
    // Build the row type of the pushed-down Project, reusing the input's
    // field names for plain references.
    for (RexNode e : below) {
      final String name;
      if (e instanceof RexInputRef) {
        name = input.getRowType().getFieldNames().get(((RexInputRef) e).getIndex());
      } else {
        // NOTE(review): a null name is passed here; presumably the type
        // factory builder generates a name in that case — confirm against
        // RelDataTypeFactory.Builder#add.
        name = null;
      }
      builder.add(name, e.getType());
    }
    final RelNode newProject = project.copy(project.getTraitSet(), input, below, builder.build());
    final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject);
    final RelNode newProject2 = project.copy(project.getTraitSet(), newQuery, above,
        project.getRowType());
    call.transformTo(newProject2);
  }

  /**
   * Splits the given projection expressions into a pair of (above, below)
   * lists: "below" contains one input reference per column that the
   * expressions use (pushed into Druid), and "above" contains the original
   * expressions rewritten to reference the "below" output. Returns null when
   * every input column is referenced, in which case the split is pointless.
   */
  private static Pair<List<RexNode>, List<RexNode>> splitProjects(
      final RexBuilder rexBuilder, final RelNode input, List<RexNode> nodes) {
    final RelOptUtil.InputReferencedVisitor visitor =
        new RelOptUtil.InputReferencedVisitor();
    visitor.visitEach(nodes);
    if (visitor.inputPosReferenced.size() == input.getRowType().getFieldCount()) {
      // All inputs are referenced
      return null;
    }
    final List<RexNode> belowNodes = new ArrayList<>();
    final List<RelDataType> belowTypes = new ArrayList<>();
    final List<Integer> positions = Lists.newArrayList(visitor.inputPosReferenced);
    for (int i : positions) {
      final RexNode node = rexBuilder.makeInputRef(input, i);
      belowNodes.add(node);
      belowTypes.add(node.getType());
    }
    // Rewrite the original expressions so that each input reference points at
    // the corresponding position in the new, narrower "below" Project.
    final List<RexNode> aboveNodes = new RexShuttle() {
      @Override public RexNode visitInputRef(RexInputRef ref) {
        final int index = positions.indexOf(ref.getIndex());
        return rexBuilder.makeInputRef(belowTypes.get(index), index);
      }
    }.visitList(nodes);
    return Pair.of(aboveNodes, belowNodes);
  }

  /** Rule configuration. */
  public interface Config extends RelRule.Config {
    Config DEFAULT = EMPTY
        .withOperandSupplier(b0 ->
            b0.operand(Project.class).oneInput(b1 ->
                b1.operand(DruidQuery.class).noInputs()))
        .as(DruidProjectRule.Config.class);

    @Override default DruidProjectRule toRule() {
      return new DruidProjectRule(this);
    }
  }
}
/**
* Rule to push a {@link org.apache.calcite.rel.core.Project} into a
* {@link DruidQuery} as a Post aggregator.
*/
public static class DruidPostAggregationProjectRule
    extends RelRule<DruidPostAggregationProjectRule.Config> {
  /** Creates a DruidPostAggregationProjectRule. */
  protected DruidPostAggregationProjectRule(Config config) {
    super(config);
  }

  @Override public void onMatch(RelOptRuleCall call) {
    final Project project = call.rel(0);
    final DruidQuery query = call.rel(1);
    // Appending 'o' (post-project) must keep the Druid operator signature
    // valid.
    if (!DruidQuery.isValidSignature(query.signature() + 'o')) {
      return;
    }
    // Only try to push down the Project when at least one expression is a
    // RexCall, i.e. when the resulting DruidQuery will contain post
    // aggregators.
    if (project.getProjects().stream().noneMatch(e -> e instanceof RexCall)) {
      return;
    }
    // Locate the Aggregate at the top of the Druid query; it may be wrapped
    // by a (having) Filter.
    final RelNode top = query.getTopNode();
    final Aggregate aggregate = top instanceof Aggregate
        ? (Aggregate) top
        : (Aggregate) ((Filter) top).getInput();
    // Every projected expression must translate to a Druid expression,
    // otherwise nothing is pushed.
    for (RexNode e : project.getProjects()) {
      if (DruidExpressions.toDruidExpression(e, aggregate.getRowType(), query) == null) {
        return;
      }
    }
    final RelNode pushedProject = project
        .copy(project.getTraitSet(), ImmutableList.of(Util.last(query.rels)));
    call.transformTo(DruidQuery.extendQuery(query, pushedProject));
  }

  /** Rule configuration. */
  public interface Config extends RelRule.Config {
    Config DEFAULT = EMPTY
        .withOperandSupplier(b0 ->
            b0.operand(Project.class).oneInput(b1 ->
                b1.operand(DruidQuery.class).noInputs()))
        .as(DruidPostAggregationProjectRule.Config.class);

    @Override default DruidPostAggregationProjectRule toRule() {
      return new DruidPostAggregationProjectRule(this);
    }
  }
}
/**
* Rule to push an {@link org.apache.calcite.rel.core.Aggregate}
* into a {@link DruidQuery}.
*/
public static class DruidAggregateRule
    extends RelRule<DruidAggregateRule.Config> {
  /** Creates a DruidAggregateRule. */
  protected DruidAggregateRule(Config config) {
    super(config);
  }

  @Override public void onMatch(RelOptRuleCall call) {
    final Aggregate aggregate = call.rel(0);
    final DruidQuery query = call.rel(1);
    final RelNode top = query.getTopNode();
    // The query's top node may or may not be a Project; the helpers below
    // accept null when it is not.
    final Project project = top instanceof Project ? (Project) top : null;
    // Appending 'a' (aggregate) must keep the Druid operator signature valid.
    if (!DruidQuery.isValidSignature(query.signature() + 'a')) {
      return;
    }
    // Druid cannot handle GROUPING SETS; only a single group set is allowed.
    if (aggregate.getGroupSets().size() != 1) {
      return;
    }
    // The grouping columns must be expressible as Druid dimensions.
    if (DruidQuery.computeProjectGroupSet(project, aggregate.getGroupSet(),
        query.table.getRowType(), query) == null) {
      return;
    }
    // Output field names of the aggregate calls (the fields after the
    // grouping columns).
    final List<String> aggFieldNames =
        Util.skip(aggregate.getRowType().getFieldNames(),
            aggregate.getGroupSet().cardinality());
    // Every aggregate call must map to a Druid JSON aggregation.
    if (DruidQuery.computeDruidJsonAgg(aggregate.getAggCallList(),
        aggFieldNames, project, query) == null) {
      return;
    }
    final RelNode pushedAggregate = aggregate
        .copy(aggregate.getTraitSet(), ImmutableList.of(query.getTopNode()));
    call.transformTo(DruidQuery.extendQuery(query, pushedAggregate));
  }

  /** Rule configuration. */
  public interface Config extends RelRule.Config {
    Config DEFAULT = EMPTY
        .withOperandSupplier(b0 ->
            b0.operand(Aggregate.class).oneInput(b1 ->
                b1.operand(DruidQuery.class).noInputs()))
        .as(DruidAggregateRule.Config.class);

    @Override default DruidAggregateRule toRule() {
      return new DruidAggregateRule(this);
    }
  }
}
/**
* Rule to push an {@link org.apache.calcite.rel.core.Aggregate} and
* {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
*/
public static class DruidAggregateProjectRule
    extends RelRule<DruidAggregateProjectRule.Config> {
  /** Creates a DruidAggregateProjectRule. */
  protected DruidAggregateProjectRule(Config config) {
    super(config);
  }

  @Override public void onMatch(RelOptRuleCall call) {
    final Aggregate aggregate = call.rel(0);
    final Project project = call.rel(1);
    final DruidQuery query = call.rel(2);
    // Only fire if appending Project then Aggregate ('p' + 'a') to the
    // query's operator signature still describes a valid Druid query shape.
    if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) {
      return;
    }
    // Druid cannot handle GROUPING SETS; only a single group set is allowed.
    if (aggregate.getGroupSets().size() != 1) {
      return;
    }
    // The grouping columns must be expressible as Druid dimensions.
    if (DruidQuery
        .computeProjectGroupSet(project, aggregate.getGroupSet(), query.table.getRowType(), query)
        == null) {
      return;
    }
    // Output field names of the aggregate calls (after the grouping columns).
    final List<String> aggNames = Util
        .skip(aggregate.getRowType().getFieldNames(), aggregate.getGroupSet().cardinality());
    // Every aggregate call must map to a Druid JSON aggregation.
    if (DruidQuery.computeDruidJsonAgg(aggregate.getAggCallList(), aggNames, project, query)
        == null) {
      return;
    }
    final RelNode newProject = project.copy(project.getTraitSet(),
        ImmutableList.of(Util.last(query.rels)));
    final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
        ImmutableList.of(newProject));
    List<Integer> filterRefs = getFilterRefs(aggregate.getAggCallList());
    final DruidQuery query2;
    if (filterRefs.size() > 0) {
      // Some aggregate calls carry FILTER clauses; try to fold the common
      // parts of those filters into the query's outer filter.
      query2 = optimizeFilteredAggregations(call, query, (Project) newProject,
          (Aggregate) newAggregate);
    } else {
      final DruidQuery query1 = DruidQuery.extendQuery(query, newProject);
      query2 = DruidQuery.extendQuery(query1, newAggregate);
    }
    call.transformTo(query2);
  }

  /** Returns an array of unique filter references from the given list of
   * {@link org.apache.calcite.rel.core.AggregateCall}s. */
  private Set<Integer> getUniqueFilterRefs(List<AggregateCall> calls) {
    Set<Integer> refs = new HashSet<>();
    for (AggregateCall call : calls) {
      if (call.hasFilter()) {
        refs.add(call.filterArg);
      }
    }
    return refs;
  }

  /**
   * Attempts to optimize any aggregations with filters in the DruidQuery.
   * Uses the following steps:
   *
   * <ol>
   * <li>Tries to abstract common filters out into the "filter" field;
   * <li>Eliminates expressions that are always true or always false when
   * possible;
   * <li>ANDs aggregate filters together with the outer filter to allow for
   * pruning of data.
   * </ol>
   *
   * <p>Should be called before pushing both the aggregate and project into
   * Druid. Assumes that at least one aggregate call has a filter attached to
   * it. */
  private DruidQuery optimizeFilteredAggregations(RelOptRuleCall call,
      DruidQuery query,
      Project project, Aggregate aggregate) {
    Filter filter = null;
    final RexBuilder builder = query.getCluster().getRexBuilder();
    final RexExecutor executor =
        Util.first(query.getCluster().getPlanner().getExecutor(),
            RexUtil.EXECUTOR);
    final RelNode scan = query.rels.get(0); // first rel is the table scan
    final RelOptPredicateList predicates =
        call.getMetadataQuery().getPulledUpPredicates(scan);
    final RexSimplify simplify =
        new RexSimplify(builder, predicates, executor);
    // if the druid query originally contained a filter
    boolean containsFilter = false;
    for (RelNode node : query.rels) {
      if (node instanceof Filter) {
        filter = (Filter) node;
        containsFilter = true;
        break;
      }
    }
    // if every aggregate call has a filter arg reference
    boolean allHaveFilters = allAggregatesHaveFilters(aggregate.getAggCallList());
    Set<Integer> uniqueFilterRefs = getUniqueFilterRefs(aggregate.getAggCallList());
    // One of the pre-conditions for this method
    assert uniqueFilterRefs.size() > 0;
    List<AggregateCall> newCalls = new ArrayList<>();
    // OR all the filters so that they can ANDed to the outer filter
    List<RexNode> disjunctions = new ArrayList<>();
    for (Integer i : uniqueFilterRefs) {
      disjunctions.add(stripFilter(project.getProjects().get(i)));
    }
    RexNode filterNode = RexUtil.composeDisjunction(builder, disjunctions);
    // Erase references to filters
    for (AggregateCall aggCall : aggregate.getAggCallList()) {
      // Drop the FILTER clause when it is redundant: either there is a
      // single shared filter and every call has one (the filter moves to the
      // outer query), or the call's filter expression is always true.
      if ((uniqueFilterRefs.size() == 1
          && allHaveFilters) // filters get extracted
          || aggCall.hasFilter()
          && project.getProjects().get(aggCall.filterArg).isAlwaysTrue()) {
        aggCall = aggCall.copy(aggCall.getArgList(), -1, aggCall.collation);
      }
      newCalls.add(aggCall);
    }
    aggregate = aggregate.copy(aggregate.getTraitSet(), aggregate.getInput(),
        aggregate.getGroupSet(), aggregate.getGroupSets(), newCalls);

    if (containsFilter) {
      // AND the current filterNode with the filter node inside filter
      filterNode = builder.makeCall(SqlStdOperatorTable.AND, filterNode, filter.getCondition());
    }

    // Simplify the filter as much as possible
    RexNode tempFilterNode = filterNode;
    filterNode = simplify.simplifyUnknownAsFalse(filterNode);

    // It's possible that after simplification that the expression is now always false.
    // Druid cannot handle such a filter.
    // This will happen when the below expression (f_n+1 may not exist):
    // f_n+1 AND (f_1 OR f_2 OR ... OR f_n) simplifies to be something always false.
    // f_n+1 cannot be false, since it came from a pushed filter rel node
    // and each f_i cannot be false, since DruidAggregateProjectRule would have caught that.
    // So, the only solution is to revert back to the un simplified version and let Druid
    // handle a filter that is ultimately unsatisfiable.
    if (filterNode.isAlwaysFalse()) {
      filterNode = tempFilterNode;
    }

    filter = LogicalFilter.create(scan, filterNode);

    boolean addNewFilter = !filter.getCondition().isAlwaysTrue() && allHaveFilters;
    // Assumes that Filter nodes are always right after
    // TableScan nodes (which are always present)
    int startIndex = containsFilter && addNewFilter ? 2 : 1;

    List<RelNode> newNodes = constructNewNodes(query.rels, addNewFilter, startIndex,
        filter, project, aggregate);

    return DruidQuery.create(query.getCluster(),
        aggregate.getTraitSet().replace(query.getConvention()),
        query.getTable(), query.druidTable, newNodes);
  }

  // Returns true if and only if every AggregateCall in calls has a filter argument.
  private static boolean allAggregatesHaveFilters(List<AggregateCall> calls) {
    for (AggregateCall call : calls) {
      if (!call.hasFilter()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Returns a new List of RelNodes in the order of the given order of the oldNodes,
   * the given {@link Filter}, and any extra nodes.
   */
  private static List<RelNode> constructNewNodes(List<RelNode> oldNodes,
      boolean addFilter, int startIndex, RelNode filter, RelNode... trailingNodes) {
    List<RelNode> newNodes = new ArrayList<>();

    // The first item should always be the Table scan, so any filter would go after that
    newNodes.add(oldNodes.get(0));
    if (addFilter) {
      newNodes.add(filter);
      // This is required so that each RelNode is linked to the one before it
      if (startIndex < oldNodes.size()) {
        RelNode next = oldNodes.get(startIndex);
        newNodes.add(next.copy(next.getTraitSet(), Collections.singletonList(filter)));
        startIndex++;
      }
    }

    // Add the rest of the nodes from oldNodes
    for (int i = startIndex; i < oldNodes.size(); i++) {
      newNodes.add(oldNodes.get(i));
    }

    // Add the trailing nodes (need to link them)
    for (RelNode node : trailingNodes) {
      newNodes.add(node.copy(node.getTraitSet(), Collections.singletonList(Util.last(newNodes))));
    }

    return newNodes;
  }

  // Removes the IS_TRUE in front of RexCalls, if they exist
  private static RexNode stripFilter(RexNode node) {
    if (node.getKind() == SqlKind.IS_TRUE) {
      return ((RexCall) node).getOperands().get(0);
    }
    return node;
  }

  /** Returns the filter references of the given aggregate calls, in order,
   * possibly with duplicates (unlike {@code getUniqueFilterRefs}). */
  private static List<Integer> getFilterRefs(List<AggregateCall> calls) {
    List<Integer> refs = new ArrayList<>();
    for (AggregateCall call : calls) {
      if (call.hasFilter()) {
        refs.add(call.filterArg);
      }
    }
    return refs;
  }

  /** Rule configuration. */
  public interface Config extends RelRule.Config {
    Config DEFAULT = EMPTY
        .withOperandSupplier(b0 ->
            b0.operand(Aggregate.class).oneInput(b1 ->
                b1.operand(Project.class).oneInput(b2 ->
                    b2.operand(DruidQuery.class).noInputs())))
        .as(DruidAggregateProjectRule.Config.class);

    @Override default DruidAggregateProjectRule toRule() {
      return new DruidAggregateProjectRule(this);
    }
  }
}
/**
* Rule to push a {@link org.apache.calcite.rel.core.Sort}
* into a {@link DruidQuery}.
*/
public static class DruidSortRule
    extends RelRule<DruidSortRule.Config> {
  /** Creates a DruidSortRule. */
  protected DruidSortRule(Config config) {
    super(config);
  }

  @Override public void onMatch(RelOptRuleCall call) {
    final Sort sort = call.rel(0);
    final DruidQuery query = call.rel(1);
    // Appending 'l' (limit/sort) must keep the Druid operator signature
    // valid.
    if (!DruidQuery.isValidSignature(query.signature() + 'l')) {
      return;
    }
    // The sort is pushable only when it is either:
    // - a pure limit above a query of type scan, or
    // - a sort and limit on a dimension/metric part of the druid group by
    //   query.
    final boolean hasNonZeroOffset =
        sort.offset != null && RexLiteral.intValue(sort.offset) != 0;
    if (hasNonZeroOffset) {
      // offset not supported by Druid
      return;
    }
    if (query.getQueryType() == QueryType.SCAN && !RelOptUtil.isPureLimit(sort)) {
      return;
    }
    final RelNode pushedSort = sort
        .copy(sort.getTraitSet(), ImmutableList.of(Util.last(query.rels)));
    call.transformTo(DruidQuery.extendQuery(query, pushedSort));
  }

  /** Rule configuration. */
  public interface Config extends RelRule.Config {
    Config DEFAULT = EMPTY
        .withOperandSupplier(b0 ->
            b0.operand(Sort.class).oneInput(b1 ->
                b1.operand(DruidQuery.class).noInputs()))
        .as(DruidSortRule.Config.class);

    @Override default DruidSortRule toRule() {
      return new DruidSortRule(this);
    }
  }
}
}