| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.query.groupby; |
| |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonIgnore; |
| import com.fasterxml.jackson.annotation.JsonInclude; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.google.common.base.Function; |
| import com.google.common.base.Functions; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Ordering; |
| import com.google.common.primitives.Longs; |
| import it.unimi.dsi.fastutil.ints.IntArrayList; |
| import it.unimi.dsi.fastutil.ints.IntList; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.guava.Comparators; |
| import org.apache.druid.java.util.common.guava.Sequence; |
| import org.apache.druid.java.util.common.guava.Sequences; |
| import org.apache.druid.query.BaseQuery; |
| import org.apache.druid.query.DataSource; |
| import org.apache.druid.query.Queries; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryDataSource; |
| import org.apache.druid.query.TableDataSource; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.query.aggregation.PostAggregator; |
| import org.apache.druid.query.dimension.DefaultDimensionSpec; |
| import org.apache.druid.query.dimension.DimensionSpec; |
| import org.apache.druid.query.filter.DimFilter; |
| import org.apache.druid.query.groupby.having.HavingSpec; |
| import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; |
| import org.apache.druid.query.groupby.orderby.LimitSpec; |
| import org.apache.druid.query.groupby.orderby.NoopLimitSpec; |
| import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; |
| import org.apache.druid.query.ordering.StringComparator; |
| import org.apache.druid.query.ordering.StringComparators; |
| import org.apache.druid.query.spec.LegacySegmentSpec; |
| import org.apache.druid.query.spec.QuerySegmentSpec; |
| import org.apache.druid.segment.DimensionHandlerUtils; |
| import org.apache.druid.segment.VirtualColumn; |
| import org.apache.druid.segment.VirtualColumns; |
| import org.apache.druid.segment.column.ColumnHolder; |
| import org.apache.druid.segment.column.RowSignature; |
| import org.apache.druid.segment.column.ValueType; |
| import org.joda.time.DateTime; |
| import org.joda.time.Interval; |
| |
| import javax.annotation.Nullable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| |
| /** |
| * |
| */ |
| public class GroupByQuery extends BaseQuery<ResultRow> |
| { |
  /** Context key: when true, row ordering compares dimensions before the timestamp (see {@link #getRowOrdering}). */
  public static final String CTX_KEY_SORT_BY_DIMS_FIRST = "sortByDimsFirst";
  // Context keys for the "timestampResultField" feature. They are not referenced within this class;
  // presumably consumed by the groupBy toolchest/engine — TODO confirm.
  public static final String CTX_TIMESTAMP_RESULT_FIELD = "timestampResultField";
  public static final String CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY = "timestampResultFieldGranularity";
  public static final String CTX_TIMESTAMP_RESULT_FIELD_INDEX = "timestampResultFieldInOriginalDimensions";
  /** Context key: fixed timestamp override, read by {@link #computeUniversalTimestamp()}. */
  private static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";

  /** Compares rows by the raw long timestamp at position 0; only valid when rows carry a timestamp. */
  private static final Comparator<ResultRow> NON_GRANULAR_TIME_COMP =
      (ResultRow lhs, ResultRow rhs) -> Longs.compare(lhs.getLong(0), rhs.getLong(0));
| |
  /**
   * Returns a new, empty {@link Builder}.
   */
  public static Builder builder()
  {
    return new Builder();
  }
| |
  // Never null: normalized via VirtualColumns.nullToEmpty in the constructor.
  private final VirtualColumns virtualColumns;
  // Never null: normalized via LimitSpec.nullToNoopLimitSpec in the constructor.
  private final LimitSpec limitSpec;
  @Nullable
  private final HavingSpec havingSpec;
  @Nullable
  private final DimFilter dimFilter;
  // Never null: null input becomes an empty immutable list.
  private final List<DimensionSpec> dimensions;
  // Never null: null input becomes an empty immutable list.
  private final List<AggregatorFactory> aggregatorSpecs;
  // Never null: prepared (and validated) by Queries.prepareAggregations.
  private final List<PostAggregator> postAggregatorSpecs;
  @Nullable
  private final List<List<String>> subtotalsSpec;

  // Function applied by postProcess(): having filter followed by the limit/order function.
  private final Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn;
  // Field order of ResultRows: optional __time, dimensions, aggregators, post-aggregators.
  private final RowSignature resultRowSignature;

  /**
   * This is set when we know that all rows will have the same timestamp, and allows us to not actually store
   * and track it throughout the query execution process.
   */
  @Nullable
  private final DateTime universalTimestamp;

  private final boolean canDoLimitPushDown;

  /**
   * A flag to force limit pushdown to historicals.
   * Lazily initialized when calling {@link #validateAndGetForceLimitPushDown()}.
   */
  @Nullable
  private Boolean forceLimitPushDown;
| |
  /**
   * Jackson creator. Delegates to the private constructor with a null postProcessingFn, which causes the
   * post-processing function to be computed from the having and limit specs.
   */
  @JsonCreator
  public GroupByQuery(
      @JsonProperty("dataSource") DataSource dataSource,
      @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
      @JsonProperty("virtualColumns") VirtualColumns virtualColumns,
      @JsonProperty("filter") @Nullable DimFilter dimFilter,
      @JsonProperty("granularity") Granularity granularity,
      @JsonProperty("dimensions") List<DimensionSpec> dimensions,
      @JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
      @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
      @JsonProperty("having") @Nullable HavingSpec havingSpec,
      @JsonProperty("limitSpec") @Nullable LimitSpec limitSpec,
      @JsonProperty("subtotalsSpec") @Nullable List<List<String>> subtotalsSpec,
      @JsonProperty("context") Map<String, Object> context
  )
  {
    this(
        dataSource,
        querySegmentSpec,
        virtualColumns,
        dimFilter,
        granularity,
        dimensions,
        aggregatorSpecs,
        postAggregatorSpecs,
        havingSpec,
        limitSpec,
        subtotalsSpec,
        null,
        context
    );
  }
| |
  /**
   * Builds the function applied by {@link #postProcess}: the having filter (if any) runs first, then the
   * limit/order function produced by {@link LimitSpec#build}. Note that Guava's
   * {@code Functions.compose(g, f)} evaluates {@code f} first, so the having filter wraps inside the limit
   * function here.
   */
  private Function<Sequence<ResultRow>, Sequence<ResultRow>> makePostProcessingFn()
  {
    Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn = limitSpec.build(this);

    if (havingSpec != null) {
      postProcessingFn = Functions.compose(
          postProcessingFn,
          (Sequence<ResultRow> input) -> {
            // HavingSpecs are stateful (they carry the query); see postProcess() javadoc for the
            // resulting thread-safety caveat.
            havingSpec.setQuery(this);
            return Sequences.filter(input, havingSpec::eval);
          }
      );
    }
    return postProcessingFn;
  }
| |
| /** |
| * A private constructor that avoids recomputing postProcessingFn. |
| */ |
  /**
   * A private constructor that avoids recomputing postProcessingFn.
   *
   * Performs all normalization and derived-state computation: null-to-empty conversion of specs,
   * output-name uniqueness validation, the universal timestamp, the result row signature, and whether
   * limit push down can be applied.
   */
  private GroupByQuery(
      final DataSource dataSource,
      final QuerySegmentSpec querySegmentSpec,
      final VirtualColumns virtualColumns,
      final @Nullable DimFilter dimFilter,
      final Granularity granularity,
      final @Nullable List<DimensionSpec> dimensions,
      final @Nullable List<AggregatorFactory> aggregatorSpecs,
      final @Nullable List<PostAggregator> postAggregatorSpecs,
      final @Nullable HavingSpec havingSpec,
      final @Nullable LimitSpec limitSpec,
      final @Nullable List<List<String>> subtotalsSpec,
      final @Nullable Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn,
      final Map<String, Object> context
  )
  {
    super(dataSource, querySegmentSpec, false, context, granularity);

    this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
    this.dimFilter = dimFilter;
    this.dimensions = dimensions == null ? ImmutableList.of() : dimensions;
    for (DimensionSpec spec : this.dimensions) {
      Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec");
    }

    this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs;
    this.postAggregatorSpecs = Queries.prepareAggregations(
        this.dimensions.stream().map(DimensionSpec::getOutputName).collect(Collectors.toList()),
        this.aggregatorSpecs,
        postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs
    );

    // Verify no duplicate names between dimensions, aggregators, and postAggregators.
    // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.
    verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);

    // Order matters: computeResultRowSignature() reads universalTimestamp (the __time column is only added
    // when rows carry a timestamp).
    this.universalTimestamp = computeUniversalTimestamp();
    this.resultRowSignature = computeResultRowSignature();
    this.havingSpec = havingSpec;
    this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec);
    this.subtotalsSpec = verifySubtotalsSpec(subtotalsSpec, this.dimensions);

    // Reuse a caller-supplied postProcessingFn when available (e.g. Builder copies); otherwise derive it.
    this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn();

    // Check if limit push down configuration is valid and check if limit push down will be applied
    this.canDoLimitPushDown = canDoLimitPushDown(
        this.limitSpec,
        this.havingSpec,
        this.subtotalsSpec
    );
  }
