blob: de65a3a7733a87c0f1361a5c30f5bbf5ce2dd8e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.druid;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.rules.AggregateFilterTransposeRule;
import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
import org.apache.calcite.rel.rules.ProjectSortTransposeRule;
import org.apache.calcite.rel.rules.PushProjector;
import org.apache.calcite.rel.rules.SortProjectTransposeRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexExecutor;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexSimplify;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.runtime.PredicateImpl;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
/**
* Rules and relational operators for {@link DruidQuery}.
*/
public class DruidRules {
  // Utility class holding rule singletons; not meant to be instantiated.
  private DruidRules() {}
  protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
  // Singleton instances of the rules defined in this class. Each rule either
  // pushes a relational operator into a DruidQuery, or transposes two
  // operators so that a later rule can push them.
  public static final DruidFilterRule FILTER = new DruidFilterRule();
  public static final DruidProjectRule PROJECT = new DruidProjectRule();
  public static final DruidAggregateRule AGGREGATE = new DruidAggregateRule();
  public static final DruidAggregateProjectRule AGGREGATE_PROJECT =
      new DruidAggregateProjectRule();
  public static final DruidSortRule SORT = new DruidSortRule();
  public static final DruidSortProjectTransposeRule SORT_PROJECT_TRANSPOSE =
      new DruidSortProjectTransposeRule();
  public static final DruidProjectSortTransposeRule PROJECT_SORT_TRANSPOSE =
      new DruidProjectSortTransposeRule();
  public static final DruidProjectFilterTransposeRule PROJECT_FILTER_TRANSPOSE =
      new DruidProjectFilterTransposeRule();
  public static final DruidFilterProjectTransposeRule FILTER_PROJECT_TRANSPOSE =
      new DruidFilterProjectTransposeRule();
  public static final DruidAggregateFilterTransposeRule AGGREGATE_FILTER_TRANSPOSE =
      new DruidAggregateFilterTransposeRule();
  public static final DruidFilterAggregateTransposeRule FILTER_AGGREGATE_TRANSPOSE =
      new DruidFilterAggregateTransposeRule();
  /** Rules a planner should register in order to push operators into Druid
   * queries. Order matters only in that it influences rule-firing order. */
  public static final List<RelOptRule> RULES =
      ImmutableList.of(FILTER,
          PROJECT_FILTER_TRANSPOSE,
          // Disabled, per
          //   [CALCITE-1706] DruidAggregateFilterTransposeRule
          //   causes very fine-grained aggregations to be pushed to Druid
          // AGGREGATE_FILTER_TRANSPOSE,
          AGGREGATE_PROJECT,
          PROJECT,
          AGGREGATE,
          FILTER_AGGREGATE_TRANSPOSE,
          FILTER_PROJECT_TRANSPOSE,
          PROJECT_SORT_TRANSPOSE,
          SORT,
          SORT_PROJECT_TRANSPOSE);
  /** Predicate that returns whether Druid can not handle an aggregate.
   *
   * <p>The triple is (aggregate, node the aggregate's column references are
   * resolved against, query). Returns {@code true} when at least one
   * aggregate call is NOT expressible in Druid, {@code false} when all of
   * them are. */
  private static final Predicate<Triple<Aggregate, RelNode, DruidQuery>> BAD_AGG =
      new PredicateImpl<Triple<Aggregate, RelNode, DruidQuery>>() {
        public boolean test(Triple<Aggregate, RelNode, DruidQuery> triple) {
          final Aggregate aggregate = triple.getLeft();
          final RelNode node = triple.getMiddle();
          final DruidQuery query = triple.getRight();
          final CalciteConnectionConfig config = query.getConnectionConfig();
          for (AggregateCall aggregateCall : aggregate.getAggCallList()) {
            switch (aggregateCall.getAggregation().getKind()) {
            case COUNT:
              // Druid can handle 2 scenarios:
              // 1. count(distinct col) when approximate results
              //    are acceptable and col is not a metric
              // 2. count(*)
              if (checkAggregateOnMetric(ImmutableBitSet.of(aggregateCall.getArgList()),
                  node, query)) {
                // Counting on a metric column is not supported.
                return true;
              }
              if ((config.approximateDistinctCount() && aggregateCall.isDistinct())
                  || aggregateCall.getArgList().isEmpty()) {
                // Acceptable COUNT; check the next aggregate call.
                continue;
              }
              return true;
            case SUM:
            case SUM0:
            case MIN:
            case MAX:
              // Druid handles these only over numeric types it represents
              // natively: approximate numerics and integers; decimals only
              // under the conditions checked below.
              final RelDataType type = aggregateCall.getType();
              final SqlTypeName sqlTypeName = type.getSqlTypeName();
              if (SqlTypeFamily.APPROXIMATE_NUMERIC.getTypeNames().contains(sqlTypeName)
                  || SqlTypeFamily.INTEGER.getTypeNames().contains(sqlTypeName)) {
                continue;
              } else if (SqlTypeFamily.EXACT_NUMERIC.getTypeNames().contains(sqlTypeName)) {
                // Decimal
                assert sqlTypeName == SqlTypeName.DECIMAL;
                if (type.getScale() == 0 || config.approximateDecimal()) {
                  // If scale is zero or we allow approximating decimal, we can proceed
                  continue;
                }
              }
              // Cannot handle this aggregate function
              return true;
            default:
              // Cannot handle this aggregate function
              return true;
            }
          }
          // All aggregate calls were acceptable.
          return false;
        }
      };
  /**
   * Rule to push a {@link org.apache.calcite.rel.core.Filter} into a {@link DruidQuery}.
   *
   * <p>Conjuncts on the timestamp column become Druid query intervals;
   * Druid-expressible conjuncts become a Druid filter; the rest remain as a
   * residual Filter above the DruidQuery.
   */
  private static class DruidFilterRule extends RelOptRule {
    private DruidFilterRule() {
      super(operand(Filter.class, operand(DruidQuery.class, none())));
    }
    public void onMatch(RelOptRuleCall call) {
      final Filter filter = call.rel(0);
      final DruidQuery query = call.rel(1);
      final RelOptCluster cluster = filter.getCluster();
      final RelBuilder relBuilder = call.builder();
      final RexBuilder rexBuilder = cluster.getRexBuilder();
      // Only fire if appending a Filter ('f') keeps the query signature valid.
      if (!DruidQuery.isValidSignature(query.signature() + 'f')) {
        return;
      }
      final List<RexNode> validPreds = new ArrayList<>();
      final List<RexNode> nonValidPreds = new ArrayList<>();
      final RexExecutor executor =
          Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR);
      final RexSimplify simplify = new RexSimplify(rexBuilder, true, executor);
      // Simplify the condition first; a simpler form may expose pushable conjuncts.
      final RexNode cond = simplify.simplify(filter.getCondition());
      if (!canPush(cond)) {
        return;
      }
      // Classify each conjunct: expressible as a Druid filter, or not.
      for (RexNode e : RelOptUtil.conjunctions(cond)) {
        if (query.isValidFilter(e)) {
          validPreds.add(e);
        } else {
          nonValidPreds.add(e);
        }
      }
      // Timestamp: find the position of the timestamp column in the row type
      // (-1 if absent).
      int timestampFieldIdx = -1;
      for (int i = 0; i < query.getRowType().getFieldCount(); i++) {
        if (query.druidTable.timestampFieldName.equals(
            query.getRowType().getFieldList().get(i).getName())) {
          timestampFieldIdx = i;
          break;
        }
      }
      // left = time-column conjuncts, middle = pushable, right = residual.
      final Triple<List<RexNode>, List<RexNode>, List<RexNode>> triple =
          splitFilters(rexBuilder, query, validPreds, nonValidPreds, timestampFieldIdx);
      if (triple.getLeft().isEmpty() && triple.getMiddle().isEmpty()) {
        // We can't push anything useful to Druid.
        return;
      }
      final List<RexNode> residualPreds = new ArrayList<>(triple.getRight());
      List<LocalInterval> intervals = null;
      if (!triple.getLeft().isEmpty()) {
        // Try to convert the time-column conjuncts into Druid query intervals.
        intervals = DruidDateTimeUtils.createInterval(
            query.getRowType().getFieldList().get(timestampFieldIdx).getType(),
            RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false));
        if (intervals == null || intervals.isEmpty()) {
          // Case where we have a filter with EXTRACT that can not be written
          // as an interval push down; push it as an ordinary filter instead.
          triple.getMiddle().addAll(triple.getLeft());
        }
      }
      RelNode newDruidQuery = query;
      if (!triple.getMiddle().isEmpty()) {
        // Append the pushable conjuncts to the DruidQuery as a Filter node.
        final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels),
            RexUtil.composeConjunction(rexBuilder, triple.getMiddle(), false));
        newDruidQuery = DruidQuery.extendQuery(query, newFilter);
      }
      if (intervals != null && !intervals.isEmpty()) {
        newDruidQuery = DruidQuery.extendQuery((DruidQuery) newDruidQuery, intervals);
      }
      if (!residualPreds.isEmpty()) {
        // Keep the predicates Druid cannot evaluate as a Filter on top.
        newDruidQuery = relBuilder
            .push(newDruidQuery)
            .filter(residualPreds)
            .build();
      }
      call.transformTo(newDruidQuery);
    }
    /**
     * Given a list of conditions that contain Druid valid operations and
     * a list that contains those that contain any non-supported operation,
     * it outputs a triple with three different categories:
     * 1-l) condition filters on the timestamp column,
     * 2-m) condition filters that can be pushed to Druid,
     * 3-r) condition filters that cannot be pushed to Druid.
     */
    private static Triple<List<RexNode>, List<RexNode>, List<RexNode>> splitFilters(
        final RexBuilder rexBuilder, final DruidQuery input, final List<RexNode> validPreds,
        final List<RexNode> nonValidPreds, final int timestampFieldIdx) {
      final List<RexNode> timeRangeNodes = new ArrayList<>();
      final List<RexNode> pushableNodes = new ArrayList<>();
      // Everything already classified as non-valid is non-pushable up front.
      final List<RexNode> nonPushableNodes = new ArrayList<>(nonValidPreds);
      // Number of columns with the dimensions and timestamp
      for (RexNode conj : validPreds) {
        final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
        conj.accept(visitor);
        if (visitor.inputPosReferenced.contains(timestampFieldIdx)) {
          if (visitor.inputPosReferenced.size() != 1) {
            // Complex predicate, transformation currently not supported
            nonPushableNodes.add(conj);
          } else {
            // References only the timestamp column: interval candidate.
            timeRangeNodes.add(conj);
          }
        } else {
          boolean filterOnMetrics = false;
          for (Integer i : visitor.inputPosReferenced) {
            if (input.druidTable.isMetric(input.getRowType().getFieldList().get(i).getName())) {
              // Filter on metrics, not supported in Druid
              filterOnMetrics = true;
              break;
            }
          }
          if (filterOnMetrics) {
            nonPushableNodes.add(conj);
          } else {
            pushableNodes.add(conj);
          }
        }
      }
      return ImmutableTriple.of(timeRangeNodes, pushableNodes, nonPushableNodes);
    }
    /** Returns whether we can push an expression to Druid. */
    private static boolean canPush(RexNode cond) {
      // Druid cannot implement "where false"
      return !cond.isAlwaysFalse();
    }
  }
  /**
   * Rule to push a {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
   *
   * <p>If every expression is a simple input reference, the whole Project is
   * pushed; otherwise the referenced columns are pushed as a lower Project
   * and the remaining expressions stay in a Project above the DruidQuery.
   */
  private static class DruidProjectRule extends RelOptRule {
    private DruidProjectRule() {
      super(operand(Project.class, operand(DruidQuery.class, none())));
    }
    public void onMatch(RelOptRuleCall call) {
      final Project project = call.rel(0);
      final DruidQuery query = call.rel(1);
      final RelOptCluster cluster = project.getCluster();
      final RexBuilder rexBuilder = cluster.getRexBuilder();
      // Only fire if appending a Project ('p') keeps the query signature valid.
      if (!DruidQuery.isValidSignature(query.signature() + 'p')) {
        return;
      }
      if (canProjectAll(project.getProjects())) {
        // All expressions can be pushed to Druid in their entirety.
        final RelNode newProject = project.copy(project.getTraitSet(),
            ImmutableList.of(Util.last(query.rels)));
        RelNode newNode = DruidQuery.extendQuery(query, newProject);
        call.transformTo(newNode);
        return;
      }
      // Otherwise split: "below" = column references pushed into the query,
      // "above" = the original expressions rewritten over those references.
      final Pair<List<RexNode>, List<RexNode>> pair =
          splitProjects(rexBuilder, query, project.getProjects());
      if (pair == null) {
        // We can't push anything useful to Druid.
        return;
      }
      final List<RexNode> above = pair.left;
      final List<RexNode> below = pair.right;
      final RelDataTypeFactory.FieldInfoBuilder builder =
          cluster.getTypeFactory().builder();
      final RelNode input = Util.last(query.rels);
      // Build the row type of the pushed-down Project, keeping input field
      // names for direct references (null lets the type factory generate one).
      for (RexNode e : below) {
        final String name;
        if (e instanceof RexInputRef) {
          name = input.getRowType().getFieldNames().get(((RexInputRef) e).getIndex());
        } else {
          name = null;
        }
        builder.add(name, e.getType());
      }
      final RelNode newProject = project.copy(project.getTraitSet(), input, below, builder.build());
      final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject);
      // The upper Project preserves the original row type for callers.
      final RelNode newProject2 = project.copy(project.getTraitSet(), newQuery, above,
          project.getRowType());
      call.transformTo(newProject2);
    }
    /** Returns whether all expressions are simple input references, so the
     * whole Project can be pushed as-is. */
    private static boolean canProjectAll(List<RexNode> nodes) {
      for (RexNode e : nodes) {
        if (!(e instanceof RexInputRef)) {
          return false;
        }
      }
      return true;
    }
    /** Splits {@code nodes} into (above, below): "below" holds references to
     * the input columns used, "above" holds the original expressions rewritten
     * to point at those references. Returns null when every input column is
     * referenced, i.e. pushing a Project would prune nothing. */
    private static Pair<List<RexNode>, List<RexNode>> splitProjects(final RexBuilder rexBuilder,
        final RelNode input, List<RexNode> nodes) {
      final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
      for (RexNode node : nodes) {
        node.accept(visitor);
      }
      if (visitor.inputPosReferenced.size() == input.getRowType().getFieldCount()) {
        // All inputs are referenced
        return null;
      }
      final List<RexNode> belowNodes = new ArrayList<>();
      final List<RelDataType> belowTypes = new ArrayList<>();
      final List<Integer> positions = Lists.newArrayList(visitor.inputPosReferenced);
      for (int i : positions) {
        final RexNode node = rexBuilder.makeInputRef(input, i);
        belowNodes.add(node);
        belowTypes.add(node.getType());
      }
      final List<RexNode> aboveNodes = new ArrayList<>();
      for (RexNode node : nodes) {
        aboveNodes.add(
            node.accept(
                new RexShuttle() {
                  @Override public RexNode visitInputRef(RexInputRef ref) {
                    // Remap the original input index to its position in the
                    // pushed-down ("below") Project.
                    final int index = positions.indexOf(ref.getIndex());
                    return rexBuilder.makeInputRef(belowTypes.get(index), index);
                  }
                }));
      }
      return Pair.of(aboveNodes, belowNodes);
    }
  }
  /**
   * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} into a {@link DruidQuery}.
   */
  private static class DruidAggregateRule extends RelOptRule {
    private DruidAggregateRule() {
      super(operand(Aggregate.class, operand(DruidQuery.class, none())));
    }
    public void onMatch(RelOptRuleCall call) {
      final Aggregate aggregate = call.rel(0);
      final DruidQuery query = call.rel(1);
      // Only fire if appending an Aggregate ('a') keeps the query signature valid.
      if (!DruidQuery.isValidSignature(query.signature() + 'a')) {
        return;
      }
      // Bail out on GROUPING SETS, aggregate calls Druid cannot compute, or
      // aggregates that touch the timestamp or a metric column.
      if (aggregate.indicator
          || aggregate.getGroupSets().size() != 1
          || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) aggregate, query))
          || !validAggregate(aggregate, query)) {
        return;
      }
      final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
          ImmutableList.of(Util.last(query.rels)));
      call.transformTo(DruidQuery.extendQuery(query, newAggregate));
    }
    /* Check whether agg functions reference timestamp */
    private static boolean validAggregate(Aggregate aggregate, DruidQuery query) {
      // Collect the union of all aggregate-call argument positions.
      ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
      for (AggregateCall aggCall : aggregate.getAggCallList()) {
        builder.addAll(aggCall.getArgList());
      }
      // Grouping on a metric column cannot be pushed to Druid.
      if (checkAggregateOnMetric(aggregate.getGroupSet(), aggregate, query)) {
        return false;
      }
      // Aggregate arguments must not reference the timestamp column.
      return !checkTimestampRefOnQuery(builder.build(), query.getTopNode(), query);
    }
  }
  /**
   * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} and
   * {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
   */
  private static class DruidAggregateProjectRule extends RelOptRule {
    private DruidAggregateProjectRule() {
      super(
          operand(Aggregate.class,
              operand(Project.class,
                  operand(DruidQuery.class, none()))));
    }
    public void onMatch(RelOptRuleCall call) {
      final Aggregate aggregate = call.rel(0);
      final Project project = call.rel(1);
      final DruidQuery query = call.rel(2);
      // Only fire if appending Project then Aggregate keeps the signature valid.
      if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) {
        return;
      }
      int timestampIdx;
      // validProject returns the position of the timestamp usage, or -1 when
      // the Project cannot be pushed.
      if ((timestampIdx = validProject(project, query)) == -1) {
        return;
      }
      if (aggregate.indicator
          || aggregate.getGroupSets().size() != 1
          || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) project, query))
          || !validAggregate(aggregate, timestampIdx)) {
        return;
      }
      if (checkAggregateOnMetric(aggregate.getGroupSet(), project, query)) {
        return;
      }
      // Push the Project first, then the Aggregate on top of it.
      final RelNode newProject = project.copy(project.getTraitSet(),
          ImmutableList.of(Util.last(query.rels)));
      final DruidQuery projectDruidQuery = DruidQuery.extendQuery(query, newProject);
      final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
          ImmutableList.of(Util.last(projectDruidQuery.rels)));
      call.transformTo(DruidQuery.extendQuery(projectDruidQuery, newAggregate));
    }
    /* To be a valid Project, we allow it to contain references, and a single call
     * to a FLOOR function on the timestamp column OR valid time EXTRACT on the timestamp column.
     * Returns the reference to the timestamp, if any. */
    private static int validProject(Project project, DruidQuery query) {
      List<RexNode> nodes = project.getProjects();
      int idxTimestamp = -1;    // position of the (single) timestamp usage seen so far
      boolean hasFloor = false; // whether that usage was a FLOOR call
      for (int i = 0; i < nodes.size(); i++) {
        final RexNode e = nodes.get(i);
        if (e instanceof RexCall) {
          // It is a call, check that it is EXTRACT and follow-up conditions
          final RexCall call = (RexCall) e;
          if (DruidDateTimeUtils.extractGranularity(call) == null) {
            // Not a FLOOR/EXTRACT with a Druid-supported granularity.
            return -1;
          }
          if (idxTimestamp != -1 && hasFloor) {
            // Already one usage of timestamp column
            return -1;
          }
          switch (call.getKind()) {
          case FLOOR:
            hasFloor = true;
            if (!(call.getOperands().get(0) instanceof RexInputRef)) {
              return -1;
            }
            final RexInputRef ref = (RexInputRef) call.getOperands().get(0);
            // The FLOOR operand must resolve to the timestamp column.
            if (!(checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()),
                query.getTopNode(),
                query))) {
              return -1;
            }
            idxTimestamp = i;
            break;
          case EXTRACT:
            idxTimestamp = RelOptUtil.InputFinder.bits(call).asList().get(0);
            break;
          default:
            // extractGranularity only accepts FLOOR/EXTRACT, so this is unreachable.
            throw new AssertionError();
          }
          continue;
        }
        if (!(e instanceof RexInputRef)) {
          // It needs to be a reference
          return -1;
        }
        final RexInputRef ref = (RexInputRef) e;
        if (checkTimestampRefOnQuery(ImmutableBitSet.of(ref.getIndex()),
            query.getTopNode(), query)) {
          if (idxTimestamp != -1) {
            // Already one usage of timestamp column
            return -1;
          }
          idxTimestamp = i;
        }
      }
      return idxTimestamp;
    }
    /** The timestamp position must be a grouping key and must not appear as
     * an argument of any aggregate call. */
    private static boolean validAggregate(Aggregate aggregate, int idx) {
      if (!aggregate.getGroupSet().get(idx)) {
        return false;
      }
      for (AggregateCall aggCall : aggregate.getAggCallList()) {
        if (aggCall.getArgList().contains(idx)) {
          return false;
        }
      }
      return true;
    }
  }
  /**
   * Rule to push a {@link org.apache.calcite.rel.core.Sort} through a
   * {@link org.apache.calcite.rel.core.Project}. Useful to transform
   * to complex Druid queries.
   */
  private static class DruidSortProjectTransposeRule
      extends SortProjectTransposeRule {
    private DruidSortProjectTransposeRule() {
      // Same behavior as the base rule, restricted to the case where the
      // Project sits directly on top of a DruidQuery.
      super(
          operand(Sort.class,
              operand(Project.class, operand(DruidQuery.class, none()))));
    }
  }
  /**
   * Rule to push back {@link org.apache.calcite.rel.core.Project} through a
   * {@link org.apache.calcite.rel.core.Sort}. Useful if after pushing Sort,
   * we could not push it inside DruidQuery.
   */
  private static class DruidProjectSortTransposeRule
      extends ProjectSortTransposeRule {
    private DruidProjectSortTransposeRule() {
      // Same behavior as the base rule, restricted to the case where the
      // Sort sits directly on top of a DruidQuery.
      super(
          operand(Project.class,
              operand(Sort.class, operand(DruidQuery.class, none()))));
    }
  }
  /**
   * Rule to push a {@link org.apache.calcite.rel.core.Sort}
   * into a {@link DruidQuery}.
   */
  private static class DruidSortRule extends RelOptRule {
    private DruidSortRule() {
      super(operand(Sort.class, operand(DruidQuery.class, none())));
    }
    public void onMatch(RelOptRuleCall call) {
      final Sort sort = call.rel(0);
      final DruidQuery query = call.rel(1);
      // Only fire if appending a Sort/limit ('l') keeps the signature valid.
      if (!DruidQuery.isValidSignature(query.signature() + 'l')) {
        return;
      }
      // Either it is:
      // - a sort and limit on a dimension/metric part of the druid group by query or
      // - a sort without limit on the time column on top of
      //     Agg operator (transformable to timeseries query), or
      // - a simple limit on top of other operator than Agg
      if (!validSortLimit(sort, query)) {
        return;
      }
      final RelNode newSort = sort.copy(sort.getTraitSet(),
          ImmutableList.of(Util.last(query.rels)));
      call.transformTo(DruidQuery.extendQuery(query, newSort));
    }
    /** Checks whether sort is valid. */
    private static boolean validSortLimit(Sort sort, DruidQuery query) {
      if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
        // offset not supported by Druid
        return false;
      }
      if (query.getTopNode() instanceof Aggregate) {
        final Aggregate topAgg = (Aggregate) query.getTopNode();
        final ImmutableBitSet.Builder positionsReferenced = ImmutableBitSet.builder();
        for (RelFieldCollation col : sort.collation.getFieldCollations()) {
          int idx = col.getFieldIndex();
          if (idx >= topAgg.getGroupCount()) {
            // Sort key refers to an aggregate expression, not a grouping key.
            continue;
          }
          // has the indexes of the columns used for sorts
          positionsReferenced.set(topAgg.getGroupSet().nth(idx));
        }
        // Case it is a timeseries query
        if (checkIsFlooringTimestampRefOnQuery(topAgg.getGroupSet(), topAgg.getInput(), query)
            && topAgg.getGroupCount() == 1) {
          // do not push if it has a limit or more than one sort key or we have sort by
          // metric/dimension
          return !RelOptUtil.isLimit(sort) && sort.collation.getFieldCollations().size() == 1
              && checkTimestampRefOnQuery(positionsReferenced.build(), topAgg.getInput(), query);
        }
        // A group-by query: Druid can sort/limit on dimensions and metrics.
        return true;
      }
      // If it is going to be a Druid select operator, we push the limit if
      // it does not contain a sort specification (required by Druid)
      return RelOptUtil.isPureLimit(sort);
    }
  }
