| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.sql.calcite.rel; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSortedMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Ordering; |
| import com.google.common.primitives.Ints; |
| import it.unimi.dsi.fastutil.ints.IntArrayList; |
| import it.unimi.dsi.fastutil.ints.IntList; |
| import org.apache.calcite.rel.RelFieldCollation; |
| import org.apache.calcite.rel.core.Aggregate; |
| import org.apache.calcite.rel.core.AggregateCall; |
| import org.apache.calcite.rel.core.Filter; |
| import org.apache.calcite.rel.core.Project; |
| import org.apache.calcite.rel.core.Sort; |
| import org.apache.calcite.rel.type.RelDataType; |
| import org.apache.calcite.rex.RexBuilder; |
| import org.apache.calcite.rex.RexInputRef; |
| import org.apache.calcite.rex.RexNode; |
| import org.apache.calcite.sql.SqlKind; |
| import org.apache.calcite.sql.type.SqlTypeName; |
| import org.apache.calcite.util.ImmutableBitSet; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.query.DataSource; |
| import org.apache.druid.query.JoinDataSource; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryDataSource; |
| import org.apache.druid.query.aggregation.PostAggregator; |
| import org.apache.druid.query.dimension.DimensionSpec; |
| import org.apache.druid.query.filter.DimFilter; |
| import org.apache.druid.query.groupby.GroupByQuery; |
| import org.apache.druid.query.groupby.having.DimFilterHavingSpec; |
| import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; |
| import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; |
| import org.apache.druid.query.ordering.StringComparator; |
| import org.apache.druid.query.ordering.StringComparators; |
| import org.apache.druid.query.scan.ScanQuery; |
| import org.apache.druid.query.timeseries.TimeseriesQuery; |
| import org.apache.druid.query.topn.DimensionTopNMetricSpec; |
| import org.apache.druid.query.topn.InvertedTopNMetricSpec; |
| import org.apache.druid.query.topn.NumericTopNMetricSpec; |
| import org.apache.druid.query.topn.TopNMetricSpec; |
| import org.apache.druid.query.topn.TopNQuery; |
| import org.apache.druid.segment.VirtualColumn; |
| import org.apache.druid.segment.VirtualColumns; |
| import org.apache.druid.segment.column.ColumnHolder; |
| import org.apache.druid.segment.column.RowSignature; |
| import org.apache.druid.segment.column.ValueType; |
| import org.apache.druid.sql.calcite.aggregation.Aggregation; |
| import org.apache.druid.sql.calcite.aggregation.DimensionExpression; |
| import org.apache.druid.sql.calcite.expression.DruidExpression; |
| import org.apache.druid.sql.calcite.expression.Expressions; |
| import org.apache.druid.sql.calcite.filtration.Filtration; |
| import org.apache.druid.sql.calcite.planner.Calcites; |
| import org.apache.druid.sql.calcite.planner.OffsetLimit; |
| import org.apache.druid.sql.calcite.planner.PlannerContext; |
| import org.apache.druid.sql.calcite.rule.GroupByRules; |
| import org.apache.druid.sql.calcite.table.RowSignatures; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| |
| /** |
| * A fully formed Druid query, built from a {@link PartialDruidQuery}. The work to develop this query is done |
| * during construction, which may throw {@link CannotBuildQueryException}. |
| */ |
| public class DruidQuery |
| { |
| private final DataSource dataSource; |
| private final PlannerContext plannerContext; |
| |
| @Nullable |
| private final DimFilter filter; |
| |
| @Nullable |
| private final Projection selectProjection; |
| |
| @Nullable |
| private final Grouping grouping; |
| |
| @Nullable |
| private final Sorting sorting; |
| |
| private final Query query; |
| private final RowSignature outputRowSignature; |
| private final RelDataType outputRowType; |
| private final VirtualColumnRegistry virtualColumnRegistry; |
| |
  /**
   * Private constructor; use {@link #fromPartialQuery} to build instances.
   *
   * Note that {@link #computeQuery()} is invoked last, after all other fields are assigned, because it reads
   * those fields to decide which native query type to build. It may throw {@link CannotBuildQueryException}.
   *
   * @param dataSource            datasource this query will read from
   * @param plannerContext        planner context
   * @param filter                query-level filter, or null if there is none
   * @param selectProjection      projection for non-aggregating queries, or null
   * @param grouping              dimensions, subtotals, aggregations, and having filter, or null
   * @param sorting               order-bys, offset/limit, and post-sort projection, or null
   * @param sourceRowSignature    signature of rows coming out of "dataSource"
   * @param outputRowType         Calcite row type of the query output
   * @param virtualColumnRegistry virtual columns created while planning this query
   */
  private DruidQuery(
      final DataSource dataSource,
      final PlannerContext plannerContext,
      @Nullable final DimFilter filter,
      @Nullable final Projection selectProjection,
      @Nullable final Grouping grouping,
      @Nullable final Sorting sorting,
      final RowSignature sourceRowSignature,
      final RelDataType outputRowType,
      final VirtualColumnRegistry virtualColumnRegistry
  )
  {
    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
    this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext");
    this.filter = filter;
    this.selectProjection = selectProjection;
    this.grouping = grouping;
    this.sorting = sorting;
    this.outputRowSignature = computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, sorting);
    this.outputRowType = Preconditions.checkNotNull(outputRowType, "outputRowType");
    this.virtualColumnRegistry = Preconditions.checkNotNull(virtualColumnRegistry, "virtualColumnRegistry");
    this.query = computeQuery();
  }
