| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.execute; |
| |
| |
| import static org.apache.phoenix.util.ScanUtil.isPacingScannersPossible; |
| import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible; |
| |
| import java.sql.SQLException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.phoenix.cache.ServerCacheClient.ServerCache; |
| import org.apache.phoenix.compile.GroupByCompiler.GroupBy; |
| import org.apache.phoenix.compile.OrderByCompiler.OrderBy; |
| import org.apache.phoenix.compile.QueryPlan; |
| import org.apache.phoenix.compile.RowProjector; |
| import org.apache.phoenix.compile.StatementContext; |
| import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol; |
| import org.apache.phoenix.coprocessor.ScanRegionObserver; |
| import org.apache.phoenix.execute.visitor.ByteCountVisitor; |
| import org.apache.phoenix.execute.visitor.QueryPlanVisitor; |
| import org.apache.phoenix.expression.Expression; |
| import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; |
| import org.apache.phoenix.iterate.BaseResultIterators; |
| import org.apache.phoenix.iterate.ChunkedResultIterator; |
| import org.apache.phoenix.iterate.ConcatResultIterator; |
| import org.apache.phoenix.iterate.LimitingResultIterator; |
| import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator; |
| import org.apache.phoenix.iterate.MergeSortTopNResultIterator; |
| import org.apache.phoenix.iterate.OffsetResultIterator; |
| import org.apache.phoenix.iterate.ParallelIteratorFactory; |
| import org.apache.phoenix.iterate.ParallelIterators; |
| import org.apache.phoenix.iterate.ParallelScanGrouper; |
| import org.apache.phoenix.iterate.ResultIterator; |
| import org.apache.phoenix.iterate.RoundRobinResultIterator; |
| import org.apache.phoenix.iterate.SequenceResultIterator; |
| import org.apache.phoenix.iterate.SerialIterators; |
| import org.apache.phoenix.iterate.SpoolingResultIterator; |
| import org.apache.phoenix.optimize.Cost; |
| import org.apache.phoenix.parse.FilterableStatement; |
| import org.apache.phoenix.parse.HintNode; |
| import org.apache.phoenix.query.ConnectionQueryServices; |
| import org.apache.phoenix.query.KeyRange; |
| import org.apache.phoenix.query.QueryConstants; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.PTable.IndexType; |
| import org.apache.phoenix.schema.SaltingUtil; |
| import org.apache.phoenix.schema.TableRef; |
| import org.apache.phoenix.schema.stats.StatisticsUtil; |
| import org.apache.phoenix.util.CostUtil; |
| import org.apache.phoenix.util.ExpressionUtil; |
| import org.apache.phoenix.util.QueryUtil; |
| import org.apache.phoenix.util.ScanUtil; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Query plan for a basic table scan. Builds the scan attributes at compile time |
| * and, at execution time, wraps the underlying serial or parallel scans with |
| * merge sort, round robin, offset, limit, and sequence iterators as the query |
| * requires. |
| * |
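| * <p> |
| * A minimal sketch of how a plan like this is typically driven (hypothetical |
| * connection URL and table name; the plan itself is produced by the query |
| * compiler rather than constructed directly): |
| * <pre>{@code |
| * Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost"); |
| * try (Statement stmt = conn.createStatement(); |
| *         ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE LIMIT 10")) { |
| *     while (rs.next()) { |
| *         // Iterating the ResultSet drives the ResultIterator chain that |
| *         // newIterator(...) below assembles. |
| *     } |
| * } |
| * }</pre> |
| * |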
| * @since 0.1 |
| */ |
| public class ScanPlan extends BaseQueryPlan { |
| private static final Logger LOGGER = LoggerFactory.getLogger(ScanPlan.class); |
| private List<KeyRange> splits; |
| private List<List<Scan>> scans; |
| private boolean allowPageFilter; |
| private boolean isSerial; |
| private boolean isDataToScanWithinThreshold; |
| private Long serialRowsEstimate; |
| private Long serialBytesEstimate; |
| private Long serialEstimateInfoTs; |
| private OrderBy actualOutputOrderBy; |
| |
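| /** |
| * Creates a scan plan. When an ORDER BY is present (TopN), the ordering and limit |
| * are serialized into the scan so they can be evaluated on the region server; |
| * otherwise, when {@code allowPageFilter} is true, the limit is pushed into each |
| * scan as a per-scan limit. |
| */ |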
| public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, |
| Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, |
| QueryPlan dataPlan) throws SQLException { |
| this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null, dataPlan); |
| } |
| |
| private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, |
| OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter, QueryPlan dataPlan) throws SQLException { |
| super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY, |
| parallelIteratorFactory != null ? parallelIteratorFactory : |
| buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter, dataPlan); |
| this.allowPageFilter = allowPageFilter; |
| boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); |
| if (isOrdered) { // TopN |
| ScanRegionObserver.serializeIntoScan(context.getScan(), |
| limit == null ? -1 : QueryUtil.getOffsetLimit(limit, offset), |
| orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize()); |
| ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION); |
| } |
| Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; |
| perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset); |
| Pair<Long, Long> estimate = getEstimateOfDataSizeToScanIfWithinThreshold(context, table.getTable(), perScanLimit); |
| this.isDataToScanWithinThreshold = estimate != null; |
| this.isSerial = isSerial(context, statement, tableRef, orderBy, isDataToScanWithinThreshold); |
| if (isSerial) { |
| serialBytesEstimate = estimate.getFirst(); |
| serialRowsEstimate = estimate.getSecond(); |
| serialEstimateInfoTs = StatisticsUtil.NOT_STATS_BASED_TS; |
| } |
| this.actualOutputOrderBy = convertActualOutputOrderBy(orderBy, context); |
| } |
| |
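| /** |
| * A query runs serially only when the estimated data to scan is within the serial |
| * threshold and the table/ORDER BY combination permits serial execution. A SERIAL |
| * hint alone is not sufficient; it is ignored (with a warning) when serial |
| * execution is not possible. |
| */ |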
| private static boolean isSerial(StatementContext context, FilterableStatement statement, |
| TableRef tableRef, OrderBy orderBy, boolean isDataWithinThreshold) throws SQLException { |
| if (isDataWithinThreshold) { |
| PTable table = tableRef.getTable(); |
| boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL); |
| boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context); |
| if (!canBeExecutedSerially) { |
| if (hasSerialHint) { |
| LOGGER.warn("This query cannot be executed serially. Ignoring the SERIAL hint"); |
| } |
| return false; |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * @return a Pair whose first element is the estimated number of bytes that will be |
| * scanned and whose second element is the estimated number of rows, or null if the |
| * estimated size of the data to scan is beyond the threshold or cannot be determined |
| * @throws SQLException |
| */ |
| private static Pair<Long, Long> getEstimateOfDataSizeToScanIfWithinThreshold(StatementContext context, PTable table, Integer perScanLimit) throws SQLException { |
| Scan scan = context.getScan(); |
| ConnectionQueryServices services = context.getConnection().getQueryServices(); |
| long estRowSize = SchemaUtil.estimateRowSize(table); |
| long regionSize = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE, |
| HConstants.DEFAULT_MAX_FILE_SIZE); |
| if (perScanLimit == null || scan.getFilter() != null) { |
| /* |
| * If a limit is not provided or if we have a filter, then we are not able to decide whether |
| * the amount of data we need to scan is less than the threshold. |
| */ |
| return null; |
| } |
| float factor = |
| services.getProps().getFloat(QueryServices.LIMITED_QUERY_SERIAL_THRESHOLD, |
| QueryServicesOptions.DEFAULT_LIMITED_QUERY_SERIAL_THRESHOLD); |
| long threshold = (long)(factor * regionSize); |
| long estimatedBytes = perScanLimit * estRowSize; |
| long estimatedRows = perScanLimit; |
| return (estimatedBytes < threshold) ? new Pair<>(estimatedBytes, estimatedRows) : null; |
| } |
| |
| @SuppressWarnings("deprecation") |
| private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement, |
| TableRef tableRef, OrderBy orderBy, Integer limit, Integer offset, boolean allowPageFilter) throws SQLException { |
| |
| boolean isDataWithinThreshold = getEstimateOfDataSizeToScanIfWithinThreshold(context, |
| tableRef.getTable(), QueryUtil.getOffsetLimit(limit, offset)) != null; |
| if (isSerial(context, statement, tableRef, orderBy, isDataWithinThreshold) |
| || isRoundRobinPossible(orderBy, context) || isPacingScannersPossible(context)) { |
| return ParallelIteratorFactory.NOOP_FACTORY; |
| } |
| ParallelIteratorFactory spoolingResultIteratorFactory = |
| new SpoolingResultIterator.SpoolingResultIteratorFactory( |
| context.getConnection().getQueryServices()); |
| |
| // If we're doing an order by then we need the full result before we can do anything, |
| // so we don't bother chunking it. If we're just doing a simple scan then we chunk |
| // the scan to have a quicker initial response. |
| if (!orderBy.getOrderByExpressions().isEmpty()) { |
| return spoolingResultIteratorFactory; |
| } else { |
| return new ChunkedResultIterator.ChunkedResultIteratorFactory( |
| spoolingResultIteratorFactory, context.getConnection().getMutationState(), tableRef); |
| } |
| } |
| |
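| /** |
| * Estimates the cost of this plan from the number of bytes to scan, adding the |
| * estimated cost of the ORDER BY when one is present. Returns |
| * {@link Cost#UNKNOWN} when no byte estimate is available. |
| */ |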
| @Override |
| public Cost getCost() { |
| Long byteCount = null; |
| try { |
| byteCount = getEstimatedBytesToScan(); |
| } catch (SQLException e) { |
| // Swallow the exception: without a byte count estimate, the cost is |
| // reported as unknown below. |
| } |
| Double outputBytes = this.accept(new ByteCountVisitor()); |
| |
| if (byteCount == null || outputBytes == null) { |
| return Cost.UNKNOWN; |
| } |
| |
| int parallelLevel = CostUtil.estimateParallelLevel( |
| true, context.getConnection().getQueryServices()); |
| Cost cost = new Cost(0, 0, byteCount); |
| if (!orderBy.getOrderByExpressions().isEmpty()) { |
| Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, outputBytes, parallelLevel); |
| cost = cost.plus(orderByCost); |
| } |
| return cost; |
| } |
| |
| @Override |
| public List<KeyRange> getSplits() { |
| return splits == null ? Collections.<KeyRange>emptyList() : splits; |
| } |
| |
| @Override |
| public List<List<Scan>> getScans() { |
| return scans == null ? Collections.<List<Scan>>emptyList() : scans; |
| } |
| |
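| /** |
| * An offset can be applied on the server side only when the query has no ORDER BY |
| * and a salted table or local index does not require a client-side merge sort to |
| * restore row key order. |
| */ |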
| private static boolean isOffsetPossibleOnServer(StatementContext context, OrderBy orderBy, Integer offset, |
| boolean isSalted, IndexType indexType) { |
| return offset != null && orderBy.getOrderByExpressions().isEmpty() |
| && !((isSalted || indexType == IndexType.LOCAL) |
| && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context)); |
| } |
| |
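| /** |
| * Builds the iterator chain for this plan: serial or parallel base iterators, |
| * wrapped as needed by a merge sort (TopN or row key order), round robin, or |
| * concatenating iterator, followed by offset, limit, and sequence iterators. |
| */ |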
| @Override |
| protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { |
| // Set any scan attributes before creating the scanner, as it will be too late afterwards |
| scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE); |
| ResultIterator scanner; |
| TableRef tableRef = this.getTableRef(); |
| PTable table = tableRef.getTable(); |
| boolean isSalted = table.getBucketNum() != null; |
| /* |
| * If there is no limit or TopN, use a parallel iterator so that we get results back |
| * faster. Otherwise, when a limit is provided and the data to scan falls within the |
| * serial threshold, run the query serially. |
| */ |
| boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); |
| Integer perScanLimit = !allowPageFilter || isOrdered ? null : QueryUtil.getOffsetLimit(limit, offset); |
| boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType()); |
| /* |
| * For queries that order by row key and cannot possibly scan more than a threshold |
| * worth of data, we only need to initialize the scanner corresponding to the first |
| * (or last, if reverse) scan per region. |
| */ |
| boolean initFirstScanOnly = |
| (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) |
| && isDataToScanWithinThreshold; |
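| // Choose the base iterators: serial when the offset is applied on the server side |
| // or when the query qualifies for serial execution, parallel otherwise. |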
| BaseResultIterators iterators; |
| if (isOffsetOnServer) { |
| iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan); |
| } else if (isSerial) { |
| iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan); |
| } else { |
| iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches, dataPlan); |
| } |
| estimatedRows = iterators.getEstimatedRowCount(); |
| estimatedSize = iterators.getEstimatedByteCount(); |
| estimateInfoTimestamp = iterators.getEstimateInfoTimestamp(); |
| splits = iterators.getSplits(); |
| scans = iterators.getScans(); |
| if (isOffsetOnServer) { |
| scanner = new ConcatResultIterator(iterators); |
| if (limit != null) { |
| scanner = new LimitingResultIterator(scanner, limit); |
| } |
| } else if (isOrdered) { |
| scanner = new MergeSortTopNResultIterator(iterators, limit, offset, orderBy.getOrderByExpressions()); |
| } else { |
| if ((isSalted || table.getIndexType() == IndexType.LOCAL) && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context)) { |
| /* |
| * For salted tables or local indexes, a merge sort is needed if: |
| * 1) the config phoenix.query.force.rowkeyorder is set to true, or |
| * 2) the query has an ORDER BY that sorts the results by the row key |
| * (forward or reverse ordering). |
| */ |
| scanner = new MergeSortRowKeyResultIterator(iterators, isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY); |
| } else if (useRoundRobinIterator()) { |
| /* |
| * For any kind of table, round robin iteration is possible when no ordering |
| * of rows is needed. |
| */ |
| scanner = new RoundRobinResultIterator(iterators, this); |
| } else { |
| scanner = new ConcatResultIterator(iterators); |
| } |
| if (offset != null) { |
| scanner = new OffsetResultIterator(scanner, offset); |
| } |
| if (limit != null) { |
| scanner = new LimitingResultIterator(scanner, limit); |
| } |
| } |
| |
| if (context.getSequenceManager().getSequenceCount() > 0) { |
| scanner = new SequenceResultIterator(scanner, context.getSequenceManager()); |
| } |
| return scanner; |
| } |
| |
| @Override |
| public boolean useRoundRobinIterator() throws SQLException { |
| return ScanUtil.isRoundRobinPossible(orderBy, context); |
| } |
| |
| @Override |
| public <T> T accept(QueryPlanVisitor<T> visitor) { |
| return visitor.visit(this); |
| } |
| |
| @Override |
| public Long getEstimatedRowsToScan() throws SQLException { |
| if (isSerial) { |
| return serialRowsEstimate; |
| } |
| return super.getEstimatedRowsToScan(); |
| } |
| |
| @Override |
| public Long getEstimatedBytesToScan() throws SQLException { |
| if (isSerial) { |
| return serialBytesEstimate; |
| } |
| return super.getEstimatedBytesToScan(); |
| } |
| |
| @Override |
| public Long getEstimateInfoTimestamp() throws SQLException { |
| if (isSerial) { |
| return serialEstimateInfoTs; |
| } |
| return super.getEstimateInfoTimestamp(); |
| } |
| |
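| /** |
| * Derives the order in which this plan actually outputs rows: the compiled ORDER BY |
| * if there is one, the row key order of the scanned table when rows are returned in |
| * row key order, and no ordering otherwise. |
| */ |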
| private static OrderBy convertActualOutputOrderBy(OrderBy orderBy, StatementContext statementContext) throws SQLException { |
| if (!orderBy.isEmpty()) { |
| return OrderBy.convertCompiledOrderByToOutputOrderBy(orderBy); |
| } |
| |
| if (!ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, statementContext)) { |
| return OrderBy.EMPTY_ORDER_BY; |
| } |
| |
| TableRef tableRef = statementContext.getResolver().getTables().get(0); |
| return ExpressionUtil.getOrderByFromTable( |
| tableRef, |
| statementContext.getConnection(), |
| orderBy == OrderBy.REV_ROW_KEY_ORDER_BY).getFirst(); |
| } |
| |
| @Override |
| public List<OrderBy> getOutputOrderBys() { |
| return OrderBy.wrapForOutputOrderBys(this.actualOutputOrderBy); |
| } |
| } |