/** Returns true if any of the grouping key is a floor operator over the timestamp column. */
private static boolean checkIsFlooringTimestampRefOnQuery(ImmutableBitSet set, RelNode top,
DruidQuery query) {
if (top instanceof Project) {
ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
final Project project = (Project) top;
for (int index : set) {
RexNode node = project.getProjects().get(index);
if (node instanceof RexCall) {
RexCall call = (RexCall) node;
assert DruidDateTimeUtils.extractGranularity(call) != null;
if (call.getKind().equals(SqlKind.FLOOR)) {
newSet.addAll(RelOptUtil.InputFinder.bits(call));
}
}
}
top = project.getInput();
set = newSet.build();
}
// Check if any references the timestamp column
for (int index : set) {
if (query.druidTable.timestampFieldName.equals(
top.getRowType().getFieldNames().get(index))) {
return true;
}
}
return false;
}
/** Checks whether any of the references leads to the timestamp column. */
private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode top,
DruidQuery query) {
if (top instanceof Project) {
ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
final Project project = (Project) top;
for (int index : set) {
RexNode node = project.getProjects().get(index);
if (node instanceof RexInputRef) {
newSet.set(((RexInputRef) node).getIndex());
} else if (node instanceof RexCall) {
RexCall call = (RexCall) node;
assert DruidDateTimeUtils.extractGranularity(call) != null;
// when we have extract from time columnthe rexCall is in the form of /Reinterpret$0
newSet.addAll(RelOptUtil.InputFinder.bits(call));
}
}
top = project.getInput();
set = newSet.build();
}
// Check if any references the timestamp column
for (int index : set) {
if (query.druidTable.timestampFieldName.equals(
top.getRowType().getFieldNames().get(index))) {
return true;
}
}
return false;
}
/** Checks whether any of the references leads to a metric column. */
private static boolean checkAggregateOnMetric(ImmutableBitSet set, RelNode topProject,
DruidQuery query) {
if (topProject instanceof Project) {
ImmutableBitSet.Builder newSet = ImmutableBitSet.builder();
final Project project = (Project) topProject;
for (int index : set) {
RexNode node = project.getProjects().get(index);
ImmutableBitSet setOfBits = RelOptUtil.InputFinder.bits(node);
newSet.addAll(setOfBits);
}
set = newSet.build();
}
for (int index : set) {
if (query.druidTable.isMetric(query.getTopNode().getRowType().getFieldNames().get(index))) {
return true;
}
}
return false;
}
  /**
   * Rule to push a {@link org.apache.calcite.rel.core.Project}
   * past a {@link org.apache.calcite.rel.core.Filter}
   * when {@code Filter} is on top of a {@link DruidQuery}.
   */
  private static class DruidProjectFilterTransposeRule
      extends ProjectFilterTransposeRule {
    private DruidProjectFilterTransposeRule() {
      // ExprCondition.FALSE: never preserve expressions in the pushed
      // Project; only plain references move below the Filter.
      super(
          operand(Project.class,
              operand(Filter.class,
                  operand(DruidQuery.class, none()))),
          PushProjector.ExprCondition.FALSE,
          RelFactories.LOGICAL_BUILDER);
    }
  }
  /**
   * Rule to push a {@link org.apache.calcite.rel.core.Filter}
   * past a {@link org.apache.calcite.rel.core.Project}
   * when {@code Project} is on top of a {@link DruidQuery}.
   */
  private static class DruidFilterProjectTransposeRule
      extends FilterProjectTransposeRule {
    private DruidFilterProjectTransposeRule() {
      // The two booleans enable copying of Filter and Project (see base rule).
      super(
          operand(Filter.class,
              operand(Project.class,
                  operand(DruidQuery.class, none()))),
          true, true, RelFactories.LOGICAL_BUILDER);
    }
  }
  /**
   * Rule to push an {@link org.apache.calcite.rel.core.Aggregate}
   * past a {@link org.apache.calcite.rel.core.Filter}
   * when {@code Filter} is on top of a {@link DruidQuery}.
   *
   * <p>Note: this rule is intentionally left out of {@link #RULES}; see the
   * comment there referencing [CALCITE-1706].
   */
  private static class DruidAggregateFilterTransposeRule
      extends AggregateFilterTransposeRule {
    private DruidAggregateFilterTransposeRule() {
      super(
          operand(Aggregate.class,
              operand(Filter.class,
                  operand(DruidQuery.class, none()))),
          RelFactories.LOGICAL_BUILDER);
    }
  }
  /**
   * Rule to push an {@link org.apache.calcite.rel.core.Filter}
   * past an {@link org.apache.calcite.rel.core.Aggregate}
   * when {@code Aggregate} is on top of a {@link DruidQuery}.
   */
  private static class DruidFilterAggregateTransposeRule
      extends FilterAggregateTransposeRule {
    private DruidFilterAggregateTransposeRule() {
      super(
          operand(Filter.class,
              operand(Aggregate.class,
                  operand(DruidQuery.class, none()))),
          RelFactories.LOGICAL_BUILDER);
    }
  }
}
// End DruidRules.java