| |
| @Nullable |
| private List<List<String>> verifySubtotalsSpec( |
| @Nullable List<List<String>> subtotalsSpec, |
| List<DimensionSpec> dimensions |
| ) |
| { |
| // if subtotalsSpec exists then validate that all are subsets of dimensions spec. |
| if (subtotalsSpec != null) { |
| for (List<String> subtotalSpec : subtotalsSpec) { |
| for (String s : subtotalSpec) { |
| boolean found = false; |
| for (DimensionSpec ds : dimensions) { |
| if (s.equals(ds.getOutputName())) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| throw new IAE( |
| "Subtotal spec %s is either not a subset of top level dimensions.", |
| subtotalSpec |
| ); |
| } |
| } |
| } |
| } |
| |
| return subtotalsSpec; |
| } |
| |
  @JsonProperty
  @Override
  public VirtualColumns getVirtualColumns()
  {
    return virtualColumns;
  }

  /** The query filter, or null. Serialized as "filter". */
  @Nullable
  @JsonProperty("filter")
  public DimFilter getDimFilter()
  {
    return dimFilter;
  }

  /** Grouping dimensions; never null (empty when grouping by nothing). */
  @JsonProperty
  public List<DimensionSpec> getDimensions()
  {
    return dimensions;
  }

  /** Aggregators; never null. Serialized as "aggregations". */
  @JsonProperty("aggregations")
  public List<AggregatorFactory> getAggregatorSpecs()
  {
    return aggregatorSpecs;
  }

  /** Post-aggregators; never null. Serialized as "postAggregations". */
  @JsonProperty("postAggregations")
  public List<PostAggregator> getPostAggregatorSpecs()
  {
    return postAggregatorSpecs;
  }

  /** Having filter, or null. Serialized as "having". */
  @JsonProperty("having")
  public HavingSpec getHavingSpec()
  {
    return havingSpec;
  }

  /** Limit/order spec; never null (a NoopLimitSpec when none was given). */
  @JsonProperty
  public LimitSpec getLimitSpec()
  {
    return limitSpec;
  }

  /** Subtotals spec, or null when this is not a subtotals query. Omitted from JSON when null. */
  @JsonInclude(JsonInclude.Include.NON_NULL)
  @JsonProperty("subtotalsSpec")
  @Nullable
  public List<List<String>> getSubtotalsSpec()
  {
    return subtotalsSpec;
  }
| |
  /**
   * Returns a list of field names, of the same size as {@link #getResultRowSizeWithPostAggregators()}, in the
   * order that they will appear in ResultRows for this query.
   *
   * @see ResultRow for documentation about the order that fields will be in
   */
  public RowSignature getResultRowSignature()
  {
    return resultRowSignature;
  }

  /**
   * Returns the size of ResultRows for this query when they do not include post-aggregators.
   * Equivalent to the index of the first post-aggregator field.
   */
  public int getResultRowSizeWithoutPostAggregators()
  {
    return getResultRowPostAggregatorStart();
  }

  /**
   * Returns the size of ResultRows for this query when they include post-aggregators.
   */
  public int getResultRowSizeWithPostAggregators()
  {
    return resultRowSignature.size();
  }
