blob: a17258ac4e5904e0241128441cc5ababf5ab4aa9 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
 * basic scan plans, to avoid loading large quantities of data from HBase in one go.
 */
public class ChunkedResultIterator implements PeekingResultIterator {
private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
// Factory used to wrap each per-chunk iterator before it is handed to callers
// (used in the constructor and in getResultIterator()).
private final ParallelIteratorFactory delegateIteratorFactory;
// Row-key marker shared with the inner SingleChunkResultIterator. That iterator sets it to
// null when its delegate has no more rows; getResultIterator() only opens another chunk
// while it is non-null.
private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
// Statement context passed to the delegate factory; also the source of the connection's
// query services and of the read-metrics queue when a follow-up chunk is opened.
private final StatementContext context;
// Table being scanned; supplies the names used in debug logs and the physical table name.
private final TableRef tableRef;
// Row-count threshold that SingleChunkResultIterator.next() compares its row counter against.
private final long chunkSize;
// Passed to the TableResultIterator that is created for each follow-up chunk.
private final MutationState mutationState;
// Scan for the current chunk; replaced by ScanUtil.newScan(scan) when a new chunk is opened.
private Scan scan;
// Iterator over the chunk currently being consumed.
private PeekingResultIterator resultIterator;
// ParallelIteratorFactory that wraps another factory so that every iterator it hands out is a
// ChunkedResultIterator over the given table.
public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {
// Factory that each ChunkedResultIterator uses to wrap its per-chunk iterators.
private final ParallelIteratorFactory delegateFactory;
// Table the produced iterators scan.
private final TableRef tableRef;
// Private copy of the caller's MutationState (see the constructor).
private final MutationState mutationState;
// Stores the delegate factory and table reference, and takes a copy of the mutation state.
public ChunkedResultIteratorFactory(ParallelIteratorFactory
delegateFactory, MutationState mutationState, TableRef tableRef) {
this.delegateFactory = delegateFactory;
this.tableRef = tableRef;
// Clone MutationState, as the one on the connection may change if auto commit is on
// while we need a handle to the original one (for its transaction state).
this.mutationState = new MutationState(mutationState);
// Creates a ChunkedResultIterator that starts from the supplied scanner and scan.
// The tableName parameter is not referenced in the visible body; the table is taken from
// the tableRef field instead.
public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
// NOTE(review): the argument list below has an unbalanced ')' before ", scanner" — the
// expression that computes the chunk size (with DEFAULT_SCAN_RESULT_CHUNK_SIZE as its
// fallback, presumably) appears to be cut off. Confirm against the upstream file.
return new ChunkedResultIterator(delegateFactory, mutationState, context, tableRef, scan,
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
/**
 * Builds a chunked iterator whose first chunk reads from an already-open scanner.
 *
 * @param delegateIteratorFactory factory used to wrap every per-chunk iterator
 * @param mutationState mutation state handed to the scanners of follow-up chunks
 * @param context statement context passed to the delegate factory
 * @param tableRef table being scanned
 * @param scan scan describing the first chunk
 * @param chunkSize row-count threshold for a chunk; must be positive
 * @param scanner already-open source of rows for the first chunk
 * @throws SQLException if the delegate factory fails to create the first iterator
 */
private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState,
        StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException {
    this.delegateIteratorFactory = delegateIteratorFactory;
    this.mutationState = mutationState;
    this.context = context;
    this.tableRef = tableRef;
    this.chunkSize = chunkSize;
    this.scan = scan;
    // Build the first chunk iterator and its delegate eagerly, here in the constructor, so
    // that parallel scans are kicked off in separate threads. Deferring this work would
    // serialize the scans.
    if (logger.isDebugEnabled()) {
        String message = "Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan;
        logger.debug(LogUtil.addCustomAnnotations(message, ScanUtil.getCustomAnnotations(scan)));
    }
    ResultIterator firstChunk = new SingleChunkResultIterator(scanner, chunkSize);
    String physicalTableName = tableRef.getTable().getPhysicalName().getString();
    this.resultIterator = delegateIteratorFactory.newIterator(context, firstChunk, scan, physicalTableName);
}
public Tuple peek() throws SQLException {
return getResultIterator().peek();
public Tuple next() throws SQLException {
return getResultIterator().next();
public void explain(List<String> planSteps) {
public void close() throws SQLException {
private PeekingResultIterator getResultIterator() throws SQLException {
if (resultIterator.peek() == null && lastKey != null) {
scan = ScanUtil.newScan(scan);
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
String tableName = tableRef.getTable().getPhysicalName().getString();
long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold), chunkSize);
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
return resultIterator;
/**
 * ResultIterator that runs over a single chunk of results (i.e. a portion of a scan).
 */
// Non-static inner class: its methods read and write the enclosing iterator's lastKey and
// read its scan.
private class SingleChunkResultIterator implements ResultIterator {
// Compared against chunkSize in next() to detect the chunk boundary.
private int rowCount = 0;
// Once true, next() returns null immediately.
private boolean chunkComplete;
// Wrapped iterator supplied through the constructor.
private final ResultIterator delegate;
// Row-count threshold for this chunk; the constructor requires it to be positive.
private final long chunkSize;
/**
 * Wraps {@code delegate} so that it is read one chunk at a time.
 *
 * @param delegate iterator supplying the rows of this chunk
 * @param chunkSize row-count threshold for the chunk; checked to be greater than zero
 */
private SingleChunkResultIterator(ResultIterator delegate, long chunkSize) {
    // Validate before storing anything: a non-positive chunk size is rejected here.
    Preconditions.checkArgument(chunkSize > 0);
    this.chunkSize = chunkSize;
    this.delegate = delegate;
}
public Tuple next() throws SQLException {
if (chunkComplete || lastKey == null) {
return null;
Tuple next =;
if (next != null) {
// We actually keep going past the chunk size until the row key changes. This is
// necessary for (at least) hash joins, as they can return multiple rows with the
// same row key. Stopping a chunk at a row key boundary is necessary in order to
// be able to start the next chunk on the next row key
if (rowCount == chunkSize) {
if (scan.getAttribute(STARTKEY_OFFSET) != null) {
} else if (rowCount > chunkSize && rowKeyChanged(next)) {
chunkComplete = true;
return null;
} else {
lastKey = null;
return next;
public void explain(List<String> planSteps) {
public void close() throws SQLException {
private boolean rowKeyChanged(Tuple newTuple) {
byte[] currentKey = lastKey.get();
int offset = lastKey.getOffset();
int length = lastKey.getLength();
if (scan.getAttribute(STARTKEY_OFFSET) != null) {
return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0;
* Prefix region start key to last key to form actual row key in case of local index scan.
private void addRegionStartKeyToLaskKey() {
byte[] offsetBytes = scan.getAttribute(STARTKEY_OFFSET);
if (offsetBytes != null) {
int startKeyOffset = Bytes.toInt(offsetBytes);
byte[] actualLastkey =
new byte[startKeyOffset + lastKey.getLength() - lastKey.getOffset()];
System.arraycopy(scan.getStartRow(), 0, actualLastkey, 0, startKeyOffset);
System.arraycopy(lastKey.get(), lastKey.getOffset(), actualLastkey,
startKeyOffset, lastKey.getLength());
public String toString() {
return "SingleChunkResultIterator [rowCount=" + rowCount
+ ", chunkComplete=" + chunkComplete + ", delegate="
+ delegate + ", chunkSize=" + chunkSize + "]";