/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.iterate;

import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;

import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
* basic scan plans, to avoid loading large quantities of data from HBase in one go.
*
* <p>
 * Chunking is deprecated and shouldn't be used when implementing new features. As of HBase 0.98.17,
 * we rely on pacing the server-side scanners instead of pulling rows from the server in chunks.
* </p>
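 *
 * <p>
 * The chunk size is read from {@code QueryServices.SCAN_RESULT_CHUNK_SIZE}. A minimal sketch of
 * overriding it per connection follows; the value below is illustrative only, and the property
 * can equally be set in {@code hbase-site.xml} on the client:
 * </p>
 * <pre>{@code
 * Properties props = new Properties();
 * // Rows fetched per chunk before the scan is re-issued from the last row key.
 * props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1000");
 * Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
 * }</pre>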
*/
@Deprecated
public class ChunkedResultIterator implements PeekingResultIterator {
private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
private final ParallelIteratorFactory delegateIteratorFactory;
private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
private final StatementContext context;
private final TableRef tableRef;
private final long chunkSize;
private final MutationState mutationState;
private Scan scan;
private PeekingResultIterator resultIterator;
    private QueryPlan plan;

/**
     * Chunking is deprecated and shouldn't be used when implementing new features. As of HBase 0.98.17,
     * we rely on pacing the server-side scanners instead of pulling rows from the server in chunks.
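     * <p>
     * A minimal wiring sketch (the delegate factory, mutation state, and table ref are assumed to
     * come from the surrounding query compilation):
     * </p>
     * <pre>{@code
     * ParallelIteratorFactory chunkedFactory =
     *     new ChunkedResultIteratorFactory(delegateFactory, mutationState, tableRef);
     * PeekingResultIterator iterator =
     *     chunkedFactory.newIterator(context, scanner, scan, tableName, plan);
     * }</pre>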
*/
@Deprecated
public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {
private final ParallelIteratorFactory delegateFactory;
private final TableRef tableRef;
        private final MutationState mutationState;

public ChunkedResultIteratorFactory(ParallelIteratorFactory
delegateFactory, MutationState mutationState, TableRef tableRef) {
this.delegateFactory = delegateFactory;
this.tableRef = tableRef;
            // Clone the MutationState, as the one on the connection may change if auto commit
            // is on, while we need a handle to the original one (for its transaction state).
this.mutationState = new MutationState(mutationState);
        }

@Override
        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner,
                Scan scan, String tableName, QueryPlan plan) throws SQLException {
            if (logger.isDebugEnabled()) {
                logger.debug(LogUtil.addCustomAnnotations(
                        "ChunkedResultIteratorFactory.newIterator over "
                                + tableRef.getTable().getPhysicalName().getString() + " with " + scan,
                        ScanUtil.getCustomAnnotations(scan)));
            }
return new ChunkedResultIterator(delegateFactory, mutationState, context, tableRef, scan,
mutationState.getConnection().getQueryServices().getProps().getLong(
QueryServices.SCAN_RESULT_CHUNK_SIZE,
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner, plan);
}
    }

private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState,
StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException {
this.delegateIteratorFactory = delegateIteratorFactory;
this.context = context;
this.tableRef = tableRef;
this.scan = scan;
this.chunkSize = chunkSize;
this.mutationState = mutationState;
this.plan = plan;
// Instantiate single chunk iterator and the delegate iterator in constructor
// to get parallel scans kicked off in separate threads. If we delay this,
// we'll get serialized behavior (see PHOENIX-
        if (logger.isDebugEnabled()) {
            logger.debug(LogUtil.addCustomAnnotations(
                    "Get first chunked result iterator over "
                            + tableRef.getTable().getPhysicalName().getString() + " with " + scan,
                    ScanUtil.getCustomAnnotations(scan)));
        }
ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize);
String tableName = tableRef.getTable().getPhysicalName().getString();
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
    }

@Override
public Tuple peek() throws SQLException {
return getResultIterator().peek();
    }

@Override
public Tuple next() throws SQLException {
return getResultIterator().next();
    }

@Override
public void explain(List<String> planSteps) {
resultIterator.explain(planSteps);
    }

@Override
public void close() throws SQLException {
resultIterator.close();
}
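
    /**
     * Returns the delegate iterator, transparently opening the next chunk when the current one is
     * exhausted: a fresh scan is issued starting (inclusively) at {@code lastKey}, which the
     * finished chunk left pointing at the first row key of the next chunk.
     */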
private PeekingResultIterator getResultIterator() throws SQLException {
if (resultIterator.peek() == null && lastKey != null) {
resultIterator.close();
scan = ScanUtil.newScan(scan);
            if (ScanUtil.isLocalIndex(scan)) {
scan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.copyKeyBytesIfNecessary(lastKey));
} else {
scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
}
            if (logger.isDebugEnabled()) {
                logger.debug(LogUtil.addCustomAnnotations(
                        "Get next chunked result iterator over "
                                + tableRef.getTable().getPhysicalName().getString() + " with " + scan,
                        ScanUtil.getCustomAnnotations(scan)));
            }
String tableName = tableRef.getTable().getPhysicalName().getString();
long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
ResultIterator singleChunkResultIterator =
new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName),
renewLeaseThreshold, plan, DefaultParallelScanGrouper.getInstance()),
chunkSize);
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
}
return resultIterator;
}

    /**
     * ResultIterator that runs over a single chunk of results (i.e. a portion of a scan). A chunk
     * only ends on a row key boundary, so rows that share a row key are never split across chunks.
     */
private class SingleChunkResultIterator implements ResultIterator {
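        // Number of rows returned from this chunk so far.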
private int rowCount = 0;
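        // Set once this chunk has ended on a row key boundary.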
private boolean chunkComplete;
private final ResultIterator delegate;
private final long chunkSize;
private SingleChunkResultIterator(ResultIterator delegate, long chunkSize) {
Preconditions.checkArgument(chunkSize > 0);
this.delegate = delegate;
this.chunkSize = chunkSize;
        }

@Override
public Tuple next() throws SQLException {
if (chunkComplete || lastKey == null) {
return null;
}
Tuple next = delegate.next();
if (next != null) {
                // We actually keep going past the chunk size until the row key changes. This is
                // necessary for (at least) hash joins, as they can return multiple rows with the
                // same row key. Stopping a chunk at a row key boundary is necessary so that the
                // next chunk can start on the next row key.
if (rowCount == chunkSize) {
next.getKey(lastKey);
} else if (rowCount > chunkSize && rowKeyChanged(next)) {
chunkComplete = true;
return null;
}
rowCount++;
} else {
lastKey = null;
}
return next;
        }

@Override
public void explain(List<String> planSteps) {
delegate.explain(planSteps);
        }

@Override
public void close() throws SQLException {
delegate.close();
}
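
        /**
         * Copies the current tuple's key into {@code lastKey} and reports whether it differs from
         * the key recorded at the chunk boundary. This works because {@code getKey} repoints
         * {@code lastKey} at the new tuple's key bytes rather than copying over the old array.
         */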
private boolean rowKeyChanged(Tuple newTuple) {
byte[] currentKey = lastKey.get();
int offset = lastKey.getOffset();
int length = lastKey.getLength();
newTuple.getKey(lastKey);
return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0;
        }

@Override
public String toString() {
return "SingleChunkResultIterator [rowCount=" + rowCount
+ ", chunkComplete=" + chunkComplete + ", delegate="
+ delegate + ", chunkSize=" + chunkSize + "]";
}
}
}