| |
  /**
   * If this query has a single universal timestamp, return it. Otherwise return null.
   *
   * If {@link #getIntervals()} is empty, there are no results (or timestamps) so this method returns null.
   *
   * This method will return a nonnull timestamp in the following two cases:
   *
   * 1) CTX_KEY_FUDGE_TIMESTAMP is set (in which case this timestamp will be returned).
   * 2) Granularity is "ALL".
   *
   * If this method returns null, then {@link #getResultRowHasTimestamp()} will return true. The reverse is also true:
   * if this method returns nonnull, then {@link #getResultRowHasTimestamp()} will return false.
   */
  @Nullable
  public DateTime getUniversalTimestamp()
  {
    return universalTimestamp;
  }

  /**
   * Returns true if ResultRows for this query include timestamps, false otherwise.
   *
   * @see #getUniversalTimestamp() for details about when timestamps are included in ResultRows
   */
  public boolean getResultRowHasTimestamp()
  {
    return universalTimestamp == null;
  }

  /**
   * Returns the position of the first dimension in ResultRows for this query.
   * The timestamp, when present, occupies position 0.
   */
  public int getResultRowDimensionStart()
  {
    return getResultRowHasTimestamp() ? 1 : 0;
  }

  /**
   * Returns the position of the first aggregator in ResultRows for this query.
   */
  public int getResultRowAggregatorStart()
  {
    return getResultRowDimensionStart() + dimensions.size();
  }

  /**
   * Returns the position of the first post-aggregator in ResultRows for this query.
   */
  public int getResultRowPostAggregatorStart()
  {
    return getResultRowAggregatorStart() + aggregatorSpecs.size();
  }
| |
  /** True when this query has a filter. */
  @Override
  public boolean hasFilters()
  {
    return dimFilter != null;
  }

  @Override
  @Nullable
  public DimFilter getFilter()
  {
    return dimFilter;
  }

  @Override
  public String getType()
  {
    return GROUP_BY;
  }

  /** Whether the "sortByDimsFirst" context flag is set (defaults to false). */
  @JsonIgnore
  public boolean getContextSortByDimsFirst()
  {
    return getContextBoolean(CTX_KEY_SORT_BY_DIMS_FIRST, false);
  }

  /**
   * Whether limit push down will be applied: either forced via context, or determined automatically at
   * construction time. The unsynchronized lazy init is a benign race: the computed value is deterministic
   * for a given query, so concurrent callers compute the same result.
   */
  @JsonIgnore
  public boolean isApplyLimitPushDown()
  {
    if (forceLimitPushDown == null) {
      forceLimitPushDown = validateAndGetForceLimitPushDown();
    }
    return forceLimitPushDown || canDoLimitPushDown;
  }

  /** Whether the context permits limit push down (defaults to true). */
  @JsonIgnore
  public boolean getApplyLimitPushDownFromContext()
  {
    return getContextBoolean(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true);
  }
| |
  /**
   * Ordering used when merging result sequences. Returns a raw {@code Ordering} on purpose: bySegment
   * queries route non-ResultRow objects through this comparator (see the inline note), which fall back to
   * natural nulls-first ordering.
   */
  @Override
  public Ordering getResultOrdering()
  {
    final Ordering<ResultRow> rowOrdering = getRowOrdering(false);

    return Ordering.from(
        (lhs, rhs) -> {
          if (lhs instanceof ResultRow) {
            return rowOrdering.compare((ResultRow) lhs, (ResultRow) rhs);
          } else {
            //noinspection unchecked (Probably bySegment queries; see BySegmentQueryRunner for details)
            return ((Ordering) Comparators.naturalNullsFirst()).compare(lhs, rhs);
          }
        }
    );
  }
| |
  /**
   * Reads the "forceLimitPushDown" context flag and, when set, validates that the query shape actually
   * supports forced push down: a {@link DefaultLimitSpec} with a real limit, no having spec, and no ordering
   * on a post-aggregator.
   *
   * @return the validated flag value
   *
   * @throws IAE                           if the flag is set but the limit/having configuration is invalid
   * @throws UnsupportedOperationException if the flag is set and the ordering references a post-aggregator
   */
  private boolean validateAndGetForceLimitPushDown()
  {
    final boolean forcePushDown = getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, false);
    if (forcePushDown) {
      if (!(limitSpec instanceof DefaultLimitSpec)) {
        throw new IAE("When forcing limit push down, a limit spec must be provided.");
      }

      if (!((DefaultLimitSpec) limitSpec).isLimited()) {
        throw new IAE("When forcing limit push down, the provided limit spec must have a limit.");
      }

      if (havingSpec != null) {
        throw new IAE("Cannot force limit push down when a having spec is present.");
      }

      for (OrderByColumnSpec orderBySpec : ((DefaultLimitSpec) limitSpec).getColumns()) {
        if (OrderByColumnSpec.getPostAggIndexForOrderBy(orderBySpec, postAggregatorSpecs) > -1) {
          throw new UnsupportedOperationException("Limit push down when sorting by a post aggregator is not supported.");
        }
      }
    }
    return forcePushDown;
  }
| |
  /**
   * Builds the result row signature in ResultRow field order: an optional __time column (present only when
   * rows carry timestamps, i.e. when there is no universal timestamp), then dimensions, aggregators, and
   * post-aggregators.
   */
  private RowSignature computeResultRowSignature()
  {
    final RowSignature.Builder builder = RowSignature.builder();

    if (universalTimestamp == null) {
      builder.addTimeColumn();
    }

    return builder.addDimensions(dimensions)
                  .addAggregators(aggregatorSpecs)
                  .addPostAggregators(postAggregatorSpecs)
                  .build();
  }
