blob: a302f84e19258138baae1d7bd4228a73e7ddddbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
import static org.apache.phoenix.query.QueryServices.INDEX_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.util.ScanUtil.getDummyResult;
import static org.apache.phoenix.util.ScanUtil.isDummy;
/**
 * Base region scanner for uncovered indexes.
 *
 * <p>Each page is processed in three steps:
 * <ol>
 *   <li>Up to {@link #pageSizeInRows} index rows are read from {@code innerScanner}.</li>
 *   <li>The subclass ({@link #scanDataTableRows(long)}) fetches the matching data table rows
 *       into {@link #dataRows}.</li>
 *   <li>The index rows are returned one at a time. When a {@link TupleProjector} is supplied,
 *       each is joined with the projected columns of its data row.</li>
 * </ol>
 *
 * <p>Server-side work per {@link #next(List)} call is bounded by {@link #pageSizeMs}. When that
 * budget is exceeded, a dummy result is returned and {@link #state} records where to resume on
 * the next call.
 */
public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(UncoveredIndexRegionScanner.class);

    /**
     * The states of processing a page of index rows.
     */
    protected enum State {
        /** A new page is about to start; the per-page buffers are reset. */
        INITIAL,
        /** Index rows of the current page are being collected. */
        SCANNING_INDEX,
        /** Data table rows for the collected index rows are being fetched. */
        SCANNING_DATA,
        /** Not set by this class; presumably used by subclasses (TODO confirm). */
        SCANNING_DATA_INTERRUPTED,
        /** The page is complete and its index rows are being returned. */
        READY
    }

    protected State state = State.INITIAL;
    protected final byte[][] viewConstants;
    protected final RegionCoprocessorEnvironment env;
    /** Maximum number of index rows collected per page. */
    protected long pageSizeInRows;
    /** Index rows older than this many milliseconds may be deleted by this scanner. */
    protected final long ageThreshold;
    protected byte[] emptyCF;
    protected byte[] emptyCQ;
    protected final Scan scan;
    protected final Scan dataTableScan;
    protected final RegionScanner innerScanner;
    protected final Region region;
    protected final IndexMaintainer indexMaintainer;
    protected final TupleProjector tupleProjector;
    protected final ImmutableBytesWritable ptr;
    /** Per page: the scanned index rows, in scan order. */
    protected List<List<Cell>> indexRows = null;
    /** Per page: fetched data rows, keyed by data row key. */
    protected Map<ImmutableBytesPtr, Result> dataRows = null;
    /** Per page: iterator over {@link #indexRows} used while in {@link State#READY}. */
    protected Iterator<List<Cell>> indexRowIterator = null;
    /** Per page: full index row key to the data row key built from it. */
    protected Map<byte[], byte[]> indexToDataRowKeyMap = null;
    protected int indexRowCount = 0;
    protected final long pageSizeMs;
    /** Row key of the last index row added to the page; used as the dummy-result row key. */
    protected byte[] lastIndexRowKey = null;

    public UncoveredIndexRegionScanner(final RegionScanner innerScanner,
                                       final Region region,
                                       final Scan scan,
                                       final RegionCoprocessorEnvironment env,
                                       final Scan dataTableScan,
                                       final TupleProjector tupleProjector,
                                       final IndexMaintainer indexMaintainer,
                                       final byte[][] viewConstants,
                                       final ImmutableBytesWritable ptr,
                                       final long pageSizeMs,
                                       final long queryLimit) {
        super(innerScanner);
        final Configuration config = env.getConfiguration();
        byte[] pageSizeFromScan =
                scan.getAttribute(BaseScannerRegionObserver.INDEX_PAGE_ROWS);
        // pageSizeInRows is a long: do not narrow through int, which would silently wrap
        // values above Integer.MAX_VALUE (e.g. Long.MAX_VALUE would become -1).
        if (pageSizeFromScan != null) {
            pageSizeInRows = Bytes.toLong(pageSizeFromScan);
        } else {
            pageSizeInRows = config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
                    QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
        }
        // A query limit of -1 means "no limit"; otherwise never read more rows than the limit.
        if (queryLimit != -1) {
            pageSizeInRows = Long.min(pageSizeInRows, queryLimit);
        }
        ageThreshold = config.getLong(
                QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
                QueryServicesOptions.DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS);
        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
        this.indexMaintainer = indexMaintainer;
        this.viewConstants = viewConstants;
        this.scan = scan;
        this.dataTableScan = dataTableScan;
        this.innerScanner = innerScanner;
        this.region = region;
        this.env = env;
        this.ptr = ptr;
        this.tupleProjector = tupleProjector;
        this.pageSizeMs = pageSizeMs;
    }

    @Override
    public long getMvccReadPoint() {
        return innerScanner.getMvccReadPoint();
    }

    @Override
    public RegionInfo getRegionInfo() {
        return region.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() {
        return false;
    }

    @Override
    public void close() throws IOException {
        innerScanner.close();
    }

    @Override
    public long getMaxResultSize() {
        return innerScanner.getMaxResultSize();
    }

    @Override
    public int getBatch() {
        return innerScanner.getBatch();
    }

    /**
     * Fetches the data table rows for the current page into {@link #dataRows}.
     *
     * <p>Implementations are expected to set {@link #state} to {@link State#READY} when the
     * page is complete. If the state is left at {@link State#SCANNING_DATA}, {@link #next(List)}
     * returns a dummy result and calls this method again on the next invocation.
     *
     * @param startTime time in milliseconds at which the current {@code next} call started
     * @throws IOException if reading the data table fails
     */
    protected abstract void scanDataTableRows(long startTime) throws IOException;

    /**
     * Same as {@link #prepareDataTableScan(Collection, boolean)} with single-version reads.
     */
    protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
        return prepareDataTableScan(dataRowKeys, false);
    }

    /**
     * Builds a point-lookup scan over the given data row keys.
     *
     * <p>Keys already present in {@link #dataRows} are skipped, so a retry after a paging
     * interruption only fetches the rows that are still missing.
     *
     * @param dataRowKeys data table row keys to look up
     * @param includeMultipleVersions passed through to the {@link SkipScanFilter}
     * @return the data table scan, or {@code null} if every row has already been fetched
     * @throws IOException if the scan cannot be initialized
     */
    protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys,
                                        boolean includeMultipleVersions) throws IOException {
        List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
        for (byte[] dataRowKey : dataRowKeys) {
            // If the data table scan was interrupted because of paging we retry the scan
            // but on retry we should only fetch data table rows which we haven't already
            // fetched.
            if (!dataRows.containsKey(new ImmutableBytesPtr(dataRowKey))) {
                keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC));
            }
        }
        if (!keys.isEmpty()) {
            ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
            Scan dataScan = new Scan(dataTableScan);
            dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
            scanRanges.initializeScan(dataScan);
            SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
            dataScan.setFilter(new SkipScanFilter(skipScanFilter, includeMultipleVersions));
            dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
                    Bytes.toBytes(Long.valueOf(pageSizeMs)));
            return dataScan;
        } else {
            LOGGER.info("All data rows have already been fetched");
            return null;
        }
    }

    /**
     * Collects up to {@link #pageSizeInRows} index rows into {@link #indexRows} and records
     * the index row key to data row key mapping for each in {@link #indexToDataRowKeyMap}.
     *
     * @param result on return, either empty or a dummy result when paging kicked in
     * @param startTime time in milliseconds at which the current {@code next} call started
     * @param actualStartKey if not {@code null}, index rows whose key sorts before it are
     *                       skipped without being added to the page
     * @param offset number of leading row-key bytes dropped when building
     *               {@link #lastIndexRowKey} and the data row key (presumably a region prefix
     *               for local indexes — TODO confirm)
     * @return whether the inner scanner may have more rows; {@code true} whenever a dummy
     *         result was placed in {@code result}
     * @throws IOException if reading the index rows fails
     */
    protected boolean scanIndexTableRows(List<Cell> result,
                                         final long startTime,
                                         final byte[] actualStartKey,
                                         final int offset) throws IOException {
        boolean hasMore = false;
        if (actualStartKey != null) {
            do {
                hasMore = innerScanner.nextRaw(result);
                if (result.isEmpty()) {
                    return hasMore;
                }
                if (ScanUtil.isDummy(result)) {
                    return true;
                }
                Cell firstCell = result.get(0);
                if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
                        firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0) {
                    result.clear();
                    if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
                        byte[] rowKey = CellUtil.cloneRow(firstCell);
                        ScanUtil.getDummyResult(rowKey, result);
                        return true;
                    }
                } else {
                    break;
                }
            } while (hasMore);
            if (result.isEmpty()) {
                // Every remaining row sorted before actualStartKey and the scanner is
                // exhausted (hasMore is false); do not call nextRaw() on it again.
                return hasMore;
            }
        }
        do {
            List<Cell> row = new ArrayList<Cell>();
            if (result.isEmpty()) {
                hasMore = innerScanner.nextRaw(row);
            } else {
                // The first row at or after actualStartKey was already read above.
                row.addAll(result);
                result.clear();
            }
            if (!row.isEmpty()) {
                if (isDummy(row)) {
                    result.addAll(row);
                    // We got a dummy request from lower layers. This means that
                    // the scan took more than pageSizeMs. Just return true here.
                    // The client will drop this dummy request and continue to scan.
                    // Then the lower layer scanner will continue
                    // wherever it stopped due to this dummy request
                    return true;
                }
                Cell firstCell = row.get(0);
                byte[] indexRowKey = firstCell.getRowArray();
                // Avoid unnecessary byte copy and garbage when the row key is what we need.
                if (firstCell.getRowOffset() + offset == 0
                        && firstCell.getRowLength() - offset == indexRowKey.length) {
                    lastIndexRowKey = indexRowKey;
                } else {
                    ptr.set(indexRowKey, firstCell.getRowOffset() + offset,
                            firstCell.getRowLength() - offset);
                    lastIndexRowKey = ptr.copyBytes();
                }
                // Keyed by the full index row key, which is what getNextCoveredIndexRow looks up.
                indexToDataRowKeyMap.put(offset == 0 ? lastIndexRowKey :
                        CellUtil.cloneRow(firstCell), indexMaintainer.buildDataRowKey(
                        new ImmutableBytesWritable(lastIndexRowKey),
                        viewConstants));
                indexRows.add(row);
                indexRowCount++;
                if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime)
                        >= pageSizeMs) {
                    getDummyResult(lastIndexRowKey, result);
                    // We do not need to change the state, State.SCANNING_INDEX
                    // since we will continue scanning the index table after
                    // the client drops the dummy request and then calls the next
                    // method on its ResultScanner within ScanningResultIterator
                    return true;
                }
            }
        } while (hasMore && indexRowCount < pageSizeInRows);
        return hasMore;
    }

    /**
     * Same as {@link #scanIndexTableRows(List, long, byte[], int)} with no start key and no
     * row-key offset.
     */
    protected boolean scanIndexTableRows(List<Cell> result,
                                         final long startTime) throws IOException {
        return scanIndexTableRows(result, startTime, null, 0);
    }

    /**
     * Checks an uncovered global index row against its data row.
     *
     * <p>If the index row matches the data row but carries a different timestamp, the index
     * row is rewritten as VERIFIED at the data row's timestamp. The stale version at
     * {@code indexTimestamp} is deleted if it is older than {@link #ageThreshold}. If the index
     * row does not match, it is deleted when old enough.
     *
     * @param dataRow the data table row the index row points to
     * @param indexRowKey full index row key
     * @param indexTimestamp timestamp of the index row being checked
     * @return {@code true} if the index row is valid for {@code dataRow}
     * @throws IOException if a repair or delete mutation fails
     */
    private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[] indexRowKey,
                                                       long indexTimestamp)
            throws IOException {
        Put put = new Put(dataRow.getRow());
        for (Cell cell : dataRow.rawCells()) {
            put.add(cell);
        }
        if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
            long dataTimestamp = IndexUtil.getMaxTimestamp(put);
            if (dataTimestamp != indexTimestamp) {
                Mutation[] mutations;
                Put indexPut = new Put(indexRowKey);
                // Write the verified marker at the data row's timestamp. Writing it at
                // indexTimestamp would collide with the SINGLE_VERSION delete below, which
                // targets that same timestamp, and the repair would be masked.
                indexPut.addColumn(emptyCF, emptyCQ, dataTimestamp, QueryConstants.VERIFIED_BYTES);
                if ((EnvironmentEdgeManager.currentTimeMillis() - indexTimestamp) > ageThreshold) {
                    Delete indexDelete = indexMaintainer.buildRowDeleteMutation(indexRowKey,
                            IndexMaintainer.DeleteType.SINGLE_VERSION, indexTimestamp);
                    mutations = new Mutation[]{indexPut, indexDelete};
                } else {
                    mutations = new Mutation[]{indexPut};
                }
                region.batchMutate(mutations);
            }
            return true;
        }
        // The age check must use the index row's own timestamp, consistent with the check
        // above and with getNextCoveredIndexRow. The data row's timestamp could make a fresh
        // index row look old enough to delete.
        indexMaintainer.deleteRowIfAgedEnough(indexRowKey, indexTimestamp,
                ageThreshold, false, region);
        return false;
    }

    /**
     * Adds the next index row of the current page to {@code result}.
     *
     * <p>When a {@link TupleProjector} is set, the row is joined with the projected columns of
     * its data row. For uncovered indexes (see {@link IndexMaintainer#isUncovered()}), the index
     * row is verified and {@code result} is cleared if it is invalid or its data row is missing.
     *
     * @param result receives the index row cells (possibly cleared, see above)
     * @return {@code false} if the page has no more index rows, {@code true} otherwise
     * @throws IOException if verification, repair or projection fails
     */
    protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
        if (indexRowIterator.hasNext()) {
            List<Cell> indexRow = indexRowIterator.next();
            result.addAll(indexRow);
            try {
                byte[] indexRowKey = CellUtil.cloneRow(indexRow.get(0));
                Result dataRow = dataRows.get(new ImmutableBytesPtr(
                        indexToDataRowKeyMap.get(indexRowKey)));
                if (dataRow != null) {
                    long ts = indexRow.get(0).getTimestamp();
                    if (!indexMaintainer.isUncovered()
                            || verifyIndexRowAndRepairIfNecessary(dataRow, indexRowKey, ts)) {
                        if (tupleProjector != null) {
                            IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow),
                                    tupleProjector, ptr);
                        }
                    } else {
                        result.clear();
                    }
                } else {
                    if (indexMaintainer.isUncovered()) {
                        // Since we also scan the empty column for uncovered global indexes, this
                        // means the data row does not exist. Delete the index row if the index
                        // is an uncovered global index.
                        indexMaintainer.deleteRowIfAgedEnough(indexRowKey,
                                indexRow.get(0).getTimestamp(), ageThreshold, false, region);
                        result.clear();
                    } else {
                        // The data row satisfying the scan does not exist. This could be because
                        // the data row may not include the columns corresponding to the uncovered
                        // index columns either. Just return the index row. Nothing to do here
                    }
                }
            } catch (Throwable e) {
                LOGGER.error("Exception in UncoveredIndexRegionScanner for region "
                        + region.getRegionInfo().getRegionNameAsString(), e);
                throw e;
            }
            return true;
        } else {
            return false;
        }
    }

    /**
     * A page of index rows is scanned, and then the corresponding data table rows are retrieved
     * by {@link #scanDataTableRows(long)}. These data rows are then joined with the index rows.
     * The join adds uncovered columns to the index rows.
     *
     * <p>This implementation conforms to server paging: if the server-side operation takes
     * more than {@link #pageSizeMs}, a dummy result is returned to signal to the client that
     * there is more work to do on the server side. This is done to prevent RPC timeouts.
     *
     * @param result receives the next (possibly joined) index row or a dummy result
     * @return whether there may be more rows to scan
     * @throws IOException if scanning the index or data table fails
     */
    @Override
    public boolean next(List<Cell> result) throws IOException {
        long startTime = EnvironmentEdgeManager.currentTimeMillis();
        boolean hasMore;
        region.startRegionOperation();
        try {
            synchronized (innerScanner) {
                // The current page has been fully returned; start a new one.
                if (state == State.READY && !indexRowIterator.hasNext()) {
                    state = State.INITIAL;
                }
                if (state == State.INITIAL) {
                    indexRowCount = 0;
                    indexRows = new ArrayList<>();
                    dataRows = Maps.newConcurrentMap();
                    indexToDataRowKeyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                    state = State.SCANNING_INDEX;
                }
                if (state == State.SCANNING_INDEX) {
                    hasMore = scanIndexTableRows(result, startTime);
                    if (isDummy(result)) {
                        return hasMore;
                    }
                    state = State.SCANNING_DATA;
                }
                if (state == State.SCANNING_DATA) {
                    scanDataTableRows(startTime);
                    indexRowIterator = indexRows.iterator();
                }
                if (state == State.READY) {
                    return getNextCoveredIndexRow(result);
                } else {
                    // The data scan did not finish within pageSizeMs; resume on the next call.
                    getDummyResult(lastIndexRowKey, result);
                    return true;
                }
            }
        } catch (Throwable e) {
            LOGGER.error("Exception in UncoveredIndexRegionScanner for region "
                    + region.getRegionInfo().getRegionNameAsString(), e);
            throw e;
        } finally {
            region.closeRegionOperation();
        }
    }
}