/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.schema.PTable.IndexType.LOCAL;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
import static org.apache.phoenix.util.ScanUtil.hasDynamicColumns;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.filter.DistinctPrefixFilter;
import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PrefixByteCodec;
import org.apache.phoenix.util.PrefixByteDecoder;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
 * Class that parallelizes the scan over a table using the provided ExecutorService. Each region
 * of the table is scanned in parallel, with the results accessible through {@link #getIterators()}.
 *
 * @since 0.1
 */
public abstract class BaseResultIterators extends ExplainTable implements ResultIterators {
private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class);
private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
private static final int MIN_SEEK_TO_COLUMN_VERSION = VersionUtil.encodeVersion("0", "98", "12");
private final List<List<Scan>> scans;
private final List<KeyRange> splits;
private final byte[] physicalTableName;
protected final QueryPlan plan;
protected final String scanId;
protected final MutationState mutationState;
protected final ParallelScanGrouper scanGrouper;
// TODO: too much nesting here - breakup into new classes.
private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
private Long estimatedRows;
private Long estimatedSize;
private boolean hasGuidePosts;
private Scan scan;
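// Maps a region location to the key range [startKey, endKey) that the region covers.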
static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
@Override
public KeyRange apply(HRegionLocation region) {
return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
}
};
private PTable getTable() {
return plan.getTableRef().getTable();
}
protected boolean useStats() {
/*
 * Don't use guide posts:
 * 1) If we're collecting stats, as in this case we need to scan entire
 * regions' worth of data to track where to put the guide posts.
 * 2) If the query is going to be executed serially.
 */
return !ScanUtil.isAnalyzeTable(scan);
}
private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException {
StatementContext context = plan.getContext();
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
// Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix their row keys
if (context.getConnection().isDescVarLengthRowKeyUpgrade()) {
// We project *all* KeyValues across all column families as we make a pass over
// a physical table and we want to make sure we catch all KeyValues that may be
// dynamic or part of an updatable view.
familyMap.clear();
scan.setMaxVersions();
scan.setFilter(null); // Remove any filter
scan.setRaw(true); // Traverse (and subsequently clone) all KeyValues
// Pass over PTable so we can re-write rows according to the row key schema
scan.setAttribute(BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY, UngroupedAggregateRegionObserver.serialize(table));
} else {
FilterableStatement statement = plan.getStatement();
RowProjector projector = plan.getProjector();
boolean optimizeProjection = false;
boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereConditionColumns().isEmpty();
if (!projector.projectEverything()) {
// If nothing is projected into the scan and we only have one column family, just allow
// everything to be projected and use a FirstKeyOnlyFilter to skip from row to row. This
// turns out to be quite a bit faster.
// Where-condition columns also get added into familyMap, so when where conditions are
// present we cannot add the FirstKeyOnlyFilter at the beginning.
// FIXME: we only enter this if the number of column families is 1 because otherwise
// local indexes break, as the column families in the PTable appear not to match the
// actual column families of the table (which is bad).
if (keyOnlyFilter && table.getColumnFamilies().size() == 1) {
// Project the one column family. We must project a column family since it's possible
// that there are other, undeclared column families that we need to ignore.
scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
} else {
optimizeProjection = true;
if (projector.projectEveryRow()) {
if (table.getViewType() == ViewType.MAPPED) {
// Since we don't have the empty key value in MAPPED tables,
// we must project all CFs on the region server. However, only
// the selected column values are returned to the client.
context.getWhereConditionColumns().clear();
for (PColumnFamily family : table.getColumnFamilies()) {
context.addWhereConditionColumn(family.getName().getBytes(), null);
}
} else {
byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
// Project empty key value unless the column family containing it has
// been projected in its entirety.
if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
}
}
}
}
}
// Add FirstKeyOnlyFilter if there are no references to key value columns
if (keyOnlyFilter) {
ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
}
if (perScanLimit != null) {
ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
}
if (offset != null) {
ScanUtil.addOffsetAttribute(scan, offset);
}
int cols = plan.getGroupBy().getOrderPreservingColumnCount();
if (cols > 0 && keyOnlyFilter &&
!plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
cols < plan.getTableRef().getTable().getRowKeySchema().getFieldCount() &&
plan.getGroupBy().isOrderPreserving() &&
(context.getAggregationManager().isEmpty() || plan.getGroupBy().isUngroupedAggregate())) {
ScanUtil.andFilterAtEnd(scan,
new DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(),
cols));
if (plan.getLimit() != null) { // We can push the limit to the server
ScanUtil.andFilterAtEnd(scan, new PageFilter(plan.getLimit()));
}
}
scan.setAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME, new byte[]{table.getEncodingScheme().getSerializedMetadataValue()});
scan.setAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME, new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()});
// We use this flag on the server side to determine which value column qualifier to use in the key value we return from the server.
scan.setAttribute(BaseScannerRegionObserver.USE_NEW_VALUE_COLUMN_QUALIFIER, Bytes.toBytes(true));
// When analyzing the table, no key value lookups are performed,
// so there is no point in setting the qualifier range.
if (!ScanUtil.isAnalyzeTable(scan)) {
setQualifierRanges(keyOnlyFilter, table, scan, context);
}
if (optimizeProjection) {
optimizeProjection(context, scan, table, statement);
}
}
}
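/*
 * When the encoded qualifier list optimization applies, computes the [min, max] range of
 * encoded column qualifiers referenced by the scan (where-condition columns plus projected
 * columns), replaces the family map entries with explicit qualifier sets, and passes the
 * range to the server via the MIN_QUALIFIER/MAX_QUALIFIER scan attributes.
 */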
private static void setQualifierRanges(boolean keyOnlyFilter, PTable table, Scan scan,
StatementContext context) throws SQLException {
if (EncodedColumnsUtil.useEncodedQualifierListOptimization(table)) {
Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] cq = whereCol.getSecond();
if (cq != null) {
int qualifier = table.getEncodingScheme().decode(cq);
adjustQualifierRange(qualifier, minMaxQualifiers);
}
}
Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
if (entry.getValue() != null) {
for (byte[] cq : entry.getValue()) {
if (cq != null) {
int qualifier = table.getEncodingScheme().decode(cq);
adjustQualifierRange(qualifier, minMaxQualifiers);
}
}
} else {
byte[] cf = entry.getKey();
String family = Bytes.toString(cf);
if (table.getType() == INDEX && table.getIndexType() == LOCAL
&& !IndexUtil.isLocalIndexFamily(family)) {
// TODO: samarth confirm with James why we need this hack here :(
family = IndexUtil.getLocalIndexColumnFamily(family);
}
byte[] familyBytes = Bytes.toBytes(family);
NavigableSet<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
if (Bytes.equals(familyBytes, SchemaUtil.getEmptyColumnFamily(table))) {
// If the column family is also the empty column family, project the
// empty key value column
Pair<byte[], byte[]> emptyKeyValueInfo =
EncodedColumnsUtil.getEmptyKeyValueInfo(table);
qualifierSet.add(emptyKeyValueInfo.getFirst());
}
// In case of a keyOnlyFilter, we only need to project the
// empty key value column
if (!keyOnlyFilter) {
Pair<Integer, Integer> qualifierRangeForFamily =
EncodedColumnsUtil.setQualifiersForColumnsInFamily(table, family,
qualifierSet);
familyMap.put(familyBytes, qualifierSet);
if (qualifierRangeForFamily != null) {
adjustQualifierRange(qualifierRangeForFamily.getFirst(),
minMaxQualifiers);
adjustQualifierRange(qualifierRangeForFamily.getSecond(),
minMaxQualifiers);
}
}
}
}
if (minMaxQualifiers.getFirst() != null) {
scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER,
Bytes.toBytes(minMaxQualifiers.getFirst()));
scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER,
Bytes.toBytes(minMaxQualifiers.getSecond()));
ScanUtil.setQualifierRangesOnFilter(scan, minMaxQualifiers);
}
}
}
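// Widens the running [min, max] qualifier range to include the given qualifier.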
private static void adjustQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) {
if (minMaxQualifiers.getFirst() == null) {
minMaxQualifiers.setFirst(qualifier);
minMaxQualifiers.setSecond(qualifier);
} else {
if (minMaxQualifiers.getFirst() > qualifier) {
minMaxQualifiers.setFirst(qualifier);
} else if (minMaxQualifiers.getSecond() < qualifier) {
minMaxQualifiers.setSecond(qualifier);
}
}
}
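/*
 * Narrows what the scan projects: tracks the projected qualifiers per column family, decides
 * whether seeking to columns should be prevented (hints, encoded qualifiers, HBASE-13109),
 * ensures where-condition column families get scanned, and adds a column projection filter
 * when a filtered column is not part of the projection.
 */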
private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
// columnsTracker contains cf -> qualifiers that should be returned.
Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker =
new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
int referencedCfCount = familyMap.size();
QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? new BitSet(10) : null;
boolean filteredColumnNotInProjection = false;
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] filteredFamily = whereCol.getFirst();
if (!(familyMap.containsKey(filteredFamily))) {
referencedCfCount++;
filteredColumnNotInProjection = true;
} else if (!filteredColumnNotInProjection) {
NavigableSet<byte[]> projectedColumns = familyMap.get(filteredFamily);
if (projectedColumns != null) {
byte[] filteredColumn = whereCol.getSecond();
if (filteredColumn == null) {
filteredColumnNotInProjection = true;
} else {
filteredColumnNotInProjection = !projectedColumns.contains(filteredColumn);
}
}
}
}
boolean preventSeekToColumn = false;
if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
// Allow seeking to column during filtering
preventSeekToColumn = false;
} else if (!EncodedColumnsUtil.useEncodedQualifierListOptimization(table)) {
/*
* preventSeekToColumn cannot be true, even if hinted, when encoded qualifier list
* optimization is being used. When using the optimization, it is necessary that we
* explicitly set the column qualifiers of the column family in the scan and not just
* project the entire column family.
*/
if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
// Prevent seeking to column during filtering
preventSeekToColumn = true;
} else {
int hbaseServerVersion = context.getConnection().getQueryServices().getLowestClusterHBaseVersion();
// When only a single column family is referenced, there are no hints, and the HBase server
// version is lower than the one the fix for HBASE-13109 went into (0.98.12), we prevent
// seeking to a column.
preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
}
}
for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
NavigableSet<byte[]> qs = entry.getValue();
NavigableSet<ImmutableBytesPtr> cols = null;
if (qs != null) {
cols = new TreeSet<ImmutableBytesPtr>();
for (byte[] q : qs) {
cols.add(new ImmutableBytesPtr(q));
if (trackedColumnsBitset != null) {
int qualifier = encodingScheme.decode(q);
trackedColumnsBitset.set(qualifier);
}
}
}
columnsTracker.put(cf, cols);
}
// Make sure that where-condition CFs get scanned on the region server.
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] family = whereCol.getFirst();
if (preventSeekToColumn) {
if (!(familyMap.containsKey(family))) {
conditionOnlyCfs.add(family);
}
scan.addFamily(family);
} else {
if (familyMap.containsKey(family)) {
// The where column's CF is present. If specific columns were added for this CF, we
// need to ensure this where column gets added as well.
// If the select was like select cf1.*, that by itself selects the whole CF, so there is
// no need to add the where column explicitly. Adding it would remove the cf1.* projection
// and only this where-condition column would get returned!
NavigableSet<byte[]> cols = familyMap.get(family);
// A null cols means the whole CF will be scanned.
if (cols != null) {
if (whereCol.getSecond() == null) {
scan.addFamily(family);
} else {
scan.addColumn(family, whereCol.getSecond());
}
}
} else if (whereCol.getSecond() == null) {
scan.addFamily(family);
} else {
// The where column's CF itself is not present in the family map, so we need to add the column.
scan.addColumn(family, whereCol.getSecond());
}
}
}
if (!columnsTracker.isEmpty()) {
if (preventSeekToColumn) {
for (ImmutableBytesPtr f : columnsTracker.keySet()) {
// This addFamily removes the explicit cols from the scan's familyMap and projects the entire row.
// We don't want the ExplicitColumnTracker to be used; instead we have the ColumnProjectionFilter.
scan.addFamily(f.get());
}
}
// We don't need this filter for aggregates, as we're not returning what's in the
// scan in this case. We still want the other optimization that causes the
// ExplicitColumnTracker not to be used, though.
if (!statement.isAggregate() && filteredColumnNotInProjection) {
ScanUtil.andFilterAtEnd(scan,
trackedColumnsBitset != null ? new EncodedQualifiersColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), trackedColumnsBitset, conditionOnlyCfs, table.getEncodingScheme()) : new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme())));
}
}
}
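/**
 * Initializes the per-region parallel scans for the plan: clones the connection's
 * MutationState, configures the base scan, and computes the parallel scans and their
 * split key ranges from the region boundaries and guide posts.
 */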
public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(),
plan.getStatement().getHint(), QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset);
this.plan = plan;
this.scan = scan;
this.scanGrouper = scanGrouper;
StatementContext context = plan.getContext();
// Clone the MutationState, as the one on the connection will change if auto commit is on,
// yet we need the original one, with the original transaction, from TableResultIterator.
this.mutationState = new MutationState(context.getConnection().getMutationState());
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
physicalTableName = table.getPhysicalName().getBytes();
Long currentSCN = context.getConnection().getSCN();
if (null == currentSCN) {
currentSCN = HConstants.LATEST_TIMESTAMP;
}
// Used to tie all the scans together during logging
scanId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString();
initializeScan(plan, perScanLimit, offset, scan);
this.scans = getParallelScans();
List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
for (List<Scan> scanList : scans) {
for (Scan aScan : scanList) {
splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
}
}
this.splits = ImmutableList.copyOf(splitRanges);
// If a split is detected, this will hold more than one entry, but that's unlikely.
this.allFutures = Lists.newArrayListWithExpectedSize(1);
}
@Override
public List<KeyRange> getSplits() {
if (splits == null)
return Collections.emptyList();
else
return splits;
}
@Override
public List<List<Scan>> getScans() {
if (scans == null)
return Collections.emptyList();
else
return scans;
}
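// The end keys of all but the last region form the sorted boundary list used for binary search.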
private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
int nBoundaries = regionLocations.size() - 1;
List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
for (int i = 0; i < nBoundaries; i++) {
HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
ranges.add(regionInfo.getEndKey());
}
return ranges;
}
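// Given the sorted list of region end keys, returns the index of the region containing
// inclusiveKey. For example, with boundaries [b, d], key "b" maps to index 1, since end
// keys are exclusive and "b" therefore starts the second region.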
private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
// If we found an exact match, return the index+1, as the inclusiveKey will be contained
// in the next region (since we're matching on the end boundary).
guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
return guideIndex;
}
private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
// If we found an exact match, return the index we found as the exclusiveKey won't be
// contained in the next region as with getIndexContainingInclusive.
guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
return guideIndex;
}
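/*
 * Picks the column family whose guide posts to use: the empty/default CF when it is referenced
 * in the where clause, otherwise the first where-condition CF, falling back to the default CF.
 */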
private GuidePostsInfo getGuidePosts() throws SQLException {
if (!useStats() || !StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))) {
return GuidePostsInfo.NO_GUIDEPOST;
}
TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
byte[] cf = where.getFirst();
if (cf != null) {
whereConditions.add(cf);
}
}
PTable table = getTable();
byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
byte[] cf = null;
if ( !table.getColumnFamilies().isEmpty() && !whereConditions.isEmpty() ) {
for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
byte[] whereCF = where.getFirst();
if (Bytes.compareTo(defaultCF, whereCF) == 0) {
cf = defaultCF;
break;
}
}
if (cf == null) {
cf = context.getWhereConditionColumns().get(0).getFirst();
}
}
if (cf == null) {
cf = defaultCF;
}
GuidePostsKey key = new GuidePostsKey(physicalTableName, cf);
return context.getConnection().getQueryServices().getTableStats(key);
}
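/*
 * Appends the scan to the current contiguous batch and, when the grouper decides that a new
 * scan group should start (for example at a region boundary), flushes the batch into
 * parallelScans and begins a new one.
 */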
private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) {
boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
if (scan != null) {
scan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes());
scans.add(scan);
}
if (startNewScan && !scans.isEmpty()) {
parallelScans.add(scans);
scans = Lists.newArrayListWithExpectedSize(1);
}
return scans;
}
private List<List<Scan>> getParallelScans() throws SQLException {
// If the scan boundaries do not match the scan in the context, we need to get
// parallel scans for the chunk after a split/merge.
if (!ScanUtil.isContextScan(scan, context)) {
return getParallelScans(scan);
}
return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
}
/**
 * Get parallel scans within the specified scan boundaries. This can be used to get parallel
 * scans when there are splits/merges while scanning a chunk. In this case we need not go over
 * all the regions or guideposts.
 * @param scan the scan whose boundaries determine the regions to be covered
 * @return the parallel scans, grouped per region
 * @throws SQLException
 */
private List<List<Scan>> getParallelScans(Scan scan) throws SQLException {
List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
.getAllTableRegions(physicalTableName);
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
int regionIndex = 0;
int stopIndex = regionBoundaries.size();
if (scan.getStartRow().length > 0) {
regionIndex = getIndexContainingInclusive(regionBoundaries, scan.getStartRow());
}
if (scan.getStopRow().length > 0) {
stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), scan.getStopRow()));
}
List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
List<Scan> scans = Lists.newArrayListWithExpectedSize(2);
while (regionIndex <= stopIndex) {
HRegionLocation regionLocation = regionLocations.get(regionIndex);
HRegionInfo regionInfo = regionLocation.getRegionInfo();
Scan newScan = ScanUtil.newScan(scan);
byte[] endKey;
if (regionIndex == stopIndex) {
endKey = scan.getStopRow();
} else {
endKey = regionBoundaries.get(regionIndex);
}
if (ScanUtil.isLocalIndex(scan)) {
ScanUtil.setLocalIndexAttributes(newScan, 0, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getAttribute(SCAN_START_ROW_SUFFIX),
newScan.getAttribute(SCAN_STOP_ROW_SUFFIX));
} else {
if (Bytes.compareTo(scan.getStartRow(), regionInfo.getStartKey()) <= 0) {
newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionInfo.getStartKey());
newScan.setStartRow(regionInfo.getStartKey());
}
if (scan.getStopRow().length == 0 || (regionInfo.getEndKey().length != 0 && Bytes.compareTo(scan.getStopRow(), regionInfo.getEndKey()) > 0)) {
newScan.setStopRow(regionInfo.getEndKey());
}
}
scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
regionIndex++;
}
if (!scans.isEmpty()) { // Add any remaining scans
parallelScans.add(scans);
}
return parallelScans;
}
/**
 * Compute the list of parallel scans to run for a given query. The scans within an inner
 * list may be concatenated together directly, while the outer lists may need to be
 * merge sorted, depending on the query.
 * @return list of parallel scans to run for a given query.
 * @throws SQLException
 */
private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
.getAllTableRegions(physicalTableName);
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
ScanRanges scanRanges = context.getScanRanges();
PTable table = getTable();
boolean isSalted = table.getBucketNum() != null;
boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
GuidePostsInfo gps = getGuidePosts();
hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
boolean traverseAllRegions = isSalted || isLocalIndex;
if (!traverseAllRegions) {
byte[] scanStartRow = scan.getStartRow();
if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
startKey = scanStartRow;
}
byte[] scanStopRow = scan.getStopRow();
if (stopKey.length == 0
|| (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
stopKey = scanStopRow;
}
}
int regionIndex = 0;
int stopIndex = regionBoundaries.size();
if (startKey.length > 0) {
regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
}
if (stopKey.length > 0) {
stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
if (isLocalIndex) {
stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
}
}
List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
int gpsSize = gps.getGuidePostsCount();
int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
int keyOffset = 0;
ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
ImmutableBytesWritable guidePosts = gps.getGuidePosts();
ByteArrayInputStream stream = null;
DataInput input = null;
PrefixByteDecoder decoder = null;
int guideIndex = 0;
long estimatedRows = 0;
long estimatedSize = 0;
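// Guide posts are prefix-encoded; decode them sequentially, first skipping any guide posts at
// or before the start key (an EOFException simply means we have run out of guide posts).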
try {
if (gpsSize > 0) {
stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
input = new DataInputStream(stream);
decoder = new PrefixByteDecoder(gps.getMaxLength());
try {
while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0
&& currentKey.getLength() != 0) {
guideIndex++;
}
} catch (EOFException e) {}
}
byte[] currentKeyBytes = currentKey.copyBytes();
// Merge bisect with guideposts for all but the last region
while (regionIndex <= stopIndex) {
HRegionLocation regionLocation = regionLocations.get(regionIndex);
HRegionInfo regionInfo = regionLocation.getRegionInfo();
byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
if (regionIndex == stopIndex) {
endKey = stopKey;
} else {
endKey = regionBoundaries.get(regionIndex);
}
if (isLocalIndex) {
endRegionKey = regionInfo.getEndKey();
keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
}
try {
while (guideIndex < gpsSize && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) {
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
false);
if (newScan != null) {
ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
estimatedRows += gps.getRowCounts()[guideIndex];
estimatedSize += gps.getByteCounts()[guideIndex];
}
scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
currentKeyBytes = currentGuidePostBytes;
currentGuidePost = PrefixByteCodec.decode(decoder, input);
currentGuidePostBytes = currentGuidePost.copyBytes();
guideIndex++;
}
} catch (EOFException e) {}
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
if (newScan != null) {
ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
}
scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
currentKeyBytes = endKey;
regionIndex++;
}
if (scanRanges.isPointLookup()) {
this.estimatedRows = Long.valueOf(scanRanges.getPointLookupCount());
this.estimatedSize = this.estimatedRows * SchemaUtil.estimateRowSize(table);
} else if (hasGuidePosts) {
this.estimatedRows = estimatedRows;
this.estimatedSize = estimatedSize;
} else {
this.estimatedRows = null;
this.estimatedSize = null;
}
if (!scans.isEmpty()) { // Add any remaining scans
parallelScans.add(scans);
}
} finally {
if (stream != null) Closeables.closeQuietly(stream);
}
return parallelScans;
}
public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
if (!reverse) {
return list;
}
return Lists.reverse(list);
}
/**
* Executes the scan in parallel across all regions, blocking until all scans are complete.
* @return the result iterators for the scan of each region
*/
@Override
public List<PeekingResultIterator> getIterators() throws SQLException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Getting iterators for " + this,
ScanUtil.getCustomAnnotations(scan)));
}
boolean isReverse = ScanUtil.isReversed(scan);
boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
final ConnectionQueryServices services = context.getConnection().getQueryServices();
// Get the query timeout from the Statement.
final long startTime = System.currentTimeMillis();
final long maxQueryEndTime = startTime + context.getStatement().getQueryTimeoutInMillis();
int numScans = size();
// Capture all iterators so that if something goes wrong, we close them all.
// The iterators list is based on the submission of work, so it may not
// contain them all (for example, if work was rejected from the queue).
Queue<PeekingResultIterator> allIterators = new ConcurrentLinkedQueue<>();
List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
ScanWrapper previousScan = new ScanWrapper(null);
return getIterators(scans, services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime,
splits.size(), previousScan);
}
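// Mutable holder that lets getIterators() track the previously processed scan across the
// recursive calls made when work is resubmitted after a stale region boundary cache.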
class ScanWrapper {
Scan scan;
public Scan getScan() {
return scan;
}
public void setScan(Scan scan) {
this.scan = scan;
}
public ScanWrapper(Scan scan) {
this.scan = scan;
}
}
private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, ConnectionQueryServices services,
boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, List<PeekingResultIterator> iterators,
boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan) throws SQLException {
boolean success = false;
final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(splitSize);
allFutures.add(futures);
SQLException toThrow = null;
int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
try {
submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper);
boolean clearedCache = false;
for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr = reverseIfNecessary(future,isReverse).iterator();
while (scanPairItr.hasNext()) {
Pair<Scan,Future<PeekingResultIterator>> scanPair = scanPairItr.next();
try {
long timeOutForScan = maxQueryEndTime - System.currentTimeMillis();
if (timeOutForScan < 0) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the allotted time: " + queryTimeOut + " ms").build().buildException();
}
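/*
 * For local indexes, skip a scan whose boundaries show that its rows are already covered by
 * the previous scan; this can happen when scans are regenerated after a region split/merge
 * and would otherwise produce duplicate results.
 */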
if (isLocalIndex && previousScan != null && previousScan.getScan() != null
&& (((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
previousScan.getScan().getStopRow()) < 0)
|| (isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
previousScan.getScan().getStopRow()) > 0)
|| (Bytes.compareTo(scanPair.getFirst().getStopRow(), previousScan.getScan().getStopRow()) == 0))
&& Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) {
continue;
}
PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
concatIterators.add(iterator);
previousScan.setScan(scanPair.getFirst());
} catch (ExecutionException e) {
try { // Rethrow as SQLException
throw ServerUtil.parseServerException(e);
} catch (StaleRegionBoundaryCacheException e2) {
/*
 * Note that a StaleRegionBoundaryCacheException could be thrown in multiple scenarios, including splits,
 * region moves, the table being disabled, etc. See ServerUtil.parseServerException() for details.
 * Because of HBASE-17122 we need to explicitly check whether this exception is being
 * thrown because the table was disabled or because a split happened. This obviously is a HACK.
 * With older versions of HBase, a TableNotEnabledException was correctly thrown, so this
 * kind of hackery wasn't needed.
 * TODO: remove this once HBASE-17122 is fixed.
 */
try (HBaseAdmin admin = context.getConnection().getQueryServices().getAdmin()) {
if (admin.isTableDisabled(physicalTableName)) {
throw new TableNotEnabledException(physicalTableName);
}
}
scanPairItr.remove();
// Catch only to try to recover from the region boundary cache being out of date.
if (!clearedCache) { // Clear the cache once so that we rejigger the job based on the new boundaries
services.clearTableRegionCache(physicalTableName);
context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
}
// Resubmit just this portion of the work.
Scan oldScan = scanPair.getFirst();
byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
byte[] endKey = oldScan.getStopRow();
List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
// Add any concatIterators that were successful so far
// as we need these to be in order
addIterator(iterators, concatIterators);
concatIterators = Lists.newArrayList();
getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
maxQueryEndTime, newNestedScans.size(), previousScan);
}
}
}
addIterator(iterators, concatIterators);
}
success = true;
return iterators;
} catch (TimeoutException e) {
context.getOverallQueryMetrics().queryTimedOut();
GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
// thrown when a thread times out waiting for the future.get() call to return
toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
.setMessage(". Query couldn't be completed in the allotted time: " + queryTimeOut + " ms")
.setRootCause(e).build().buildException();
} catch (SQLException e) {
toThrow = e;
} catch (Exception e) {
toThrow = ServerUtil.parseServerException(e);
} finally {
try {
if (!success) {
try {
close();
} catch (Exception e) {
if (toThrow == null) {
toThrow = ServerUtil.parseServerException(e);
} else {
toThrow.setNextException(ServerUtil.parseServerException(e));
}
} finally {
try {
SQLCloseables.closeAll(allIterators);
} catch (Exception e) {
if (toThrow == null) {
toThrow = ServerUtil.parseServerException(e);
} else {
toThrow.setNextException(ServerUtil.parseServerException(e));
}
}
}
}
} finally {
if (toThrow != null) {
GLOBAL_FAILED_QUERY_COUNTER.increment();
context.getOverallQueryMetrics().queryFailed();
throw toThrow;
}
}
}
return null; // Not reachable
}
@Override
public void close() throws SQLException {
if (allFutures.isEmpty()) {
return;
}
// Don't call cancel on already started work, as it causes the HConnection
// to get into a funk. Instead, just cancel queued work.
boolean cancelledWork = false;
try {
List<Future<PeekingResultIterator>> futuresToClose = Lists.newArrayListWithExpectedSize(getSplits().size());
for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
// When work is rejected, we may have null futurePair entries, because
// we randomize these and set them as they're submitted.
if (futurePair != null) {
Future<PeekingResultIterator> future = futurePair.getSecond();
if (future != null) {
if (future.cancel(false)) {
cancelledWork = true;
} else {
futuresToClose.add(future);
}
}
}
}
}
}
// Wait for already started tasks to complete as we can't interrupt them without
// leaving our HConnection in a funky state.
for (Future<PeekingResultIterator> future : futuresToClose) {
try {
PeekingResultIterator iterator = future.get();
iterator.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
logger.info("Failed to execute task during cancel", e);
continue;
}
}
} finally {
if (cancelledWork) {
context.getConnection().getQueryServices().getExecutor().purge();
}
allFutures.clear();
}
}
private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) throws SQLException {
if (!childIterators.isEmpty()) {
if (plan.useRoundRobinIterator()) {
/*
 * When using a round robin iterator we shouldn't concatenate the iterators together. This is because a
 * round robin iterator should be calling next() on these iterators directly after selecting them in a
 * round robin fashion. This helps load the underlying scanners' caches in parallel and prevents
 * errors arising from scanner lease expirations.
 */
parentIterators.addAll(childIterators);
} else {
parentIterators.add(ConcatResultIterator.newIterator(childIterators));
}
}
}
protected static final class ScanLocator {
private final int outerListIndex;
private final int innerListIndex;
private final Scan scan;
private final boolean isFirstScan;
private final boolean isLastScan;
public ScanLocator(Scan scan, int outerListIndex, int innerListIndex, boolean isFirstScan, boolean isLastScan) {
this.outerListIndex = outerListIndex;
this.innerListIndex = innerListIndex;
this.scan = scan;
this.isFirstScan = isFirstScan;
this.isLastScan = isLastScan;
}
public int getOuterListIndex() {
return outerListIndex;
}
public int getInnerListIndex() {
return innerListIndex;
}
public Scan getScan() {
return scan;
}
public boolean isFirstScan() {
return isFirstScan;
}
public boolean isLastScan() {
return isLastScan;
}
}
abstract protected String getName();
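/**
 * Submits one unit of work per scan to the executor, populating nestedFutures with the same
 * nested structure as nestedScans so that results can be consumed in submission order (or in
 * reverse order when isReverse is true).
 */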
abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException;
@Override
public int size() {
return this.scans.size();
}
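// Emits the leading portion of the explain plan. Illustrative example only (actual numbers
// depend on stats and parallelism): "CLIENT 3-CHUNK 120 ROWS 36000 BYTES PARALLEL 3-WAY ..."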
@Override
public void explain(List<String> planSteps) {
boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean(
QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB,
QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT);
StringBuilder buf = new StringBuilder();
buf.append("CLIENT ");
if (displayChunkCount) {
boolean displayRowCount = context.getConnection().getQueryServices().getProps().getBoolean(
QueryServices.EXPLAIN_ROW_COUNT_ATTRIB,
QueryServicesOptions.DEFAULT_EXPLAIN_ROW_COUNT);
buf.append(this.splits.size()).append("-CHUNK ");
if (displayRowCount && estimatedRows != null) {
buf.append(estimatedRows).append(" ROWS ");
buf.append(estimatedSize).append(" BYTES ");
}
}
buf.append(getName()).append(" ").append(size()).append("-WAY ");
try {
if (plan.useRoundRobinIterator()) {
buf.append("ROUND ROBIN ");
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
explain(buf.toString(),planSteps);
}
public Long getEstimatedRowCount() {
return this.estimatedRows;
}
public Long getEstimatedByteCount() {
return this.estimatedSize;
}
@Override
public String toString() {
return "ResultIterators [name=" + getName() + ",id=" + scanId + ",scans=" + scans + "]";
}
}