| |
| private boolean canDoLimitPushDown( |
| @Nullable LimitSpec limitSpec, |
| @Nullable HavingSpec havingSpec, |
| @Nullable List<List<String>> subtotalsSpec |
| ) |
| { |
| if (subtotalsSpec != null && !subtotalsSpec.isEmpty()) { |
| return false; |
| } |
| |
| if (limitSpec instanceof DefaultLimitSpec) { |
| DefaultLimitSpec limitSpecWithoutOffset = ((DefaultLimitSpec) limitSpec).withOffsetToLimit(); |
| |
| // If only applying an orderby without a limit, don't try to push down |
| if (!limitSpecWithoutOffset.isLimited()) { |
| return false; |
| } |
| |
| if (!getApplyLimitPushDownFromContext()) { |
| return false; |
| } |
| |
| if (havingSpec != null) { |
| return false; |
| } |
| |
| // If the sorting order only uses columns in the grouping key, we can always push the limit down |
| // to the buffer grouper without affecting result accuracy |
| boolean sortHasNonGroupingFields = DefaultLimitSpec.sortingOrderHasNonGroupingFields( |
| (DefaultLimitSpec) limitSpec, |
| getDimensions() |
| ); |
| |
| return !sortHasNonGroupingFields; |
| } |
| |
| return false; |
| } |
| |
  /**
   * When limit push down is applied, the partial results would be sorted by the ordering specified by the
   * limit/order spec (unlike non-push down case where the results always use the default natural ascending order),
   * so when merging these partial result streams, the merge needs to use the same ordering to get correct results.
   */
  private Ordering<ResultRow> getRowOrderingForPushDown(
      final boolean granular,
      final DefaultLimitSpec limitSpec
  )
  {
    final boolean sortByDimsFirst = getContextSortByDimsFirst();

    // Parallel lists, one entry per compared field: result-row index, reverse flag, value type, comparator.
    final IntList orderedFieldNumbers = new IntArrayList();
    final Set<Integer> dimsInOrderBy = new HashSet<>();
    final List<Boolean> needsReverseList = new ArrayList<>();
    final List<ValueType> dimensionTypes = new ArrayList<>();
    final List<StringComparator> comparators = new ArrayList<>();

    // Columns explicitly mentioned in the order-by come first, honoring their direction and comparator.
    for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
      boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
      int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
      if (dimIndex >= 0) {
        DimensionSpec dim = dimensions.get(dimIndex);
        orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
        dimsInOrderBy.add(dimIndex);
        needsReverseList.add(needsReverse);
        final ValueType type = dimensions.get(dimIndex).getOutputType();
        dimensionTypes.add(type);
        comparators.add(orderSpec.getDimensionComparator());
      }
    }

    // Remaining grouping dimensions follow in declaration order, ascending, lexicographic.
    for (int i = 0; i < dimensions.size(); i++) {
      if (!dimsInOrderBy.contains(i)) {
        orderedFieldNumbers.add(resultRowSignature.indexOf(dimensions.get(i).getOutputName()));
        needsReverseList.add(false);
        final ValueType type = dimensions.get(i).getOutputType();
        dimensionTypes.add(type);
        comparators.add(StringComparators.LEXICOGRAPHIC);
      }
    }

    final Comparator<ResultRow> timeComparator = getTimeComparator(granular);

    // Three cases: no time component (granularity ALL), dims-before-time, or time-before-dims.
    if (timeComparator == null) {
      return Ordering.from(
          (lhs, rhs) -> compareDimsForLimitPushDown(
              orderedFieldNumbers,
              needsReverseList,
              dimensionTypes,
              comparators,
              lhs,
              rhs
          )
      );
    } else if (sortByDimsFirst) {
      return Ordering.from(
          (lhs, rhs) -> {
            final int cmp = compareDimsForLimitPushDown(
                orderedFieldNumbers,
                needsReverseList,
                dimensionTypes,
                comparators,
                lhs,
                rhs
            );
            if (cmp != 0) {
              return cmp;
            }

            return timeComparator.compare(lhs, rhs);
          }
      );
    } else {
      return Ordering.from(
          (lhs, rhs) -> {
            final int timeCompare = timeComparator.compare(lhs, rhs);

            if (timeCompare != 0) {
              return timeCompare;
            }

            return compareDimsForLimitPushDown(
                orderedFieldNumbers,
                needsReverseList,
                dimensionTypes,
                comparators,
                lhs,
                rhs
            );
          }
      );
    }
  }
| |
  /**
   * Returns the ordering of ResultRows used when merging partial results. When limit push down applies and
   * the sort uses only grouping columns, the push-down ordering (which mirrors the limit spec) is used;
   * otherwise rows sort by time and dimensions, with the relative priority controlled by the
   * "sortByDimsFirst" context flag.
   *
   * @param granular whether time comparison buckets timestamps by the query granularity
   */
  public Ordering<ResultRow> getRowOrdering(final boolean granular)
  {
    if (isApplyLimitPushDown()) {
      if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) {
        return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec);
      }
    }

    final boolean sortByDimsFirst = getContextSortByDimsFirst();
    final Comparator<ResultRow> timeComparator = getTimeComparator(granular);

    // Three cases: no time component (granularity ALL), dims-before-time, or time-before-dims.
    if (timeComparator == null) {
      return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs));
    } else if (sortByDimsFirst) {
      return Ordering.from(
          (lhs, rhs) -> {
            final int cmp = compareDims(dimensions, lhs, rhs);
            if (cmp != 0) {
              return cmp;
            }

            return timeComparator.compare(lhs, rhs);
          }
      );
    } else {
      return Ordering.from(
          (lhs, rhs) -> {
            final int timeCompare = timeComparator.compare(lhs, rhs);

            if (timeCompare != 0) {
              return timeCompare;
            }

            return compareDims(dimensions, lhs, rhs);
          }
      );
    }
  }
