blob: a4433ac1e4ec4061dd49b32be76ff93ca1ff0ee4 [file] [log] [blame]
/**
* @@@ START COPYRIGHT @@@
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @@@ END COPYRIGHT @@@
**/
package org.apache.hadoop.hbase.client.transactional;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CloseScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CloseScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.OpenScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PerformScanRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PerformScanResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.ServiceException;
import com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
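/*
* Transaction Scanner: a client-side scanner that walks a table region by
* region on behalf of one transaction, issuing its open/scan/close calls
* through the TrxRegionService coprocessor endpoint.
*/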
public class TransactionalScanner extends AbstractClientScanner {
// Per-instance logger, named after the runtime class of this scanner.
private final Log LOG = LogFactory.getLog(this.getClass());
// Scan specification given to the constructor; sent to each region when a scanner is opened.
public Scan scan;
// Scanner id sent with PerformScan and CloseScanner requests; initialized from the
// constructor argument and overwritten with the id returned by OpenScanner.
public Long scannerID;
// Transaction this scan runs under; supplies the transaction id and start id for requests.
public TransactionState ts;
// Table used to look up region locations and to obtain coprocessor RPC channels.
public TransactionalTable ttable;
// Set to true by close(); nextScanner() sets it back to false when it moves on to
// open a scanner on a region.
protected boolean closed = false;
// Experiment with this parameter, may be faster without having to send the final close().
// When true, PerformScan requests ask the region not to close the scanner and
// nextScanner() calls close() itself before leaving a region.
protected boolean regionShouldNotCloseOnLast = true;
// Number of rows requested per PerformScan call; the constructor replaces it with
// scan.getCaching() and falls back to 100 when that value is not positive.
protected int nbRows = 100;
// Call sequence number sent with each PerformScan and refreshed from its response;
// reset to 0 whenever a new region scanner is opened.
protected long nextCallSeq = 0;
// False once the current region scanner returned an empty batch or reported no more rows.
private boolean hasMore = true;
// False once nextScanner() has determined there is no further region to scan.
private boolean moreScanners = true;
// NOTE(review): not referenced anywhere in the visible part of this class.
private boolean interrupted = false;
// Region currently being scanned, together with the key used to route RPCs to it
// (currentBeginKey) and that region's end key (currentEndKey).
public HRegionInfo currentRegion;
public byte[] currentBeginKey;
public byte[] currentEndKey;
// Rows already fetched from the region but not yet handed out by next().
protected final LinkedList<Result> cache = new LinkedList<Result>();
/**
 * Creates a scanner over {@code ttable} for the given transaction and immediately
 * tries to open a region scanner at the scan's start row.
 *
 * <p>An {@link IOException} raised by that initial open is logged together with its
 * cause and is not rethrown, because this constructor declares no checked exceptions.
 *
 * @param ttable    table used to locate regions and reach the region coprocessor
 * @param ts        transaction the scan runs under
 * @param scan      scan specification; its caching value determines the batch size
 * @param scannerID initial scanner id; overwritten by the id returned when a region scanner opens
 */
public TransactionalScanner(final TransactionalTable ttable, final TransactionState ts, final Scan scan, final Long scannerID) {
    super();
    this.scan = scan;
    this.scannerID = scannerID;
    this.ts = ts;
    this.ttable = ttable;
    this.nbRows = scan.getCaching();
    if (nbRows <= 0) {
        // No usable caching value on the scan: fall back to the default batch size.
        nbRows = 100;
    }
    try {
        nextScanner(false);
    } catch (IOException e) {
        // Pass the exception to the logger so the cause and stack trace are not lost;
        // the bare message alone does not say which lookup or RPC failed.
        LOG.error("nextScanner error", e);
    }
}
/**
 * Tells whether the scan's stop row lies at or before the given region end key.
 *
 * @param endKey end key of the region being examined
 * @return {@code true} when the scan has a non-empty stop row that compares less
 *         than or equal to {@code endKey}; {@code false} otherwise
 */
protected boolean checkScanStopRow(final byte [] endKey) {
    final byte[] stopRow = this.scan.getStopRow();
    if (stopRow.length == 0) {
        // An empty stop row means the scan is unbounded on the right.
        return false;
    }
    final int order = Bytes.compareTo(stopRow, 0, stopRow.length,
                                      endKey, 0, endKey.length);
    return order <= 0;
}
/**
 * Closes the scanner on the current region by sending a CloseScanner request to
 * the region coprocessor. Calling it again after a close is a no-op.
 *
 * <p>This is best effort: the method declares no checked exceptions, so an RPC
 * failure or an exception reported in the response is logged and not propagated.
 */
@Override
public void close() {
    if(LOG.isTraceEnabled()) LOG.trace("close() -- ENTRY txID: " + ts.getTransactionId() + " " + this.ttable );
    if (closed) {
        if(LOG.isTraceEnabled()) LOG.trace("close() already closed -- EXIT txID: " + ts.getTransactionId());
        return;
    }
    this.closed = true;

    if (currentRegion == null) {
        // No region was ever resolved (the initial nextScanner() call failed before
        // setting it), so there is nothing to close and building the request below
        // would only throw a NullPointerException.
        if(LOG.isTraceEnabled()) LOG.trace("close() no current region, nothing to close -- EXIT txID: " + ts.getTransactionId());
        return;
    }

    TrxRegionProtos.CloseScannerRequest.Builder requestBuilder = CloseScannerRequest.newBuilder();
    requestBuilder.setTransactionId(ts.getTransactionId());
    requestBuilder.setRegionName(ByteString.copyFromUtf8(currentRegion.getRegionNameAsString()));
    requestBuilder.setScannerId(scannerID);
    TrxRegionProtos.CloseScannerRequest closeRequest = requestBuilder.build();

    TrxRegionProtos.CloseScannerResponse response = null;
    try {
        // Route the call to the region that owns the current begin key.
        CoprocessorRpcChannel channel = ttable.coprocessorService(this.currentBeginKey);
        TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
        response = trxService.closeScanner(null, closeRequest);
    }
    catch (Throwable e) {
        // Logged with the throwable (as the open/scan paths do) so the stack trace
        // is kept; not rethrown because close() cannot throw checked exceptions.
        String errMsg = "CloseScanner error on coprocessor call, scannerID: " + this.scannerID;
        LOG.error(errMsg, e);
    }

    if (response != null && response.getHasException()) {
        String exception = response.getException();
        String errMsg = "closeScanner encountered Exception txID: " +
            ts.getTransactionId() + " Exception: " + exception;
        LOG.error(errMsg);
    }
    if(LOG.isTraceEnabled()) LOG.trace("close() -- EXIT txID: " + ts.getTransactionId());
}
/**
 * Moves the scan to the next region (or to the region holding the scan's start
 * row on the first call) and opens a scanner there through the region coprocessor.
 *
 * <p>On a subsequent call the scanner on the region being left is closed first when
 * {@code regionShouldNotCloseOnLast} is set. The scan ends, and {@code false} is
 * returned, when the current region's end key is empty, when the scan's stop row
 * is at or before that end key, or when {@code done} is {@code true}.
 *
 * <p>{@code scannerID} and the {@code closed} flag are updated only after the open
 * call has succeeded, so a failed open never leaves this object claiming to hold
 * an open scanner it does not have.
 *
 * @param done {@code true} to stop after the current region instead of advancing
 * @return {@code true} if a scanner was opened on a new region, {@code false} if
 *         there is no further region to scan
 * @throws IOException if the open call fails or the region reports an exception
 */
protected boolean nextScanner(final boolean done) throws IOException{
    if(LOG.isTraceEnabled()) LOG.trace("nextScanner() -- ENTRY txID: " + ts.getTransactionId());

    if (this.currentBeginKey != null) {
        if(LOG.isTraceEnabled()) LOG.trace("nextScanner() currentBeginKey != null txID: " + ts.getTransactionId()
            + " currentBeginKey=" + (Bytes.equals(this.currentBeginKey, HConstants.EMPTY_START_ROW) ?
                "INFINITE" : Hex.encodeHexString(this.currentBeginKey)));
        if (regionShouldNotCloseOnLast) {
            // The region was told not to close the scanner on its own, so do it explicitly.
            if(LOG.isTraceEnabled()) LOG.trace("nextScanner() regionShouldNotCloseOnLast is true; closing scanner");
            close();
        }
        if ((this.currentEndKey == HConstants.EMPTY_END_ROW) ||
                Bytes.equals(this.currentEndKey, HConstants.EMPTY_BYTE_ARRAY) ||
                checkScanStopRow(this.currentEndKey) ||
                done) {
            if(LOG.isTraceEnabled()) LOG.trace("endKey: " + (Bytes.equals(this.currentEndKey, HConstants.EMPTY_END_ROW) ?
                "INFINITE" : Hex.encodeHexString(this.currentEndKey)));
            if(LOG.isTraceEnabled()) LOG.trace("nextScanner() -- EXIT -- returning false txID: " + ts.getTransactionId());
            this.moreScanners = false;
            return false;
        }
        // The next region starts where the current one ends.
        this.currentBeginKey = this.currentEndKey;
    }
    else {
        // First call to nextScanner
        this.currentBeginKey = this.scan.getStartRow();
    }

    this.currentRegion = ttable.getRegionLocation(this.currentBeginKey, false).getRegionInfo();
    this.currentEndKey = this.currentRegion.getEndKey();

    if(LOG.isTraceEnabled()) LOG.trace("nextScanner() txID: " + ts.getTransactionId() + " Region Info: " + currentRegion.getRegionNameAsString()
        + " currentBeginKey: " + (Bytes.equals(this.currentBeginKey, HConstants.EMPTY_START_ROW) ?
            "INFINITE" : Hex.encodeHexString(this.currentBeginKey))
        + " currentEndKey: " + (Bytes.equals(this.currentEndKey, HConstants.EMPTY_END_ROW) ?
            "INFINITE" : Hex.encodeHexString(this.currentEndKey)));

    TrxRegionProtos.OpenScannerRequest.Builder requestBuilder = OpenScannerRequest.newBuilder();
    requestBuilder.setTransactionId(ts.getTransactionId());
    requestBuilder.setStartId(ts.getStartId());
    requestBuilder.setRegionName(ByteString.copyFromUtf8(currentRegion.getRegionNameAsString()));
    requestBuilder.setScan(ProtobufUtil.toScan(scan));
    TrxRegionProtos.OpenScannerRequest openRequest = requestBuilder.build();

    TrxRegionProtos.OpenScannerResponse response = null;
    try {
        CoprocessorRpcChannel channel = ttable.coprocessorService(this.currentBeginKey);
        TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
        response = trxService.openScanner(null, openRequest);
    }
    catch (Throwable e) {
        String errMsg = "OpenScanner error on coprocessor call, scannerID: " + this.scannerID ;
        LOG.error(errMsg, e);
        throw new IOException(errMsg, e);
    }

    if (response.getHasException()) {
        String exception = response.getException();
        String errMsg = "nextScanner encountered Exception txID: " +
            ts.getTransactionId() + " Exception: " + exception;
        LOG.error(errMsg);
        throw new IOException(errMsg);
    }

    // The open succeeded: only now adopt the new scanner id and mark the scanner as
    // open, so that a failed open cannot make a later close() target a scanner
    // that was never created.
    this.scannerID = response.getScannerId();
    this.closed = false;
    this.nextCallSeq = 0;

    if(LOG.isTraceEnabled()) LOG.trace("nextScanner() -- EXIT -- returning true txID: " + ts.getTransactionId());
    return true;
}
/**
 * Returns the next row of the scan.
 *
 * <p>Rows are served from the local cache. When the cache is empty a batch is
 * fetched from the current region; when that region has nothing more, the scan
 * advances to the next region via {@link #nextScanner(boolean)}. This is done in
 * a loop rather than by recursion, so a long run of empty regions cannot grow
 * the call stack.
 *
 * @return the next {@link Result}, or {@code null} when the scan is exhausted
 * @throws IOException if fetching a batch or opening the next region scanner fails
 */
@Override
public Result next() throws IOException {
    while (cache.isEmpty()) {
        if (LOG.isTraceEnabled()) LOG.trace("next -- cache.size() == 0 txID: " + ts.getTransactionId());

        if (this.hasMore) {
            fetchNextBatch();
            if (cache.isEmpty()) {
                if (!this.moreScanners) {
                    if(LOG.isTraceEnabled()) LOG.trace("next() returning null");
                    return null;
                }
                // Empty batch but regions remain: go round again.
                if(LOG.isTraceEnabled()) LOG.trace("next() more scanners");
            }
        }
        else {
            if(LOG.isTraceEnabled()) LOG.trace("hasMore is false");
            if (!nextScanner(false)) {
                if(LOG.isTraceEnabled()) LOG.trace("next(), nextScanner == false");
                this.moreScanners = false;
                return null;
            }
            if(LOG.isTraceEnabled()) LOG.trace("next(), nextScanner == true nextCallSeq: " + this.nextCallSeq);
            this.hasMore = true;
        }
    }
    return cache.poll();
}

/**
 * Issues one PerformScan call against the current region, appends the returned
 * rows to the cache and updates {@code nextCallSeq} and {@code hasMore} from the
 * response. An empty batch sets {@code hasMore} to {@code false}.
 *
 * @throws IOException if no region has been resolved, the RPC fails, or the
 *                     region reports an exception
 */
private void fetchNextBatch() throws IOException {
    if (currentRegion == null) {
        // Happens when the initial open in the constructor failed before a region
        // was resolved; report it as an IOException instead of a NullPointerException.
        throw new IOException("PerformScan attempted without an open region scanner, txID: "
            + ts.getTransactionId());
    }

    if (LOG.isTraceEnabled())
        LOG.trace("next() before coprocessor PerformScan call txID: " + ts.getTransactionId());

    TrxRegionProtos.PerformScanRequest.Builder requestBuilder = PerformScanRequest.newBuilder();
    requestBuilder.setTransactionId(ts.getTransactionId());
    requestBuilder.setStartId(ts.getStartId());
    requestBuilder.setRegionName(ByteString.copyFromUtf8(currentRegion.getRegionNameAsString()));
    requestBuilder.setScannerId(scannerID);
    requestBuilder.setNumberOfRows(nbRows);
    // Ask the region to close the scanner itself only when this client will not send close().
    requestBuilder.setCloseScanner(!regionShouldNotCloseOnLast);
    requestBuilder.setNextCallSeq(this.nextCallSeq);
    TrxRegionProtos.PerformScanRequest perfScanRequest = requestBuilder.build();

    TrxRegionProtos.PerformScanResponse response;
    try {
        CoprocessorRpcChannel channel = ttable.coprocessorService(this.currentBeginKey);
        TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
        response = trxService.performScan(null, perfScanRequest);
    }
    catch (Throwable e) {
        String errMsg = "PerformScan error on coprocessor call, scannerID: " + this.scannerID;
        if(LOG.isErrorEnabled()) LOG.error(errMsg, e);
        throw new IOException(errMsg, e);
    }

    if (response.getHasException()) {
        String exception = response.getException();
        String errMsg = "performScan encountered Exception txID: " +
            ts.getTransactionId() + " Exception: " + exception;
        LOG.error(errMsg);
        throw new IOException(errMsg);
    }

    this.nextCallSeq = response.getNextCallSeq();
    final int count = response.getResultCount();
    if (LOG.isTraceEnabled()) LOG.trace("next() nextCallSeq: " + this.nextCallSeq +
        " count: " + count);

    if (count == 0) {
        this.hasMore = false;
        return;
    }

    // Same value for every row of the batch, so read it once.
    this.hasMore = response.getHasMore();
    for (int i = 0; i < count; i++) {
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result result = response.getResult(i);
        if (result != null) {
            cache.add(ProtobufUtil.toResult(result));
        }
        if (LOG.isTraceEnabled())
            LOG.trace(" PerformScan response count " + count + ", hasMore is " + hasMore + ", result " + result);
    }
}
/**
 * Always returns {@code true}; the method body does no other work.
 *
 * <p>The {@code @Override} annotation is kept only when the HDP2.3 preprocessor
 * symbol is defined. NOTE(review): presumably only that build's
 * AbstractClientScanner declares renewLease() - confirm against the build.
 *
 * @return {@code true}, unconditionally
 */
#ifdef HDP2.3
@Override
#endif
public boolean renewLease() {
return true;
}
}