| |
| public static DruidQuery fromPartialQuery( |
| final PartialDruidQuery partialQuery, |
| final DataSource dataSource, |
| final RowSignature sourceRowSignature, |
| final PlannerContext plannerContext, |
| final RexBuilder rexBuilder, |
| final boolean finalizeAggregations |
| ) |
| { |
| final RelDataType outputRowType = partialQuery.leafRel().getRowType(); |
| final VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature); |
| |
| // Now the fun begins. |
| final DimFilter filter; |
| final Projection selectProjection; |
| final Grouping grouping; |
| final Sorting sorting; |
| |
| if (partialQuery.getWhereFilter() != null) { |
| filter = Preconditions.checkNotNull( |
| computeWhereFilter( |
| partialQuery, |
| plannerContext, |
| sourceRowSignature, |
| virtualColumnRegistry |
| ) |
| ); |
| } else { |
| filter = null; |
| } |
| |
| // Only compute "selectProjection" if this is a non-aggregating query. (For aggregating queries, "grouping" will |
| // reflect select-project from partialQuery on its own.) |
| if (partialQuery.getSelectProject() != null && partialQuery.getAggregate() == null) { |
| selectProjection = Preconditions.checkNotNull( |
| computeSelectProjection( |
| partialQuery, |
| plannerContext, |
| computeOutputRowSignature(sourceRowSignature, null, null, null), |
| virtualColumnRegistry |
| ) |
| ); |
| } else { |
| selectProjection = null; |
| } |
| |
| if (partialQuery.getAggregate() != null) { |
| grouping = Preconditions.checkNotNull( |
| computeGrouping( |
| partialQuery, |
| plannerContext, |
| computeOutputRowSignature(sourceRowSignature, null, null, null), |
| virtualColumnRegistry, |
| rexBuilder, |
| finalizeAggregations |
| ) |
| ); |
| } else { |
| grouping = null; |
| } |
| |
| if (partialQuery.getSort() != null) { |
| sorting = Preconditions.checkNotNull( |
| computeSorting( |
| partialQuery, |
| plannerContext, |
| computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, null), |
| // When sorting follows grouping, virtual columns cannot be used |
| partialQuery.getAggregate() != null ? null : virtualColumnRegistry |
| ) |
| ); |
| } else { |
| sorting = null; |
| } |
| |
| return new DruidQuery( |
| dataSource, |
| plannerContext, |
| filter, |
| selectProjection, |
| grouping, |
| sorting, |
| sourceRowSignature, |
| outputRowType, |
| virtualColumnRegistry |
| ); |
| } |
| |
| @Nonnull |
| private static DimFilter computeWhereFilter( |
| final PartialDruidQuery partialQuery, |
| final PlannerContext plannerContext, |
| final RowSignature rowSignature, |
| final VirtualColumnRegistry virtualColumnRegistry |
| ) |
| { |
| return getDimFilter(plannerContext, rowSignature, virtualColumnRegistry, partialQuery.getWhereFilter()); |
| } |
| |
| @Nullable |
| private static DimFilter computeHavingFilter( |
| final PartialDruidQuery partialQuery, |
| final PlannerContext plannerContext, |
| final RowSignature aggregateSignature |
| ) |
| { |
| final Filter havingFilter = partialQuery.getHavingFilter(); |
| |
| if (havingFilter == null) { |
| return null; |
| } |
| |
| // null virtualColumnRegistry, since virtual columns cannot be referenced by "having" filters. |
| return getDimFilter(plannerContext, aggregateSignature, null, havingFilter); |
| } |
| |
| @Nonnull |
| private static DimFilter getDimFilter( |
| final PlannerContext plannerContext, |
| final RowSignature rowSignature, |
| @Nullable final VirtualColumnRegistry virtualColumnRegistry, |
| final Filter filter |
| ) |
| { |
| final RexNode condition = filter.getCondition(); |
| final DimFilter dimFilter = Expressions.toFilter( |
| plannerContext, |
| rowSignature, |
| virtualColumnRegistry, |
| condition |
| ); |
| if (dimFilter == null) { |
| throw new CannotBuildQueryException(filter, condition); |
| } else { |
| return dimFilter; |
| } |
| } |
| |
| @Nonnull |
| private static Projection computeSelectProjection( |
| final PartialDruidQuery partialQuery, |
| final PlannerContext plannerContext, |
| final RowSignature rowSignature, |
| final VirtualColumnRegistry virtualColumnRegistry |
| ) |
| { |
| final Project project = Preconditions.checkNotNull(partialQuery.getSelectProject(), "selectProject"); |
| |
| if (partialQuery.getAggregate() != null) { |
| throw new ISE("Cannot have both 'selectProject' and 'aggregate', how can this be?"); |
| } else { |
| return Projection.preAggregation(project, plannerContext, rowSignature, virtualColumnRegistry); |
| } |
| } |
| |
| @Nonnull |
| private static Grouping computeGrouping( |
| final PartialDruidQuery partialQuery, |
| final PlannerContext plannerContext, |
| final RowSignature rowSignature, |
| final VirtualColumnRegistry virtualColumnRegistry, |
| final RexBuilder rexBuilder, |
| final boolean finalizeAggregations |
| ) |
| { |
| final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate(), "aggregate"); |
| final Project aggregateProject = partialQuery.getAggregateProject(); |
| |
| final List<DimensionExpression> dimensions = computeDimensions( |
| partialQuery, |
| plannerContext, |
| rowSignature, |
| virtualColumnRegistry |
| ); |
| |
| final Subtotals subtotals = computeSubtotals( |
| partialQuery, |
| rowSignature |
| ); |
| |
| final List<Aggregation> aggregations = computeAggregations( |
| partialQuery, |
| plannerContext, |
| rowSignature, |
| virtualColumnRegistry, |
| rexBuilder, |
| finalizeAggregations |
| ); |
| |
| final RowSignature aggregateRowSignature = RowSignatures.fromRelDataType( |
| ImmutableList.copyOf( |
| Iterators.concat( |
| dimensions.stream().map(DimensionExpression::getOutputName).iterator(), |
| aggregations.stream().map(Aggregation::getOutputName).iterator() |
| ) |
| ), |
| aggregate.getRowType() |
| ); |
| |
| final DimFilter havingFilter = computeHavingFilter( |
| partialQuery, |
| plannerContext, |
| aggregateRowSignature |
| ); |
| |
| final Grouping grouping = Grouping.create(dimensions, subtotals, aggregations, havingFilter, aggregateRowSignature); |
| |
| if (aggregateProject == null) { |
| return grouping; |
| } else { |
| return grouping.applyProject(plannerContext, aggregateProject); |
| } |
| } |
| |
| /** |
| * Returns dimensions corresponding to {@code aggregate.getGroupSet()}, in the same order. |
| * |
| * @param partialQuery partial query |
| * @param plannerContext planner context |
| * @param rowSignature source row signature |
| * @param virtualColumnRegistry re-usable virtual column references |
| * |
| * @return dimensions |
| * |
| * @throws CannotBuildQueryException if dimensions cannot be computed |
| */ |
| private static List<DimensionExpression> computeDimensions( |
| final PartialDruidQuery partialQuery, |
| final PlannerContext plannerContext, |
| final RowSignature rowSignature, |
| final VirtualColumnRegistry virtualColumnRegistry |
| ) |
| { |
| final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); |
| final List<DimensionExpression> dimensions = new ArrayList<>(); |
| final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("d", rowSignature.getColumnNames()); |
| |
| int outputNameCounter = 0; |
| |
| for (int i : aggregate.getGroupSet()) { |
| // Dimension might need to create virtual columns. Avoid giving it a name that would lead to colliding columns. |
| final RexNode rexNode = Expressions.fromFieldAccess( |
| rowSignature, |
| partialQuery.getSelectProject(), |
| i |
| ); |
| final DruidExpression druidExpression = Expressions.toDruidExpression(plannerContext, rowSignature, rexNode); |
| if (druidExpression == null) { |
| throw new CannotBuildQueryException(aggregate, rexNode); |
| } |
| |
| final RelDataType dataType = rexNode.getType(); |
| final ValueType outputType = Calcites.getValueTypeForRelDataType(dataType); |
| if (outputType == null || outputType == ValueType.COMPLEX) { |
| // Can't group on unknown or COMPLEX types. |
| throw new CannotBuildQueryException(aggregate, rexNode); |
| } |
| |
| final VirtualColumn virtualColumn; |
| |
| |
| final String dimOutputName = outputNamePrefix + outputNameCounter++; |
| if (!druidExpression.isSimpleExtraction()) { |
| virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( |
| plannerContext, |
| druidExpression, |
| dataType |
| ); |
| dimensions.add(DimensionExpression.ofVirtualColumn( |
| virtualColumn.getOutputName(), |
| dimOutputName, |
| druidExpression, |
| outputType |
| )); |
| } else { |
| dimensions.add(DimensionExpression.ofSimpleColumn(dimOutputName, druidExpression, outputType)); |
| } |
| } |
| |
| return dimensions; |
| } |
| |
| /** |
| * Builds a {@link Subtotals} object based on {@link Aggregate#getGroupSets()}. |
| */ |
| private static Subtotals computeSubtotals( |
| final PartialDruidQuery partialQuery, |
| final RowSignature rowSignature |
| ) |
| { |
| final Aggregate aggregate = partialQuery.getAggregate(); |
| |
| // dimBitMapping maps from input field position to group set position (dimension number). |
| final int[] dimBitMapping; |
| if (partialQuery.getSelectProject() != null) { |
| dimBitMapping = new int[partialQuery.getSelectProject().getRowType().getFieldCount()]; |
| } else { |
| dimBitMapping = new int[rowSignature.size()]; |
| } |
| |
| int i = 0; |
| for (int dimBit : aggregate.getGroupSet()) { |
| dimBitMapping[dimBit] = i++; |
| } |
| |
| // Use dimBitMapping to remap groupSets (which is input-field-position based) into subtotals (which is |
| // dimension-list-position based). |
| final List<IntList> subtotals = new ArrayList<>(); |
| for (ImmutableBitSet groupSet : aggregate.getGroupSets()) { |
| final IntList subtotal = new IntArrayList(); |
| for (int dimBit : groupSet) { |
| subtotal.add(dimBitMapping[dimBit]); |
| } |
| |
| subtotals.add(subtotal); |
| } |
| |
| return new Subtotals(subtotals); |
| } |
| |
| /** |
| * Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order. |
| * |
| * @param partialQuery partial query |
| * @param plannerContext planner context |
| * @param rowSignature source row signature |
| * @param virtualColumnRegistry re-usable virtual column references |
| * @param rexBuilder calcite RexBuilder |
| * @param finalizeAggregations true if this query should include explicit finalization for all of its |
| * aggregators, where required. Useful for subqueries where Druid's native query layer |
| * does not do this automatically. |
| * |
| * @return aggregations |
| * |
| * @throws CannotBuildQueryException if dimensions cannot be computed |
| */ |
| private static List<Aggregation> computeAggregations( |
| final PartialDruidQuery partialQuery, |
| final PlannerContext plannerContext, |
| final RowSignature rowSignature, |
| final VirtualColumnRegistry virtualColumnRegistry, |
| final RexBuilder rexBuilder, |
| final boolean finalizeAggregations |
| ) |
| { |
| final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); |
| final List<Aggregation> aggregations = new ArrayList<>(); |
| final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("a", rowSignature.getColumnNames()); |
| |
| for (int i = 0; i < aggregate.getAggCallList().size(); i++) { |
| final String aggName = outputNamePrefix + i; |
| final AggregateCall aggCall = aggregate.getAggCallList().get(i); |
| final Aggregation aggregation = GroupByRules.translateAggregateCall( |
| plannerContext, |
| rowSignature, |
| virtualColumnRegistry, |
| rexBuilder, |
| partialQuery.getSelectProject(), |
| aggregations, |
| aggName, |
| aggCall, |
| finalizeAggregations |
| ); |
| |
| if (aggregation == null) { |
| throw new CannotBuildQueryException(aggregate, aggCall); |
| } |
| |
| aggregations.add(aggregation); |
| } |
| |
| return aggregations; |
| } |
| |
| @Nonnull |
| private static Sorting computeSorting( |
| final PartialDruidQuery partialQuery, |
| final PlannerContext plannerContext, |
| final RowSignature rowSignature, |
| @Nullable final VirtualColumnRegistry virtualColumnRegistry |
| ) |
| { |
| final Sort sort = Preconditions.checkNotNull(partialQuery.getSort(), "sort"); |
| final Project sortProject = partialQuery.getSortProject(); |
| |
| // Extract limit and offset. |
| final OffsetLimit offsetLimit = OffsetLimit.fromSort(sort); |
| |
| // Extract orderBy column specs. |
| final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size()); |
| for (int sortKey = 0; sortKey < sort.getChildExps().size(); sortKey++) { |
| final RexNode sortExpression = sort.getChildExps().get(sortKey); |
| final RelFieldCollation collation = sort.getCollation().getFieldCollations().get(sortKey); |
| final OrderByColumnSpec.Direction direction; |
| final StringComparator comparator; |
| |
| if (collation.getDirection() == RelFieldCollation.Direction.ASCENDING) { |
| direction = OrderByColumnSpec.Direction.ASCENDING; |
| } else if (collation.getDirection() == RelFieldCollation.Direction.DESCENDING) { |
| direction = OrderByColumnSpec.Direction.DESCENDING; |
| } else { |
| throw new ISE("Don't know what to do with direction[%s]", collation.getDirection()); |
| } |
| |
| final SqlTypeName sortExpressionType = sortExpression.getType().getSqlTypeName(); |
| if (SqlTypeName.NUMERIC_TYPES.contains(sortExpressionType) |
| || SqlTypeName.TIMESTAMP == sortExpressionType |
| || SqlTypeName.DATE == sortExpressionType) { |
| comparator = StringComparators.NUMERIC; |
| } else { |
| comparator = StringComparators.LEXICOGRAPHIC; |
| } |
| |
| if (sortExpression.isA(SqlKind.INPUT_REF)) { |
| final RexInputRef ref = (RexInputRef) sortExpression; |
| final String fieldName = rowSignature.getColumnName(ref.getIndex()); |
| orderBys.add(new OrderByColumnSpec(fieldName, direction, comparator)); |
| } else { |
| // We don't support sorting by anything other than refs which actually appear in the query result. |
| throw new CannotBuildQueryException(sort, sortExpression); |
| } |
| } |
| |
| // Extract any post-sort Projection. |
| final Projection projection; |
| |
| if (sortProject == null) { |
| projection = null; |
| } else if (partialQuery.getAggregate() == null) { |
| if (virtualColumnRegistry == null) { |
| throw new ISE("Must provide 'virtualColumnRegistry' for pre-aggregation Projection!"); |
| } |
| |
| projection = Projection.preAggregation(sortProject, plannerContext, rowSignature, virtualColumnRegistry); |
| } else { |
| projection = Projection.postAggregation(sortProject, plannerContext, rowSignature, "s"); |
| } |
| |
| return Sorting.create(orderBys, offsetLimit, projection); |
| } |
| |
| /** |
| * Return the {@link RowSignature} corresponding to the output of a query with the given parameters. |
| */ |
| private static RowSignature computeOutputRowSignature( |
| final RowSignature sourceRowSignature, |
| @Nullable final Projection selectProjection, |
| @Nullable final Grouping grouping, |
| @Nullable final Sorting sorting |
| ) |
| { |
| if (sorting != null && sorting.getProjection() != null) { |
| return sorting.getProjection().getOutputRowSignature(); |
| } else if (grouping != null) { |
| // Sanity check: cannot have both "grouping" and "selectProjection". |
| Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'"); |
| return grouping.getOutputRowSignature(); |
| } else if (selectProjection != null) { |
| return selectProjection.getOutputRowSignature(); |
| } else { |
| return sourceRowSignature; |
| } |
| } |
| |
| private VirtualColumns getVirtualColumns(final boolean includeDimensions) |
| { |
| // 'sourceRowSignature' could provide a list of all defined virtual columns while constructing a query, but we |
| // still want to collect the set of VirtualColumns this way to ensure we only add what is still being used after |
| // the various transforms and optimizations |
| Set<VirtualColumn> virtualColumns = new HashSet<>(); |
| |
| // we always want to add any virtual columns used by the query level DimFilter |
| if (filter != null) { |
| for (String columnName : filter.getRequiredColumns()) { |
| if (virtualColumnRegistry.isVirtualColumnDefined(columnName)) { |
| virtualColumns.add(virtualColumnRegistry.getVirtualColumn(columnName)); |
| } |
| } |
| } |
| |
| if (selectProjection != null) { |
| virtualColumns.addAll(selectProjection.getVirtualColumns()); |
| } |
| |
| if (grouping != null) { |
| if (includeDimensions) { |
| for (DimensionExpression expression : grouping.getDimensions()) { |
| if (virtualColumnRegistry.isVirtualColumnDefined(expression.getVirtualColumn())) { |
| virtualColumns.add(virtualColumnRegistry.getVirtualColumn(expression.getVirtualColumn())); |
| } |
| } |
| } |
| |
| for (Aggregation aggregation : grouping.getAggregations()) { |
| virtualColumns.addAll(virtualColumnRegistry.findVirtualColumns(aggregation.getRequiredColumns())); |
| } |
| } |
| |
| if (sorting != null && sorting.getProjection() != null && grouping == null) { |
| // Sorting without grouping means we might have some post-sort Projection virtual columns. |
| virtualColumns.addAll(sorting.getProjection().getVirtualColumns()); |
| } |
| |
| // sort for predictable output |
| List<VirtualColumn> columns = new ArrayList<>(virtualColumns); |
| columns.sort(Comparator.comparing(VirtualColumn::getOutputName)); |
| return VirtualColumns.create(columns); |
| } |
| |
| /** |
| * Returns a pair of DataSource and Filtration object created on the query filter. In case the, data source is |
| * a join datasource, the datasource may be altered and left filter of join datasource may |
| * be rid of time filters. |
| * TODO: should we optimize the base table filter just like we do with query filters |
| */ |
| @VisibleForTesting |
| static Pair<DataSource, Filtration> getFiltration( |
| DataSource dataSource, |
| DimFilter filter, |
| VirtualColumnRegistry virtualColumnRegistry |
| ) |
| { |
| if (!(dataSource instanceof JoinDataSource)) { |
| return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry)); |
| } |
| JoinDataSource joinDataSource = (JoinDataSource) dataSource; |
| if (joinDataSource.getLeftFilter() == null) { |
| return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry)); |
| } |
| //TODO: We should avoid promoting the time filter as interval for right outer and full outer joins. This is not |
| // done now as we apply the intervals to left base table today irrespective of the join type. |
| |
| // If the join is left or inner, we can pull the intervals up to the query. This is done |
| // so that broker can prune the segments to query. |
| Filtration leftFiltration = Filtration.create(joinDataSource.getLeftFilter()) |
| .optimize(virtualColumnRegistry.getFullRowSignature()); |
| // Adds the intervals from the join left filter to query filtration |
| Filtration queryFiltration = Filtration.create(filter, leftFiltration.getIntervals()) |
| .optimize(virtualColumnRegistry.getFullRowSignature()); |
| JoinDataSource newDataSource = JoinDataSource.create( |
| joinDataSource.getLeft(), |
| joinDataSource.getRight(), |
| joinDataSource.getRightPrefix(), |
| joinDataSource.getConditionAnalysis(), |
| joinDataSource.getJoinType(), |
| leftFiltration.getDimFilter() |
| ); |
| return Pair.of(newDataSource, queryFiltration); |
| } |
| |
| private static Filtration toFiltration(DimFilter filter, VirtualColumnRegistry virtualColumnRegistry) |
| { |
| return Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature()); |
| } |
| |
  /**
   * Returns the datasource this query reads from.
   */
  public DataSource getDataSource()
  {
    return dataSource;
  }