| |
  /**
   * Returns a comparator on the row timestamp at position 0, or null when granularity is ALL (all rows share
   * one timestamp, so there is nothing to compare).
   *
   * @param granular when true, timestamps are bucketed by the query granularity before comparing
   *
   * @throws ISE if the query has a universal timestamp but a non-ALL granularity (should never happen)
   */
  @Nullable
  private Comparator<ResultRow> getTimeComparator(boolean granular)
  {
    if (Granularities.ALL.equals(getGranularity())) {
      return null;
    } else {
      if (!getResultRowHasTimestamp()) {
        // Sanity check (should never happen).
        throw new ISE("Cannot do time comparisons!");
      }

      if (granular) {
        return (lhs, rhs) -> Longs.compare(
            getGranularity().bucketStart(lhs.getLong(0)),
            getGranularity().bucketStart(rhs.getLong(0))
        );
      } else {
        return NON_GRANULAR_TIME_COMP;
      }
    }
  }
| |
| private int compareDims(List<DimensionSpec> dimensions, ResultRow lhs, ResultRow rhs) |
| { |
| final int dimensionStart = getResultRowDimensionStart(); |
| |
| for (int i = 0; i < dimensions.size(); i++) { |
| DimensionSpec dimension = dimensions.get(i); |
| final int dimCompare = DimensionHandlerUtils.compareObjectsAsType( |
| lhs.get(dimensionStart + i), |
| rhs.get(dimensionStart + i), |
| dimension.getOutputType() |
| ); |
| if (dimCompare != 0) { |
| return dimCompare; |
| } |
| } |
| |
| return 0; |
| } |
| |
  /**
   * Computes the timestamp that will be returned by {@link #getUniversalTimestamp()}.
   *
   * Priority: an explicit "fudgeTimestamp" context value (epoch millis) wins; otherwise, for granularity ALL,
   * the start of the first interval's granularity bucket is used; for any other granularity there is no
   * universal timestamp and rows carry their own.
   */
  @Nullable
  private DateTime computeUniversalTimestamp()
  {
    final String timestampStringFromContext = getContextValue(CTX_KEY_FUDGE_TIMESTAMP, "");
    final Granularity granularity = getGranularity();

    if (!timestampStringFromContext.isEmpty()) {
      return DateTimes.utc(Long.parseLong(timestampStringFromContext));
    } else if (Granularities.ALL.equals(granularity)) {
      final List<Interval> intervals = getIntervals();
      if (intervals.isEmpty()) {
        // null, the "universal timestamp" of nothing
        return null;
      }
      final DateTime timeStart = intervals.get(0).getStart();
      return granularity.getIterable(new Interval(timeStart, timeStart.plus(1))).iterator().next().getStart();
    } else {
      return null;
    }
  }
| |
| private static int compareDimsForLimitPushDown( |
| final IntList fields, |
| final List<Boolean> needsReverseList, |
| final List<ValueType> dimensionTypes, |
| final List<StringComparator> comparators, |
| final ResultRow lhs, |
| final ResultRow rhs |
| ) |
| { |
| for (int i = 0; i < fields.size(); i++) { |
| final int fieldNumber = fields.getInt(i); |
| final StringComparator comparator = comparators.get(i); |
| final ValueType dimensionType = dimensionTypes.get(i); |
| |
| final int dimCompare; |
| final Object lhsObj = lhs.get(fieldNumber); |
| final Object rhsObj = rhs.get(fieldNumber); |
| |
| if (ValueType.isNumeric(dimensionType)) { |
| if (comparator.equals(StringComparators.NUMERIC)) { |
| dimCompare = DimensionHandlerUtils.compareObjectsAsType(lhsObj, rhsObj, dimensionType); |
| } else { |
| dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj)); |
| } |
| } else { |
| dimCompare = comparator.compare((String) lhsObj, (String) rhsObj); |
| } |
| |
| if (dimCompare != 0) { |
| return needsReverseList.get(i) ? -dimCompare : dimCompare; |
| } |
| } |
| return 0; |
| } |
| |
  /**
   * Apply the havingSpec and limitSpec. Because havingSpecs are not thread safe, and because they are applied during
   * accumulation of the returned sequence, callers must take care to avoid accumulating two different Sequences
   * returned by this method in two different threads.
   *
   * @param results sequence of rows to apply havingSpec and limitSpec to
   *
   * @return sequence of rows after applying havingSpec and limitSpec
   */
  public Sequence<ResultRow> postProcess(Sequence<ResultRow> results)
  {
    // postProcessingFn is precomputed at construction time; see makePostProcessingFn().
    return postProcessingFn.apply(results);
  }
| |
  /**
   * Columns this query requires from its datasource, derived from the virtual columns, filter, dimensions,
   * and aggregators. The empty list passed as the last argument is presumably for columns of specs this query
   * type does not have — TODO confirm against Queries.computeRequiredColumns.
   */
  @Nullable
  @Override
  public Set<String> getRequiredColumns()
  {
    return Queries.computeRequiredColumns(
        virtualColumns,
        dimFilter,
        dimensions,
        aggregatorSpecs,
        Collections.emptyList()
    );
  }
