| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.execute; |
| |
| import static org.apache.phoenix.util.NumberUtil.add; |
| import static org.apache.phoenix.util.NumberUtil.getMin; |
| |
| import java.sql.ParameterMetaData; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.phoenix.compile.ExplainPlan; |
| import org.apache.phoenix.compile.GroupByCompiler.GroupBy; |
| import org.apache.phoenix.compile.OrderByCompiler.OrderBy; |
| import org.apache.phoenix.compile.QueryPlan; |
| import org.apache.phoenix.compile.RowProjector; |
| import org.apache.phoenix.compile.ScanRanges; |
| import org.apache.phoenix.compile.StatementContext; |
| import org.apache.phoenix.execute.visitor.QueryPlanVisitor; |
| import org.apache.phoenix.iterate.ConcatResultIterator; |
| import org.apache.phoenix.iterate.DefaultParallelScanGrouper; |
| import org.apache.phoenix.iterate.LimitingResultIterator; |
| import org.apache.phoenix.iterate.MergeSortTopNResultIterator; |
| import org.apache.phoenix.iterate.OffsetResultIterator; |
| import org.apache.phoenix.iterate.ParallelScanGrouper; |
| import org.apache.phoenix.iterate.ResultIterator; |
| import org.apache.phoenix.iterate.UnionResultIterators; |
| import org.apache.phoenix.jdbc.PhoenixStatement.Operation; |
| import org.apache.phoenix.optimize.Cost; |
| import org.apache.phoenix.parse.FilterableStatement; |
| import org.apache.phoenix.query.KeyRange; |
| import org.apache.phoenix.schema.TableRef; |
| import org.apache.phoenix.util.ExpressionUtil; |
| |
| import com.google.common.collect.Sets; |
| |
| |
| public class UnionPlan implements QueryPlan { |
| private static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K |
| |
| private final TableRef tableRef; |
| private final FilterableStatement statement; |
| private final ParameterMetaData paramMetaData; |
| private final OrderBy orderBy; |
| private final StatementContext parentContext; |
| private final Integer limit; |
| private final Integer offset; |
| private final GroupBy groupBy; |
| private final RowProjector projector; |
| private final boolean isDegenerate; |
| private final List<QueryPlan> plans; |
| private UnionResultIterators iterators; |
| private Long estimatedRows; |
| private Long estimatedBytes; |
| private Long estimateInfoTs; |
| private boolean getEstimatesCalled; |
| |
| public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, |
| Integer limit, Integer offset, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException { |
| this.parentContext = context; |
| this.statement = statement; |
| this.tableRef = table; |
| this.projector = projector; |
| this.limit = limit; |
| this.orderBy = orderBy; |
| this.groupBy = groupBy; |
| this.plans = plans; |
| this.offset= offset; |
| this.paramMetaData = paramMetaData; |
| boolean isDegen = true; |
| for (QueryPlan plan : plans) { |
| if (plan.getContext().getScanRanges() != ScanRanges.NOTHING) { |
| isDegen = false; |
| break; |
| } |
| } |
| this.isDegenerate = isDegen; |
| } |
| |
| @Override |
| public boolean isDegenerate() { |
| return isDegenerate; |
| } |
| |
| @Override |
| public List<KeyRange> getSplits() { |
| if (iterators == null) |
| return null; |
| return iterators.getSplits(); |
| } |
| |
| @Override |
| public List<List<Scan>> getScans() { |
| if (iterators == null) |
| return null; |
| return iterators.getScans(); |
| } |
| |
| public List<QueryPlan> getSubPlans() { |
| return plans; |
| } |
| |
| @Override |
| public GroupBy getGroupBy() { |
| return groupBy; |
| } |
| |
| @Override |
| public OrderBy getOrderBy() { |
| return orderBy; |
| } |
| |
| @Override |
| public TableRef getTableRef() { |
| return tableRef; |
| } |
| |
| @Override |
| public Integer getLimit() { |
| return limit; |
| } |
| |
| @Override |
| public Integer getOffset() { |
| return offset; |
| } |
| |
| @Override |
| public RowProjector getProjector() { |
| return projector; |
| } |
| |
| @Override |
| public ResultIterator iterator() throws SQLException { |
| return iterator(DefaultParallelScanGrouper.getInstance()); |
| } |
| |
| @Override |
| public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { |
| return iterator(scanGrouper, null); |
| } |
| |
| @Override |
| public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { |
| this.iterators = new UnionResultIterators(plans, parentContext); |
| ResultIterator scanner; |
| boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); |
| |
| if (isOrdered) { // TopN |
| scanner = new MergeSortTopNResultIterator(iterators, limit, offset, orderBy.getOrderByExpressions()); |
| } else { |
| scanner = new ConcatResultIterator(iterators); |
| if (offset != null) { |
| scanner = new OffsetResultIterator(scanner, offset); |
| } |
| if (limit != null) { |
| scanner = new LimitingResultIterator(scanner, limit); |
| } |
| } |
| return scanner; |
| } |
| |
| @Override |
| public ExplainPlan getExplainPlan() throws SQLException { |
| List<String> steps = new ArrayList<String>(); |
| steps.add("UNION ALL OVER " + this.plans.size() + " QUERIES"); |
| ResultIterator iterator = iterator(); |
| iterator.explain(steps); |
| // Indent plans steps nested under union, except last client-side merge/concat step (if there is one) |
| int offset = !orderBy.getOrderByExpressions().isEmpty() && limit != null ? 2 : limit != null ? 1 : 0; |
| for (int i = 1 ; i < steps.size()-offset; i++) { |
| steps.set(i, " " + steps.get(i)); |
| } |
| return new ExplainPlan(steps); |
| } |
| |
| |
| @Override |
| public long getEstimatedSize() { |
| return DEFAULT_ESTIMATED_SIZE; |
| } |
| |
| @Override |
| public Cost getCost() { |
| Cost cost = Cost.ZERO; |
| for (QueryPlan plan : plans) { |
| cost = cost.plus(plan.getCost()); |
| } |
| return cost; |
| } |
| |
| @Override |
| public ParameterMetaData getParameterMetaData() { |
| return paramMetaData; |
| } |
| |
| @Override |
| public FilterableStatement getStatement() { |
| return statement; |
| } |
| |
| @Override |
| public StatementContext getContext() { |
| return parentContext; |
| } |
| |
| @Override |
| public boolean isRowKeyOrdered() { |
| return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving(); |
| } |
| |
| public List<QueryPlan> getPlans() { |
| return this.plans; |
| } |
| |
| @Override |
| public boolean useRoundRobinIterator() throws SQLException { |
| return false; |
| } |
| |
| @Override |
| public <T> T accept(QueryPlanVisitor<T> visitor) { |
| return visitor.visit(this); |
| } |
| |
| @Override |
| public Operation getOperation() { |
| return statement.getOperation(); |
| } |
| |
| @Override |
| public Set<TableRef> getSourceRefs() { |
| // TODO is this correct? |
| Set<TableRef> sources = Sets.newHashSetWithExpectedSize(plans.size()); |
| for (QueryPlan plan : plans) { |
| sources.addAll(plan.getSourceRefs()); |
| } |
| return sources; |
| } |
| |
| @Override |
| public Long getEstimatedRowsToScan() throws SQLException { |
| if (!getEstimatesCalled) { |
| getEstimates(); |
| } |
| return estimatedRows; |
| } |
| |
| @Override |
| public Long getEstimatedBytesToScan() throws SQLException { |
| if (!getEstimatesCalled) { |
| getEstimates(); |
| } |
| return estimatedBytes; |
| } |
| |
| @Override |
| public Long getEstimateInfoTimestamp() throws SQLException { |
| if (!getEstimatesCalled) { |
| getEstimates(); |
| } |
| return estimateInfoTs; |
| } |
| |
| private void getEstimates() throws SQLException { |
| getEstimatesCalled = true; |
| for (QueryPlan plan : plans) { |
| if (plan.getEstimatedBytesToScan() == null || plan.getEstimatedRowsToScan() == null |
| || plan.getEstimateInfoTimestamp() == null) { |
| /* |
| * If any of the sub plans doesn't have the estimate info available, then we don't |
| * provide estimate for the overall plan |
| */ |
| estimatedBytes = null; |
| estimatedRows = null; |
| estimateInfoTs = null; |
| break; |
| } else { |
| estimatedBytes = add(estimatedBytes, plan.getEstimatedBytesToScan()); |
| estimatedRows = add(estimatedRows, plan.getEstimatedRowsToScan()); |
| estimateInfoTs = getMin(estimateInfoTs, plan.getEstimateInfoTimestamp()); |
| } |
| } |
| } |
| |
| @Override |
| public List<OrderBy> getOutputOrderBys() { |
| assert this.groupBy == GroupBy.EMPTY_GROUP_BY; |
| assert this.orderBy != OrderBy.FWD_ROW_KEY_ORDER_BY && this.orderBy != OrderBy.REV_ROW_KEY_ORDER_BY; |
| if(!this.orderBy.isEmpty()) { |
| return Collections.<OrderBy> singletonList( |
| OrderBy.convertCompiledOrderByToOutputOrderBy(this.orderBy)); |
| } |
| return Collections.<OrderBy> emptyList(); |
| } |
| } |