blob: aa99cabeabf5a0c18295395a293fdcce5b5ebc0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.execute;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
import org.apache.phoenix.execute.visitor.ByteCountVisitor;
import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.execute.visitor.RowCountVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.expression.aggregator.ClientAggregators;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.iterate.AggregatingResultIterator;
import org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator;
import org.apache.phoenix.iterate.ClientHashAggregatingResultIterator;
import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
import org.apache.phoenix.iterate.FilterResultIterator;
import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.OffsetResultIterator;
import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
import org.apache.phoenix.iterate.OrderedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.TupleUtil;
import com.google.common.collect.Lists;
public class ClientAggregatePlan extends ClientProcessingPlan {
private final GroupBy groupBy;
private final Expression having;
private final ServerAggregators serverAggregators;
private final ClientAggregators clientAggregators;
private final boolean useHashAgg;
private OrderBy actualOutputOrderBy;
public ClientAggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
Integer limit, Integer offset, Expression where, OrderBy orderBy, GroupBy groupBy, Expression having, QueryPlan delegate) {
super(context, statement, table, projector, limit, offset, where, orderBy, delegate);
this.groupBy = groupBy;
this.having = having;
this.clientAggregators = context.getAggregationManager().getAggregators();
// We must deserialize rather than clone based off of client aggregators because
// upon deserialization we create the server-side aggregators instead of the client-side
// aggregators. We use the Configuration directly here to avoid the expense of creating
// another one.
this.serverAggregators = ServerAggregators.deserialize(context.getScan()
.getAttribute(BaseScannerRegionObserver.AGGREGATORS), context.getConnection().getQueryServices().getConfiguration(), null);
// Extract hash aggregate hint, if any.
HintNode hints = statement.getHint();
useHashAgg = hints != null && hints.hasHint(HintNode.Hint.HASH_AGGREGATE);
this.actualOutputOrderBy = convertActualOutputOrderBy(orderBy, groupBy, context);
}
@Override
public Cost getCost() {
Double outputBytes = this.accept(new ByteCountVisitor());
Double inputRows = this.getDelegate().accept(new RowCountVisitor());
Double rowWidth = this.accept(new AvgRowWidthVisitor());
if (inputRows == null || outputBytes == null || rowWidth == null) {
return Cost.UNKNOWN;
}
double inputBytes = inputRows * rowWidth;
double rowsBeforeHaving = RowCountVisitor.aggregate(
RowCountVisitor.filter(
inputRows.doubleValue(),
RowCountVisitor.stripSkipScanFilter(
context.getScan().getFilter())),
groupBy);
double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
double bytesAfterHaving = rowWidth * rowsAfterHaving;
int parallelLevel = CostUtil.estimateParallelLevel(
false, context.getConnection().getQueryServices());
Cost cost = CostUtil.estimateAggregateCost(
inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
if (!orderBy.getOrderByExpressions().isEmpty()) {
Cost orderByCost = CostUtil.estimateOrderByCost(
bytesAfterHaving, outputBytes, parallelLevel);
cost = cost.plus(orderByCost);
}
return super.getCost().plus(cost);
}
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
iterator = new FilterResultIterator(iterator, where);
}
AggregatingResultIterator aggResultIterator;
if (groupBy.isEmpty()) {
aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
} else {
List<Expression> keyExpressions = groupBy.getKeyExpressions();
if (groupBy.isOrderPreserving()) {
aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
} else {
long thresholdBytes =
context.getConnection().getQueryServices().getProps().getLong(
QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
boolean spoolingEnabled =
context.getConnection().getQueryServices().getProps().getBoolean(
QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
for (Expression keyExpression : keyExpressions) {
/**
* Sort the result tuples by the GroupBy expressions.
* If some GroupBy expression is SortOrder.DESC, then sorted results on that expression are DESC, not ASC.
* for ClientAggregatePlan,the orderBy should not be OrderBy.REV_ROW_KEY_ORDER_BY, which is different from {@link AggregatePlan.OrderingResultIteratorFactory#newIterator}
**/
keyExpressionOrderBy.add(OrderByExpression.createByCheckIfOrderByReverse(keyExpression, false, true, false));
}
if (useHashAgg) {
// Pass in orderBy to apply any sort that has been optimized away
aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, orderBy);
} else {
iterator =
new OrderedResultIterator(iterator, keyExpressionOrderBy,
spoolingEnabled, thresholdBytes, null, null,
projector.getEstimatedRowByteSize());
aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
}
}
aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
}
if (having != null) {
aggResultIterator = new FilterAggregatingResultIterator(aggResultIterator, having);
}
if (statement.isDistinct() && statement.isAggregate()) { // Dedup on client if select distinct and aggregation
aggResultIterator = new DistinctAggregatingResultIterator(aggResultIterator, getProjector());
}
ResultIterator resultScanner = aggResultIterator;
if (orderBy.getOrderByExpressions().isEmpty()) {
if (offset != null) {
resultScanner = new OffsetResultIterator(resultScanner, offset);
}
if (limit != null) {
resultScanner = new LimitingResultIterator(resultScanner, limit);
}
} else {
long thresholdBytes =
context.getConnection().getQueryServices().getProps().getLong(
QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
boolean spoolingEnabled =
context.getConnection().getQueryServices().getProps().getBoolean(
QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
resultScanner =
new OrderedAggregatingResultIterator(aggResultIterator,
orderBy.getOrderByExpressions(), spoolingEnabled, thresholdBytes, limit,
offset);
}
if (context.getSequenceManager().getSequenceCount() > 0) {
resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager());
}
return resultScanner;
}
@Override
public ExplainPlan getExplainPlan() throws SQLException {
List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
if (where != null) {
planSteps.add("CLIENT FILTER BY " + where.toString());
}
if (groupBy.isEmpty()) {
planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
} else if (groupBy.isOrderPreserving()) {
planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
} else if (useHashAgg) {
planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
if (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) {
planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString());
}
} else {
planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString());
planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
}
if (having != null) {
planSteps.add("CLIENT AFTER-AGGREGATION FILTER BY " + having.toString());
}
if (statement.isDistinct() && statement.isAggregate()) {
planSteps.add("CLIENT DISTINCT ON " + projector.toString());
}
if (offset != null) {
planSteps.add("CLIENT OFFSET " + offset);
}
if (orderBy.getOrderByExpressions().isEmpty()) {
if (limit != null) {
planSteps.add("CLIENT " + limit + " ROW LIMIT");
}
} else {
planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " + orderBy.getOrderByExpressions().toString());
}
if (context.getSequenceManager().getSequenceCount() > 0) {
int nSequences = context.getSequenceManager().getSequenceCount();
planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? "" : "S"));
}
return new ExplainPlan(planSteps);
}
@Override
public GroupBy getGroupBy() {
return groupBy;
}
@Override
public <T> T accept(QueryPlanVisitor<T> visitor) {
return visitor.visit(this);
}
public Expression getHaving() {
return having;
}
private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
private final List<Expression> groupByExpressions;
public ClientGroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators, List<Expression> groupByExpressions) {
super(iterator, aggregators);
this.groupByExpressions = groupByExpressions;
}
@Override
protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
ImmutableBytesWritable ptr) throws SQLException {
try {
ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
ptr.set(key.get(), key.getOffset(), key.getLength());
return ptr;
} catch (IOException e) {
throw new SQLException(e);
}
}
@Override
protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
}
@Override
public String toString() {
return "ClientGroupedAggregatingResultIterator [resultIterator="
+ resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
+ groupByExpressions + "]";
}
}
private static class ClientUngroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
public ClientUngroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators) {
super(iterator, aggregators);
}
@Override
protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
ImmutableBytesWritable ptr) throws SQLException {
ptr.set(UNGROUPED_AGG_ROW_KEY);
return ptr;
}
@Override
protected Tuple wrapKeyValueAsResult(KeyValue keyValue)
throws SQLException {
return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
}
@Override
public String toString() {
return "ClientUngroupedAggregatingResultIterator [resultIterator="
+ resultIterator + ", aggregators=" + aggregators + "]";
}
}
private OrderBy convertActualOutputOrderBy(OrderBy orderBy, GroupBy groupBy, StatementContext statementContext) {
if(!orderBy.isEmpty()) {
return OrderBy.convertCompiledOrderByToOutputOrderBy(orderBy);
}
if(this.useHashAgg &&
!groupBy.isEmpty() &&
!groupBy.isOrderPreserving() &&
orderBy != OrderBy.FWD_ROW_KEY_ORDER_BY &&
orderBy != OrderBy.REV_ROW_KEY_ORDER_BY) {
return OrderBy.EMPTY_ORDER_BY;
}
return ExpressionUtil.convertGroupByToOrderBy(groupBy, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY);
}
@Override
public List<OrderBy> getOutputOrderBys() {
return OrderBy.wrapForOutputOrderBys(this.actualOutputOrderBy);
}
}