| |
  /** Returns a copy of this query with the given context entries overriding the current ones. */
  @Override
  public GroupByQuery withOverriddenContext(Map<String, Object> contextOverride)
  {
    return new Builder(this).overrideContext(contextOverride).build();
  }

  /** Returns a copy of this query with a different segment spec (intervals). */
  @Override
  public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec)
  {
    return new Builder(this).setQuerySegmentSpec(spec).build();
  }

  /** Returns a copy of this query with different virtual columns. */
  public GroupByQuery withVirtualColumns(final VirtualColumns virtualColumns)
  {
    return new Builder(this).setVirtualColumns(virtualColumns).build();
  }

  /** Returns a copy of this query with a different (possibly null) filter. */
  public GroupByQuery withDimFilter(@Nullable final DimFilter dimFilter)
  {
    return new Builder(this).setDimFilter(dimFilter).build();
  }

  /** Returns a copy of this query with a different datasource. */
  @Override
  public Query<ResultRow> withDataSource(DataSource dataSource)
  {
    return new Builder(this).setDataSource(dataSource).build();
  }

  /** Returns a copy of this query with different grouping dimensions. */
  public GroupByQuery withDimensionSpecs(final List<DimensionSpec> dimensionSpecs)
  {
    return new Builder(this).setDimensions(dimensionSpecs).build();
  }

  /** Returns a copy of this query with a different limit spec. */
  public GroupByQuery withLimitSpec(LimitSpec limitSpec)
  {
    return new Builder(this).setLimitSpec(limitSpec).build();
  }

  /** Returns a copy of this query with different aggregators. */
  public GroupByQuery withAggregatorSpecs(final List<AggregatorFactory> aggregatorSpecs)
  {
    return new Builder(this).setAggregatorSpecs(aggregatorSpecs).build();
  }

  /** Returns a copy of this query with a different (possibly null) subtotals spec. */
  public GroupByQuery withSubtotalsSpec(@Nullable final List<List<String>> subtotalsSpec)
  {
    return new Builder(this).setSubtotalsSpec(subtotalsSpec).build();
  }

  /** Returns a copy of this query with different post-aggregators. */
  public GroupByQuery withPostAggregatorSpecs(final List<PostAggregator> postAggregatorSpecs)
  {
    return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build();
  }