| |
  /**
   * Returns this query's grouping (dimensions, subtotals, aggregations, having filter), or null if this
   * is a non-aggregating query.
   */
  @Nullable
  public Grouping getGrouping()
  {
    return grouping;
  }
| |
  /**
   * Returns the Calcite row type of this query's output.
   */
  public RelDataType getOutputRowType()
  {
    return outputRowType;
  }
| |
  /**
   * Returns the native row signature of this query's output, as computed by
   * {@link #computeOutputRowSignature}.
   */
  public RowSignature getOutputRowSignature()
  {
    return outputRowSignature;
  }
| |
  /**
   * Returns the native Druid query built at construction time by {@link #computeQuery()}.
   */
  public Query getQuery()
  {
    return query;
  }
| |
| /** |
| * Return this query as some kind of Druid query. The returned query will either be {@link TopNQuery}, |
| * {@link TimeseriesQuery}, {@link GroupByQuery}, {@link ScanQuery} |
| * |
| * @return Druid query |
| */ |
| private Query computeQuery() |
| { |
| if (dataSource instanceof QueryDataSource) { |
| // If there is a subquery then the outer query must be a groupBy. |
| final GroupByQuery outerQuery = toGroupByQuery(); |
| |
| if (outerQuery == null) { |
| // Bug in the planner rules. They shouldn't allow this to happen. |
| throw new IllegalStateException("Can't use QueryDataSource without an outer groupBy query!"); |
| } |
| |
| return outerQuery; |
| } |
| |
| final TimeseriesQuery tsQuery = toTimeseriesQuery(); |
| if (tsQuery != null) { |
| return tsQuery; |
| } |
| |
| final TopNQuery topNQuery = toTopNQuery(); |
| if (topNQuery != null) { |
| return topNQuery; |
| } |
| |
| final GroupByQuery groupByQuery = toGroupByQuery(); |
| if (groupByQuery != null) { |
| return groupByQuery; |
| } |
| |
| final ScanQuery scanQuery = toScanQuery(); |
| if (scanQuery != null) { |
| return scanQuery; |
| } |
| |
| throw new CannotBuildQueryException("Cannot convert query parts into an actual query"); |
| } |
| |
| /** |
| * Return this query as a Timeseries query, or null if this query is not compatible with Timeseries. |
| * |
| * @return query |
| */ |
| @Nullable |
| public TimeseriesQuery toTimeseriesQuery() |
| { |
| if (grouping == null |
| || grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs()) |
| || grouping.getHavingFilter() != null) { |
| return null; |
| } |
| |
| if (sorting != null && sorting.getOffsetLimit().hasOffset()) { |
| // Timeseries cannot handle offsets. |
| return null; |
| } |
| |
| final Granularity queryGranularity; |
| final boolean descending; |
| int timeseriesLimit = 0; |
| final Map<String, Object> theContext = new HashMap<>(); |
| if (grouping.getDimensions().isEmpty()) { |
| queryGranularity = Granularities.ALL; |
| descending = false; |
| } else if (grouping.getDimensions().size() == 1) { |
| final DimensionExpression dimensionExpression = Iterables.getOnlyElement(grouping.getDimensions()); |
| queryGranularity = Expressions.toQueryGranularity( |
| dimensionExpression.getDruidExpression(), |
| plannerContext.getExprMacroTable() |
| ); |
| |
| if (queryGranularity == null) { |
| // Timeseries only applies if the single dimension is granular __time. |
| return null; |
| } |
| theContext.put( |
| TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, |
| Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec().getOutputName() |
| ); |
| if (sorting != null) { |
| if (sorting.getOffsetLimit().hasLimit()) { |
| final long limit = sorting.getOffsetLimit().getLimit(); |
| |
| if (limit == 0) { |
| // Can't handle zero limit (the Timeseries query engine would treat it as unlimited). |
| return null; |
| } |
| |
| timeseriesLimit = Ints.checkedCast(limit); |
| } |
| |
| switch (sorting.getSortKind(dimensionExpression.getOutputName())) { |
| case UNORDERED: |
| case TIME_ASCENDING: |
| descending = false; |
| break; |
| case TIME_DESCENDING: |
| descending = true; |
| break; |
| default: |
| // Sorting on a metric, maybe. Timeseries cannot handle. |
| return null; |
| } |
| } else { |
| // No limitSpec. |
| descending = false; |
| } |
| } else { |
| // More than one dimension, timeseries cannot handle. |
| return null; |
| } |
| |
| // An aggregation query should return one row per group, with no grouping (e.g. ALL granularity), the entire table |
| // is the group, so we should not skip empty buckets. When there are no results, this means we return the |
| // initialized state for given aggregators instead of nothing. |
| if (!Granularities.ALL.equals(queryGranularity)) { |
| theContext.put(TimeseriesQuery.SKIP_EMPTY_BUCKETS, true); |
| } |
| theContext.putAll(plannerContext.getQueryContext()); |
| |
| final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration( |
| dataSource, |
| filter, |
| virtualColumnRegistry |
| ); |
| final DataSource newDataSource = dataSourceFiltrationPair.lhs; |
| final Filtration filtration = dataSourceFiltrationPair.rhs; |
| |
| final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators()); |
| if (sorting != null && sorting.getProjection() != null) { |
| postAggregators.addAll(sorting.getProjection().getPostAggregators()); |
| } |
| |
| return new TimeseriesQuery( |
| newDataSource, |
| filtration.getQuerySegmentSpec(), |
| descending, |
| getVirtualColumns(false), |
| filtration.getDimFilter(), |
| queryGranularity, |
| grouping.getAggregatorFactories(), |
| postAggregators, |
| timeseriesLimit, |
| ImmutableSortedMap.copyOf(theContext) |
| ); |
| } |
| |
| /** |
| * Return this query as a TopN query, or null if this query is not compatible with TopN. |
| * |
| * @return query or null |
| */ |
| @Nullable |
| public TopNQuery toTopNQuery() |
| { |
| // Must have GROUP BY one column, no GROUPING SETS, ORDER BY ≤ 1 column, LIMIT > 0 and ≤ maxTopNLimit, |
| // no OFFSET, no HAVING. |
| final boolean topNOk = grouping != null |
| && grouping.getDimensions().size() == 1 |
| && !grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs()) |
| && sorting != null |
| && (sorting.getOrderBys().size() <= 1 |
| && sorting.getOffsetLimit().hasLimit() |
| && sorting.getOffsetLimit().getLimit() > 0 |
| && sorting.getOffsetLimit().getLimit() <= plannerContext.getPlannerConfig() |
| .getMaxTopNLimit() |
| && !sorting.getOffsetLimit().hasOffset()) |
| && grouping.getHavingFilter() == null; |
| |
| if (!topNOk) { |
| return null; |
| } |
| |
| final DimensionSpec dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec(); |
| final OrderByColumnSpec limitColumn; |
| if (sorting.getOrderBys().isEmpty()) { |
| limitColumn = new OrderByColumnSpec( |
| dimensionSpec.getOutputName(), |
| OrderByColumnSpec.Direction.ASCENDING, |
| Calcites.getStringComparatorForValueType(dimensionSpec.getOutputType()) |
| ); |
| } else { |
| limitColumn = Iterables.getOnlyElement(sorting.getOrderBys()); |
| } |
| final TopNMetricSpec topNMetricSpec; |
| |
| if (limitColumn.getDimension().equals(dimensionSpec.getOutputName())) { |
| // DimensionTopNMetricSpec is exact; always return it even if allowApproximate is false. |
| final DimensionTopNMetricSpec baseMetricSpec = new DimensionTopNMetricSpec( |
| null, |
| limitColumn.getDimensionComparator() |
| ); |
| topNMetricSpec = limitColumn.getDirection() == OrderByColumnSpec.Direction.ASCENDING |
| ? baseMetricSpec |
| : new InvertedTopNMetricSpec(baseMetricSpec); |
| } else if (plannerContext.getPlannerConfig().isUseApproximateTopN()) { |
| // ORDER BY metric |
| final NumericTopNMetricSpec baseMetricSpec = new NumericTopNMetricSpec(limitColumn.getDimension()); |
| topNMetricSpec = limitColumn.getDirection() == OrderByColumnSpec.Direction.ASCENDING |
| ? new InvertedTopNMetricSpec(baseMetricSpec) |
| : baseMetricSpec; |
| } else { |
| return null; |
| } |
| |
| final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration( |
| dataSource, |
| filter, |
| virtualColumnRegistry |
| ); |
| final DataSource newDataSource = dataSourceFiltrationPair.lhs; |
| final Filtration filtration = dataSourceFiltrationPair.rhs; |
| |
| final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators()); |
| if (sorting.getProjection() != null) { |
| postAggregators.addAll(sorting.getProjection().getPostAggregators()); |
| } |
| |
| return new TopNQuery( |
| newDataSource, |
| getVirtualColumns(true), |
| dimensionSpec, |
| topNMetricSpec, |
| Ints.checkedCast(sorting.getOffsetLimit().getLimit()), |
| filtration.getQuerySegmentSpec(), |
| filtration.getDimFilter(), |
| Granularities.ALL, |
| grouping.getAggregatorFactories(), |
| postAggregators, |
| ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) |
| ); |
| } |
| |
| /** |
| * Return this query as a GroupBy query, or null if this query is not compatible with GroupBy. |
| * |
| * @return query or null |
| */ |
| @Nullable |
| public GroupByQuery toGroupByQuery() |
| { |
| if (grouping == null) { |
| return null; |
| } |
| |
| if (sorting != null && sorting.getOffsetLimit().hasLimit() && sorting.getOffsetLimit().getLimit() <= 0) { |
| // Cannot handle zero or negative limits. |
| return null; |
| } |
| |
| final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration( |
| dataSource, |
| filter, |
| virtualColumnRegistry |
| ); |
| final DataSource newDataSource = dataSourceFiltrationPair.lhs; |
| final Filtration filtration = dataSourceFiltrationPair.rhs; |
| |
| final DimFilterHavingSpec havingSpec; |
| if (grouping.getHavingFilter() != null) { |
| havingSpec = new DimFilterHavingSpec( |
| Filtration.create(grouping.getHavingFilter()) |
| .optimizeFilterOnly(grouping.getOutputRowSignature()) |
| .getDimFilter(), |
| true |
| ); |
| } else { |
| havingSpec = null; |
| } |
| final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators()); |
| if (sorting != null && sorting.getProjection() != null) { |
| postAggregators.addAll(sorting.getProjection().getPostAggregators()); |
| } |
| |
| GroupByQuery query = new GroupByQuery( |
| newDataSource, |
| filtration.getQuerySegmentSpec(), |
| getVirtualColumns(true), |
| filtration.getDimFilter(), |
| Granularities.ALL, |
| grouping.getDimensionSpecs(), |
| grouping.getAggregatorFactories(), |
| postAggregators, |
| havingSpec, |
| Optional.ofNullable(sorting).orElse(Sorting.none()).limitSpec(), |
| grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()), |
| ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) |
| ); |
| // We don't apply timestamp computation optimization yet when limit is pushed down. Maybe someday. |
| if (query.getLimitSpec() instanceof DefaultLimitSpec && query.isApplyLimitPushDown()) { |
| return query; |
| } |
| Map<String, Object> theContext = new HashMap<>(); |
| |
| Granularity queryGranularity = null; |
| |
| // sql like "group by city_id,time_floor(__time to day)", |
| // the original translated query is granularity=all and dimensions:[d0, d1] |
| // the better plan is granularity=day and dimensions:[d0] |
| // but the ResultRow structure is changed from [d0, d1] to [__time, d0] |
| // this structure should be fixed as [d0, d1] (actually it is [d0, __time]) before postAggs are called. |
| // |
| // the above is the general idea of this optimization. |
| // but from coding perspective, the granularity=all and "d0" dimension are referenced by many places, |
| // eg: subtotals, having, grouping set, post agg, |
| // there would be many many places need to be fixed if "d0" dimension is removed from query.dimensions |
| // and the same to the granularity change. |
| // so from easier coding perspective, this optimization is coded as groupby engine-level inner process change. |
| // the most part of codes are in GroupByStrategyV2 about the process change between broker and compute node. |
| // the basic logic like nested queries and subtotals are kept unchanged, |
| // they will still see the granularity=all and the "d0" dimension. |
| // |
| // the tradeoff is that GroupByStrategyV2 behaviors differently according to the below query contexts. |
| // in another word, |
| // the query generated by "explain plan for select ..." doesn't match to the native query ACTUALLY being executed, |
| // the granularity and dimensions are slightly different. |
| // now, part of the query plan logic is handled in GroupByStrategyV2. |
| if (!grouping.getDimensions().isEmpty()) { |
| for (DimensionExpression dimensionExpression : grouping.getDimensions()) { |
| Granularity granularity = Expressions.toQueryGranularity( |
| dimensionExpression.getDruidExpression(), |
| plannerContext.getExprMacroTable() |
| ); |
| if (granularity == null) { |
| continue; |
| } |
| if (queryGranularity != null) { |
| // group by more than one timestamp_floor |
| // eg: group by timestamp_floor(__time to DAY),timestamp_floor(__time, to HOUR) |
| queryGranularity = null; |
| break; |
| } |
| queryGranularity = granularity; |
| int timestampDimensionIndexInDimensions = grouping.getDimensions().indexOf(dimensionExpression); |
| // these settings will only affect the most inner query sent to the down streaming compute nodes |
| theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, dimensionExpression.getOutputName()); |
| theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, timestampDimensionIndexInDimensions); |
| theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, queryGranularity); |
| } |
| } |
| if (queryGranularity == null) { |
| return query; |
| } |
| return query.withOverriddenContext(theContext); |
| } |
| |
| /** |
| * Return this query as a Scan query, or null if this query is not compatible with Scan. |
| * |
| * @return query or null |
| */ |
| @Nullable |
| public ScanQuery toScanQuery() |
| { |
| if (grouping != null) { |
| // Scan cannot GROUP BY. |
| return null; |
| } |
| |
| if (outputRowSignature.size() == 0) { |
| // Should never do a scan query without any columns that we're interested in. This is probably a planner bug. |
| throw new ISE("Cannot convert to Scan query without any columns."); |
| } |
| |
| final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration( |
| dataSource, |
| filter, |
| virtualColumnRegistry |
| ); |
| final DataSource newDataSource = dataSourceFiltrationPair.lhs; |
| final Filtration filtration = dataSourceFiltrationPair.rhs; |
| |
| final ScanQuery.Order order; |
| long scanOffset = 0L; |
| long scanLimit = 0L; |
| |
| if (sorting != null) { |
| scanOffset = sorting.getOffsetLimit().getOffset(); |
| |
| if (sorting.getOffsetLimit().hasLimit()) { |
| final long limit = sorting.getOffsetLimit().getLimit(); |
| |
| if (limit == 0) { |
| // Can't handle zero limit (the Scan query engine would treat it as unlimited). |
| return null; |
| } |
| |
| scanLimit = limit; |
| } |
| |
| final Sorting.SortKind sortKind = sorting.getSortKind(ColumnHolder.TIME_COLUMN_NAME); |
| |
| if (sortKind == Sorting.SortKind.UNORDERED) { |
| order = ScanQuery.Order.NONE; |
| } else if (sortKind == Sorting.SortKind.TIME_ASCENDING) { |
| order = ScanQuery.Order.ASCENDING; |
| } else if (sortKind == Sorting.SortKind.TIME_DESCENDING) { |
| order = ScanQuery.Order.DESCENDING; |
| } else { |
| assert sortKind == Sorting.SortKind.NON_TIME; |
| |
| // Scan cannot ORDER BY non-time columns. |
| return null; |
| } |
| } else { |
| order = ScanQuery.Order.NONE; |
| } |
| |
| // Compute the list of columns to select. |
| final Set<String> columns = new HashSet<>(outputRowSignature.getColumnNames()); |
| |
| if (order != ScanQuery.Order.NONE) { |
| columns.add(ColumnHolder.TIME_COLUMN_NAME); |
| } |
| |
| return new ScanQuery( |
| newDataSource, |
| filtration.getQuerySegmentSpec(), |
| getVirtualColumns(true), |
| ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST, |
| 0, |
| scanOffset, |
| scanLimit, |
| order, |
| filtration.getDimFilter(), |
| Ordering.natural().sortedCopy(columns), |
| false, |
| ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) |
| ); |
| } |
| } |