| |
| private static void verifyOutputNames( |
| List<DimensionSpec> dimensions, |
| List<AggregatorFactory> aggregators, |
| List<PostAggregator> postAggregators |
| ) |
| { |
| final Set<String> outputNames = new HashSet<>(); |
| for (DimensionSpec dimension : dimensions) { |
| if (!outputNames.add(dimension.getOutputName())) { |
| throw new IAE("Duplicate output name[%s]", dimension.getOutputName()); |
| } |
| } |
| |
| for (AggregatorFactory aggregator : aggregators) { |
| if (!outputNames.add(aggregator.getName())) { |
| throw new IAE("Duplicate output name[%s]", aggregator.getName()); |
| } |
| } |
| |
| for (PostAggregator postAggregator : postAggregators) { |
| if (!outputNames.add(postAggregator.getName())) { |
| throw new IAE("Duplicate output name[%s]", postAggregator.getName()); |
| } |
| } |
| |
| if (outputNames.contains(ColumnHolder.TIME_COLUMN_NAME)) { |
| throw new IAE( |
| "'%s' cannot be used as an output name for dimensions, aggregators, or post-aggregators.", |
| ColumnHolder.TIME_COLUMN_NAME |
| ); |
| } |
| } |
| |
| public static class Builder |
| { |
| @Nullable |
| private static List<List<String>> copySubtotalSpec(@Nullable List<List<String>> subtotalsSpec) |
| { |
| if (subtotalsSpec == null) { |
| return null; |
| } |
| return subtotalsSpec.stream().map(ArrayList::new).collect(Collectors.toList()); |
| } |
| |
    private DataSource dataSource;
    private QuerySegmentSpec querySegmentSpec;
    private VirtualColumns virtualColumns;
    @Nullable
    private DimFilter dimFilter;
    private Granularity granularity;
    @Nullable
    private List<DimensionSpec> dimensions;
    @Nullable
    private List<AggregatorFactory> aggregatorSpecs;
    @Nullable
    private List<PostAggregator> postAggregatorSpecs;
    @Nullable
    private HavingSpec havingSpec;
    @Nullable
    private Map<String, Object> context;

    @Nullable
    private List<List<String>> subtotalsSpec = null;
    // Explicit limitSpec: mutually exclusive with the fluent limit/orderByColumnSpecs fields below
    // (see ensureExplicitLimitSpecNotSet / ensureFluentLimitsNotSet).
    @Nullable
    private LimitSpec limitSpec = null;
    // Cached post-processing function carried over when copying from an existing query; cleared whenever
    // a limit-related setter invalidates it.
    @Nullable
    private Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn;
    // Fluent limit state, folded into a DefaultLimitSpec at build time when no explicit limitSpec is set.
    private List<OrderByColumnSpec> orderByColumnSpecs = new ArrayList<>();
    private int limit = Integer.MAX_VALUE;
| |
    public Builder()
    {
    }

    /**
     * Copies all specs from an existing query, including its precomputed postProcessingFn (so rebuilding an
     * unchanged query avoids recomputing it). The fluent limit/orderBy fields stay at their defaults: the
     * query's limit information is fully carried by limitSpec.
     */
    public Builder(GroupByQuery query)
    {
      dataSource = query.getDataSource();
      querySegmentSpec = query.getQuerySegmentSpec();
      virtualColumns = query.getVirtualColumns();
      dimFilter = query.getDimFilter();
      granularity = query.getGranularity();
      dimensions = query.getDimensions();
      aggregatorSpecs = query.getAggregatorSpecs();
      postAggregatorSpecs = query.getPostAggregatorSpecs();
      havingSpec = query.getHavingSpec();
      limitSpec = query.getLimitSpec();
      // NOTE(review): shallow reference, unlike the deep copy done in Builder(Builder); acceptable as long
      // as callers do not mutate the query's subtotalsSpec — confirm.
      subtotalsSpec = query.subtotalsSpec;
      postProcessingFn = query.postProcessingFn;
      context = query.getContext();
    }
| |
    /**
     * Copies another builder. The mutable collections (subtotalsSpec, orderByColumnSpecs) are deep/defensive
     * copied so the two builders do not share state; everything else is copied by reference.
     */
    public Builder(Builder builder)
    {
      dataSource = builder.dataSource;
      querySegmentSpec = builder.querySegmentSpec;
      virtualColumns = builder.virtualColumns;
      dimFilter = builder.dimFilter;
      granularity = builder.granularity;
      dimensions = builder.dimensions;
      aggregatorSpecs = builder.aggregatorSpecs;
      postAggregatorSpecs = builder.postAggregatorSpecs;
      havingSpec = builder.havingSpec;
      limitSpec = builder.limitSpec;
      subtotalsSpec = copySubtotalSpec(builder.subtotalsSpec);
      postProcessingFn = builder.postProcessingFn;
      limit = builder.limit;
      orderByColumnSpecs = new ArrayList<>(builder.orderByColumnSpecs);
      context = builder.context;
    }
| |
    public Builder setDataSource(DataSource dataSource)
    {
      this.dataSource = dataSource;
      return this;
    }

    /** Convenience overload: wraps the name in a {@link TableDataSource}. */
    public Builder setDataSource(String dataSource)
    {
      this.dataSource = new TableDataSource(dataSource);
      return this;
    }

    /** Convenience overload: wraps the query in a {@link QueryDataSource} (subquery). */
    public Builder setDataSource(Query query)
    {
      this.dataSource = new QueryDataSource(query);
      return this;
    }

    public Builder setInterval(QuerySegmentSpec interval)
    {
      return setQuerySegmentSpec(interval);
    }

    /** Convenience overload: wraps the intervals in a {@link LegacySegmentSpec}. */
    public Builder setInterval(List<Interval> intervals)
    {
      return setQuerySegmentSpec(new LegacySegmentSpec(intervals));
    }

    /** Convenience overload: wraps the interval in a {@link LegacySegmentSpec}. */
    public Builder setInterval(Interval interval)
    {
      return setQuerySegmentSpec(new LegacySegmentSpec(interval));
    }

    /** Convenience overload: parses the interval string via {@link LegacySegmentSpec}. */
    public Builder setInterval(String interval)
    {
      return setQuerySegmentSpec(new LegacySegmentSpec(interval));
    }
| |
    /**
     * Sets the virtual columns available to dimensions, filters, and aggregators.
     *
     * @throws NullPointerException if virtualColumns is null
     * @return this builder, for chaining
     */
    public Builder setVirtualColumns(VirtualColumns virtualColumns)
    {
      this.virtualColumns = Preconditions.checkNotNull(virtualColumns, "virtualColumns");
      return this;
    }
| |
    /**
     * Varargs convenience for {@link #setVirtualColumns(VirtualColumns)}.
     *
     * @return this builder, for chaining
     */
    public Builder setVirtualColumns(VirtualColumn... virtualColumns)
    {
      this.virtualColumns = VirtualColumns.create(Arrays.asList(virtualColumns));
      return this;
    }
| |
    /**
     * Sets the row limit for the fluent limit API. Mutually exclusive with
     * {@link #setLimitSpec}; the cached postProcessingFn is invalidated since the
     * result shape changes.
     *
     * @throws ISE if an explicit limitSpec has already been set
     * @return this builder, for chaining
     */
    public Builder setLimit(int limit)
    {
      ensureExplicitLimitSpecNotSet();
      this.limit = limit;
      this.postProcessingFn = null;
      return this;
    }
| |
    /**
     * Sets the subtotals spec (lists of dimension subsets to compute subtotals for),
     * or null for none. The list is stored by reference, not copied.
     *
     * @return this builder, for chaining
     */
    public Builder setSubtotalsSpec(@Nullable List<List<String>> subtotalsSpec)
    {
      this.subtotalsSpec = subtotalsSpec;
      return this;
    }
| |
    /**
     * Adds an order-by column with default direction.
     *
     * @return this builder, for chaining
     */
    public Builder addOrderByColumn(String dimension)
    {
      return addOrderByColumn(dimension, null);
    }
| |
    /**
     * Adds an order-by column with an explicit direction (null for the default).
     *
     * @return this builder, for chaining
     */
    public Builder addOrderByColumn(String dimension, @Nullable OrderByColumnSpec.Direction direction)
    {
      return addOrderByColumn(new OrderByColumnSpec(dimension, direction));
    }
| |
    /**
     * Adds an order-by column spec to the fluent limit API. Mutually exclusive with
     * {@link #setLimitSpec}; invalidates the cached postProcessingFn.
     *
     * @throws ISE if an explicit limitSpec has already been set
     * @return this builder, for chaining
     */
    public Builder addOrderByColumn(OrderByColumnSpec columnSpec)
    {
      ensureExplicitLimitSpecNotSet();
      this.orderByColumnSpecs.add(columnSpec);
      this.postProcessingFn = null;
      return this;
    }
| |
| public Builder setLimitSpec(LimitSpec limitSpec) |
| { |
| Preconditions.checkNotNull(limitSpec); |
| ensureFluentLimitsNotSet(); |
| this.limitSpec = limitSpec; |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
    /**
     * Guards the fluent limit API: fails if an explicit limitSpec was already set,
     * since mixing the two would make the final LimitSpec ambiguous.
     *
     * @throws ISE if limitSpec is non-null
     */
    private void ensureExplicitLimitSpecNotSet()
    {
      if (limitSpec != null) {
        throw new ISE("Ambiguous build, limitSpec[%s] already set", limitSpec);
      }
    }
| |
| private void ensureFluentLimitsNotSet() |
| { |
| if (!(limit == Integer.MAX_VALUE && orderByColumnSpecs.isEmpty())) { |
| throw new ISE("Ambiguous build, limit[%s] or columnSpecs[%s] already set.", limit, orderByColumnSpecs); |
| } |
| } |
| |
    /**
     * Sets the query segment spec (the intervals/segments the query covers).
     *
     * @return this builder, for chaining
     */
    public Builder setQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
    {
      this.querySegmentSpec = querySegmentSpec;
      return this;
    }
| |
    /**
     * Sets the row filter, or null for no filtering.
     *
     * @return this builder, for chaining
     */
    public Builder setDimFilter(@Nullable DimFilter dimFilter)
    {
      this.dimFilter = dimFilter;
      return this;
    }
| |
    /**
     * Sets the time granularity to bucket results by.
     *
     * @return this builder, for chaining
     */
    public Builder setGranularity(Granularity granularity)
    {
      this.granularity = granularity;
      return this;
    }
| |
    /**
     * Adds a dimension whose output name equals the column name.
     *
     * @return this builder, for chaining
     */
    public Builder addDimension(String column)
    {
      return addDimension(column, column);
    }
| |
    /**
     * Adds a dimension with an explicit output name, as a {@link DefaultDimensionSpec}.
     *
     * @return this builder, for chaining
     */
    public Builder addDimension(String column, String outputName)
    {
      return addDimension(new DefaultDimensionSpec(column, outputName));
    }
| |
| public Builder addDimension(DimensionSpec dimension) |
| { |
| if (dimensions == null) { |
| dimensions = new ArrayList<>(); |
| } |
| |
| dimensions.add(dimension); |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
| public Builder setDimensions(List<DimensionSpec> dimensions) |
| { |
| this.dimensions = Lists.newArrayList(dimensions); |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
| public Builder setDimensions(DimensionSpec... dimensions) |
| { |
| this.dimensions = new ArrayList<>(Arrays.asList(dimensions)); |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
| public Builder addAggregator(AggregatorFactory aggregator) |
| { |
| if (aggregatorSpecs == null) { |
| aggregatorSpecs = new ArrayList<>(); |
| } |
| |
| aggregatorSpecs.add(aggregator); |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
| public Builder setAggregatorSpecs(List<AggregatorFactory> aggregatorSpecs) |
| { |
| this.aggregatorSpecs = Lists.newArrayList(aggregatorSpecs); |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
| public Builder setAggregatorSpecs(AggregatorFactory... aggregatorSpecs) |
| { |
| this.aggregatorSpecs = new ArrayList<>(Arrays.asList(aggregatorSpecs)); |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
| public Builder setPostAggregatorSpecs(List<PostAggregator> postAggregatorSpecs) |
| { |
| this.postAggregatorSpecs = Lists.newArrayList(postAggregatorSpecs); |
| this.postProcessingFn = null; |
| return this; |
| } |
| |
    /**
     * Sets the query context. NOTE(review): the map is stored by reference, not
     * copied — callers presumably should not mutate it afterwards; confirm against
     * callers.
     *
     * @return this builder, for chaining
     */
    public Builder setContext(Map<String, Object> context)
    {
      this.context = context;
      return this;
    }
| |
    /**
     * Assigns a fresh random UUID as the query id.
     *
     * @return this builder, for chaining
     */
    public Builder randomQueryId()
    {
      return queryId(UUID.randomUUID().toString());
    }
| |
    /**
     * Sets the query id by merging it into the context under {@code BaseQuery.QUERY_ID}.
     * Replaces the context map rather than mutating it in place.
     *
     * @return this builder, for chaining
     */
    public Builder queryId(String queryId)
    {
      context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId));
      return this;
    }
| |
    /**
     * Merges the given overrides on top of the current context, replacing the
     * context map with the merged result.
     *
     * @return this builder, for chaining
     */
    public Builder overrideContext(Map<String, Object> contextOverride)
    {
      this.context = computeOverriddenContext(context, contextOverride);
      return this;
    }
| |
    /**
     * Sets the having spec (post-aggregation row filter), or null for none.
     * Invalidates the cached postProcessingFn.
     *
     * @return this builder, for chaining
     */
    public Builder setHavingSpec(@Nullable HavingSpec havingSpec)
    {
      this.havingSpec = havingSpec;
      this.postProcessingFn = null;
      return this;
    }
| |
    /**
     * Returns a copy of this builder via the copy constructor.
     */
    public Builder copy()
    {
      return new Builder(this);
    }
| |
| public GroupByQuery build() |
| { |
| final LimitSpec theLimitSpec; |
| if (limitSpec == null) { |
| if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) { |
| theLimitSpec = NoopLimitSpec.instance(); |
| } else { |
| theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, 0, limit); |
| } |
| } else { |
| theLimitSpec = limitSpec; |
| } |
| |
| return new GroupByQuery( |
| dataSource, |
| querySegmentSpec, |
| virtualColumns, |
| dimFilter, |
| granularity, |
| dimensions, |
| aggregatorSpecs, |
| postAggregatorSpecs, |
| havingSpec, |
| theLimitSpec, |
| subtotalsSpec, |
| postProcessingFn, |
| context |
| ); |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "GroupByQuery{" + |
| "dataSource='" + getDataSource() + '\'' + |
| ", querySegmentSpec=" + getQuerySegmentSpec() + |
| ", virtualColumns=" + virtualColumns + |
| ", limitSpec=" + limitSpec + |
| ", dimFilter=" + dimFilter + |
| ", granularity=" + getGranularity() + |
| ", dimensions=" + dimensions + |
| ", aggregatorSpecs=" + aggregatorSpecs + |
| ", postAggregatorSpecs=" + postAggregatorSpecs + |
| (subtotalsSpec != null ? (", subtotalsSpec=" + subtotalsSpec) : "") + |
| ", havingSpec=" + havingSpec + |
| ", context=" + getContext() + |
| '}'; |
| } |
| |
| @Override |
| public boolean equals(final Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| if (!super.equals(o)) { |
| return false; |
| } |
| final GroupByQuery that = (GroupByQuery) o; |
| return Objects.equals(virtualColumns, that.virtualColumns) && |
| Objects.equals(limitSpec, that.limitSpec) && |
| Objects.equals(havingSpec, that.havingSpec) && |
| Objects.equals(dimFilter, that.dimFilter) && |
| Objects.equals(dimensions, that.dimensions) && |
| Objects.equals(aggregatorSpecs, that.aggregatorSpecs) && |
| Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs) && |
| Objects.equals(subtotalsSpec, that.subtotalsSpec); |
| } |
| |
  /**
   * Hash consistent with {@link #equals}: folds the superclass hash together with
   * the same fields equals() compares. Objects.hash is order-sensitive, so the
   * field order here must stay stable.
   */
  @Override
  public int hashCode()
  {
    return Objects.hash(
        super.hashCode(),
        virtualColumns,
        limitSpec,
        havingSpec,
        dimFilter,
        dimensions,
        aggregatorSpecs,
        postAggregatorSpecs,
        subtotalsSpec
    );
  }
| } |