blob: c228c36a3f324b425e40851ea2405d33a246202c [file] [log] [blame]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
package org.trafodion.sql;
import org.trafodion.sql.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import org.apache.commons.codec.binary.Hex;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.nio.ByteOrder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.transactional.RMInterface;
import org.apache.hadoop.hbase.client.transactional.TransactionalAggregationClient;
import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
import org.apache.log4j.Logger;
// H98 coprocessor needs
import java.util.*;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.*;
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.ipc.*;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.*;
import org.apache.hadoop.hbase.util.*;
//import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
// classes to do hbase pushdown filtering
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileUtil;
import java.util.UUID;
import java.security.InvalidParameterException;
public class HTableClient {
// Integer codes for the kind of fetch in progress.
// NOTE(review): presumably the values stored in fetchType -- the assignments are not visible here; confirm.
private static final int GET_ROW = 1;
private static final int BATCH_GET = 2;
private static final int SCAN_FETCH = 3;
// Per-instance transactional flags, resolved in init() from its useTRex argument
// and the static env* defaults below.
private boolean useTRex;
private boolean useTRexScanner;
// Static defaults copied into useTRex/useTRexScanner by init() when its useTRex argument is true.
// NOTE(review): these are initialized outside the visible part of the file.
private static boolean envUseTRex;
private static boolean envUseTRexScanner;
// Table name as passed to init().
private String tableName;
// HBase connection; static, so shared by every HTableClient (set by the constructor).
private static Connection connection;
// Scanner currently in use; assigned by SnapshotScanHelper.createTableSnapshotScanner()
// and read by ScanHelper.call().
private ResultScanner scanner = null;
private ScanHelper scanHelper = null;
Result[] getResultSet = null;
// Table handle created in init().
RMInterface table = null;
// Value recorded by setWriteToWAL().
private boolean writeToWAL = false;
// Number of rows requested per scanner.next() call; set by startScan() from numCacheRows.
int numRowsCached = 1;
// Number of columns explicitly requested in the current scan (0 when none); set by startScan().
int numColsInScan = 0;
// NOTE(review): the kv* arrays, rowIDs and kvsPerRow look like per-cell/per-row result
// buffers (lengths, offsets, timestamps, backing arrays); their use is outside this view -- confirm.
int[] kvValLen = null;
int[] kvValOffset = null;
int[] kvQualLen = null;
int[] kvQualOffset = null;
int[] kvFamLen = null;
int[] kvFamOffset = null;
long[] kvTimestamp = null;
byte[][] kvBuffer = null;
byte[][] rowIDs = null;
int[] kvsPerRow = null;
byte[][] kvFamArray = null;
byte[][] kvQualArray = null;
// NOTE(review): executorService/future/preFetch presumably drive asynchronous pre-fetching
// through ScanHelper; the submit/get calls are not visible here -- confirm.
static ExecutorService executorService = null;
Future future = null;
boolean preFetch = false;
int fetchType = 0;
// NOTE(review): presumably an opaque handle to the native (JNI) peer object -- confirm.
long jniObject = 0;
// Snapshot-scan helper; read by SnapshotScanHelper.createTableSnapshotScanner().
SnapshotScanHelper snapHelper = null;
/**
 * Bundles the state needed to scan a table through an HBase snapshot: the
 * snapshot description, the temporary location the snapshot is restored
 * into, and the file system holding that location.
 *
 * This is an inner class: it reads the enclosing HTableClient's
 * {@code table}, {@code connection}, {@code snapHelper} and {@code logger},
 * and assigns the enclosing {@code scanner}.
 */
class SnapshotScanHelper
{
    Path snapRestorePath = null;
    // Closed by release() when non-null. No code visible here assigns it.
    Admin admin = null;
    Configuration conf = null;
    SnapshotDescription snpDesc = null;
    String tmpLocation = null;
    FileSystem fs = null;

    /**
     * @param cnfg     configuration used to locate the HBase root directory
     * @param tmpLoc   prefix of the restore directory; the snapshot name is
     *                 appended to it verbatim (no separator is inserted)
     * @param snapName name of the snapshot; must not be null
     * @throws IOException if the file system cannot be obtained
     * @throws InvalidParameterException if {@code snapName} is null
     */
    SnapshotScanHelper( Configuration cnfg , String tmpLoc, String snapName)
        throws IOException
    {
        conf = cnfg;
        tmpLocation = tmpLoc;
        setSnapshotDescription(snapName);
        Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
        fs = rootDir.getFileSystem(conf);
        setSnapRestorePath();
    }

    String getTmpLocation()
    {
        return tmpLocation;
    }

    /** @return the snapshot name, or null when no description has been set. */
    String getSnapshotName()
    {
        if (snpDesc == null)
            return null;
        return snpDesc.getName();
    }

    /** Derives the fully qualified restore path from tmpLocation + snapshot name. */
    void setSnapRestorePath() throws IOException
    {
        String restoreDirStr = tmpLocation + getSnapshotDescription().getName();
        snapRestorePath = new Path(restoreDirStr);
        snapRestorePath = snapRestorePath.makeQualified(fs.getUri(), snapRestorePath);
    }

    Path getSnapRestorePath() throws IOException
    {
        return snapRestorePath;
    }

    /**
     * @return true when listing snapshots by this snapshot's name yields at least one entry
     * @throws IOException on HBase admin failure
     */
    boolean snapshotExists() throws IOException
    {
        if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.snapshotExists() called. ");
        // Local handle (distinct from the 'admin' field); always closed, even
        // when listSnapshots() throws.
        Admin localAdmin = connection.getAdmin();
        try
        {
            // NOTE(review): Admin.listSnapshots(String) appears to take a regex --
            // names containing regex metacharacters may match unexpectedly; confirm.
            return !(localAdmin.listSnapshots(snpDesc.getName()).isEmpty());
        }
        finally
        {
            localAdmin.close();
        }
    }

    /**
     * Deletes the snapshot if it exists; otherwise only traces that it is absent.
     *
     * @throws IOException on HBase admin failure
     */
    void deleteSnapshot() throws IOException
    {
        if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.deleteSnapshot() called. ");
        if (snapshotExists())
        {
            Admin localAdmin = connection.getAdmin();
            try
            {
                localAdmin.deleteSnapshot(snpDesc.getName());
            }
            finally
            {
                // Release the handle even when the delete fails.
                localAdmin.close();
            }
            if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.deleteSnapshot(). snapshot: " + snpDesc.getName() + " deleted.");
        }
        else
        {
            if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.deleteSnapshot(). snapshot: " + snpDesc.getName() + " does not exist.");
        }
    }

    /**
     * Recursively deletes the restore directory if it exists.
     *
     * @throws IOException on file system failure
     */
    void deleteRestorePath() throws IOException
    {
        if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.deleteRestorePath() called. ");
        if (fs.exists(snapRestorePath))
        {
            fs.delete(snapRestorePath, true);
            if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.deleteRestorePath(). restorePath: " + snapRestorePath + " deleted.");
        }
        else
        {
            if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.deleteRestorePath(). restorePath: " + snapRestorePath + " does not exist.");
        }
    }

    /**
     * Creates a TableSnapshotScanner and stores it in the enclosing client's
     * {@code scanner} field, retrying on IOException.
     *
     * @param timeout maximum number of attempts (despite the name, a count, not a duration)
     * @param slp     milliseconds to sleep after each failed attempt
     * @param nbre    ESP number, used only in trace messages
     * @param scan    scan specification handed to the scanner
     * @throws IOException the failure of the last attempt when every attempt failed
     * @throws InterruptedException if interrupted while sleeping between attempts
     */
    void createTableSnapshotScanner(int timeout, int slp, long nbre, Scan scan) throws InterruptedException, IOException
    {
        if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.createTableSnapshotScanner() called. ");
        int xx=0;
        IOException ioExc = null;
        while (xx < timeout)
        {
            xx++;
            scanner = null;
            try
            {
                ioExc = null;
                // NOTE(review): reads path/name through the enclosing 'snapHelper'
                // field rather than 'this'; presumably the same object -- confirm.
                scanner = new TableSnapshotScanner(connection.getConfiguration(), snapHelper.getSnapRestorePath(), snapHelper.getSnapshotName(), scan);
            }
            catch(IOException e )
            {
                ioExc = e;
                if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.createTableSnapshotScanner(). espNumber: " + nbre +
                    " snapshot " + snpDesc.getName() + " TableSnapshotScanner Exception :" + e);
                Thread.sleep(slp);
                continue;
            }
            if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.createTableSnapshotScanner(). espNumber: " +
                nbre + " snapshot " + snpDesc.getName() + " TableSnapshotScanner Done - Scanner:" + scanner );
            break;
        }
        // Non-null only when the final attempt failed.
        if (ioExc != null)
            throw ioExc;
    }

    /**
     * Builds a FLUSH-type snapshot description for the enclosing client's table.
     *
     * @throws InvalidParameterException if {@code snapName} is null
     */
    void setSnapshotDescription( String snapName)
    {
        if (snapName == null )
            throw new InvalidParameterException ("snapshotName is null.");
        SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
        builder.setTable(Bytes.toString(table.getTableName()));
        builder.setName(snapName);
        builder.setType(SnapshotDescription.Type.FLUSH);
        snpDesc = builder.build();
    }

    SnapshotDescription getSnapshotDescription()
    {
        return snpDesc;
    }

    /**
     * Closes and clears the {@code admin} field when it is set.
     *
     * @throws IOException if closing the admin handle fails
     */
    public void release() throws IOException
    {
        if (logger.isTraceEnabled()) logger.trace("HTableClient.release(" + (tableName == null ? " tableName is null " : tableName) + ") called.");
        if (admin != null)
        {
            admin.close();
            admin = null;
        }
    }
}
/**
 * Creates a client bound to the given HBase connection.
 *
 * The connection is held in a static field, so it is shared by every
 * HTableClient instance: constructing a new client replaces the connection
 * seen by all existing ones.
 *
 * @param connection the HBase connection to use
 */
public HTableClient(Connection connection) {
    // Assign through the class name: the field is static, and writing it via
    // 'this' hides the fact that the state is shared.
    HTableClient.connection = connection;
}
/**
 * Task that fetches the next batch of rows from the enclosing client's
 * current {@code scanner}, asking for up to {@code numRowsCached} rows.
 */
class ScanHelper implements Callable<Result[]> {
    /**
     * @return the result of {@code scanner.next(numRowsCached)}
     * @throws IOException if the scanner fails
     */
    @Override
    public Result[] call() throws IOException {
        return scanner.next(numRowsCached);
    }
}
// log4j logger shared by HTableClient and its inner helper classes.
static Logger logger = Logger.getLogger(HTableClient.class.getName());;
/**
 * Extracts the column family from a "family:qualifier" column name.
 *
 * @param qc column name bytes, optionally prefixed with "family:"
 * @return the bytes before the first ':'; the bytes of "cf1" when there is
 *         no ':'; null when {@code qc} is null or empty
 */
static public byte[] getFamily(byte[] qc) {
    if (qc == null || qc.length == 0) {
        return null;
    }
    final int colon = Bytes.indexOf(qc, (byte) ':');
    if (colon != -1) {
        return Arrays.copyOfRange(qc, 0, colon);
    }
    // No explicit family: fall back to the default one.
    return Bytes.toBytes("cf1");
}
/**
 * Extracts the qualifier from a "family:qualifier" column name.
 *
 * @param qc column name bytes, optionally prefixed with "family:"
 * @return the bytes after the first ':'; {@code qc} itself (not a copy) when
 *         there is no ':'; null when {@code qc} is null or empty
 */
static public byte[] getName(byte[] qc) {
    if (qc == null || qc.length == 0) {
        return null;
    }
    final int colon = Bytes.indexOf(qc, (byte) ':');
    return (colon == -1) ? qc : Arrays.copyOfRange(qc, colon + 1, qc.length);
}
/**
 * Forwards the write buffer size to the underlying table.
 *
 * @param writeBufferSize value passed to {@code table.setWriteBufferSize}
 * @return always true
 * @throws IOException if the table rejects the change
 */
public boolean setWriteBufferSize(long writeBufferSize) throws IOException {
    if (logger.isDebugEnabled()) {
        logger.debug("Enter HTableClient::setWriteBufferSize, size : " + writeBufferSize);
    }
    table.setWriteBufferSize(writeBufferSize);
    return true;
}
/**
 * @return the write buffer size reported by the underlying table
 */
public long getWriteBufferSize() {
    if (logger.isDebugEnabled()) {
        final long reported = table.getWriteBufferSize();
        logger.debug("Enter HTableClient::getWriteBufferSize, size return : " + reported);
    }
    return table.getWriteBufferSize();
}
/**
 * Records the write-to-WAL setting in the {@code writeToWAL} field.
 *
 * @param v the new setting
 * @return always true
 */
public boolean setWriteToWAL(boolean v) {
    // The message used to read "setWriteToWALL, size"; the argument is a flag, not a size.
    if (logger.isDebugEnabled()) logger.debug("Enter HTableClient::setWriteToWAL, value : " + v);
    writeToWAL = v;
    return true;
}
/**
 * Binds this client to a table and resolves the transactional flags.
 *
 * When {@code useTRex} is false both flags are forced off; when it is true
 * they take the values of the static {@code envUseTRex} /
 * {@code envUseTRexScanner} defaults.
 *
 * @param tblName name of the table to open
 * @param useTRex whether the caller allows transactional access
 * @return always true
 * @throws IOException if the table handle cannot be created
 */
public boolean init(String tblName,
                    boolean useTRex) throws IOException
{
    if (logger.isDebugEnabled()) {
        logger.debug("Enter HTableClient::init, tableName: " + tblName);
    }
    tableName = tblName;
    if (useTRex) {
        // Caller allows it: defer to the environment-level settings.
        this.useTRex = envUseTRex;
        this.useTRexScanner = envUseTRexScanner;
    }
    else {
        this.useTRex = false;
        this.useTRexScanner = false;
    }
    table = new RMInterface(tblName, connection);
    if (logger.isDebugEnabled()) {
        logger.debug("Exit HTableClient::init, useTRex: " + this.useTRex + ", useTRexScanner: "
            + this.useTRexScanner + ", table object: " + table);
    }
    return true;
}
/** @return the table name given to init(), or null before init() is called. */
String getTableName() {
    return this.tableName;
}
/**
 * @return the name reported by the underlying table handle, or null when
 *         init() has not created the handle yet
 */
String getHTableName() {
    if (table == null) {
        return null;
    }
    // Decode with Bytes.toString (UTF-8), as setSnapshotDescription() does for
    // the same bytes, instead of the platform default charset.
    return Bytes.toString(table.getTableName());
}
/**
 * Tokens accepted in the V2 pushed-down predicate (reverse polish notation).
 * A "_NULL" suffix marks the variant used by constructV2Filter() for a
 * nullable column; AND / OR are the binary connectors.
 */
private enum Op {
    EQUAL, EQUAL_NULL,
    NOT_EQUAL, NOT_EQUAL_NULL,
    LESS, LESS_NULL,
    LESS_OR_EQUAL, LESS_OR_EQUAL_NULL,
    GREATER, GREATER_NULL,
    GREATER_OR_EQUAL, GREATER_OR_EQUAL_NULL,
    NO_OP, NO_OP_NULL,
    IS_NULL, IS_NULL_NULL,
    IS_NOT_NULL, IS_NOT_NULL_NULL,
    AND, OR
}
/**
 * Creates a single-column comparison filter, choosing the "Exclude" variant
 * when the column is listed in {@code columnsToRemove}.
 *
 * @param columnToFilter  column name bytes ("family:qualifier" or bare qualifier)
 * @param op              comparison operator
 * @param comparator      comparator holding the value to compare against
 * @param columnsToRemove columns keyed by {@code new String(columnBytes)};
 *                        may be null, in which case the plain filter is used
 * @param filterIfMissing optional; the first element is passed to
 *                        setFilterIfMissing (defaults to false)
 * @return a SingleColumnValueFilter or SingleColumnValueExcludeFilter
 */
private Filter SingleColumnValueExcludeOrNotFilter(byte[] columnToFilter,
                                                   CompareOp op,
                                                   ByteArrayComparable comparator,
                                                   HashMap<String,Object> columnsToRemove,
                                                   Boolean... filterIfMissing){
    boolean dropRowIfMissing = false; // default to false
    if (filterIfMissing.length > 0) {
        dropRowIfMissing = filterIfMissing[0];
    }
    final boolean excludeColumn =
        columnsToRemove != null && columnsToRemove.containsKey(new String(columnToFilter));
    // SingleColumnValueExcludeFilter is a SingleColumnValueFilter, so both
    // variants can be configured through the base type.
    final SingleColumnValueFilter scvf;
    if (excludeColumn) {
        scvf = new SingleColumnValueExcludeFilter(getFamily(columnToFilter), getName(columnToFilter), op, comparator);
    } else {
        scvf = new SingleColumnValueFilter(getFamily(columnToFilter), getName(columnToFilter), op, comparator);
    }
    scvf.setFilterIfMissing(dropRowIfMissing);
    return scvf;
}
/**
 * Constructs the HBase filter for a "V2" pushed-down predicate.
 *
 * {@code compareOpList} holds the "V2" marker at index 0 followed by Op
 * names in reverse polish notation. Comparison tokens consume one column
 * from {@code colNamesToFilter} and one value from
 * {@code colValuesToCompare}; the IS_NULL / IS_NOT_NULL family consumes a
 * column only; AND / OR combine the two preceding operands.
 *
 * Optimizations applied while folding:
 * <ul>
 * <li>associativity of AND and of OR (nested lists of the same kind are flattened);</li>
 * <li>factorization of a shared first operand, typically the not-null check:
 *     (G and X) or (G and Y) becomes G and (X or Y) -- important for IN lists;</li>
 * <li>a&lt;? or a&gt;? on the same column and value is turned back into a&lt;&gt;?,
 *     both for a bare pair and for a pair sharing the same first operand.</li>
 * </ul>
 *
 * The a&lt;&gt;? rewrite on guarded operands is only applied when both operands
 * share the same first filter. The previous code looked only at the second
 * member of each list, so it could rewrite (b=1 and a&lt;5) or (c=2 and a&gt;5)
 * into "a not null and a&lt;&gt;5", dropping b=1 and c=2.
 *
 * @param colNamesToFilter   column names (byte[]) in token order
 * @param compareOpList      "V2" marker followed by Op names (byte[])
 * @param colValuesToCompare comparison values (byte[]) in token order
 * @param columnsToRemove    columns to strip from results through the Exclude
 *                           filter variant, or null to disable that method
 * @return the combined filter, or null (after logging an error) when the
 *         token sequence does not reduce to exactly one filter
 * @throws IllegalArgumentException if a token is not a valid Op name
 */
private Filter constructV2Filter(Object[] colNamesToFilter,
                                 Object[] compareOpList,
                                 Object[] colValuesToCompare,
                                 HashMap<String,Object> columnsToRemove){
    // Operands (Filter) and operators (String "AND"/"OR") in reverse polish notation order.
    LinkedList<Object> rpn = new LinkedList<Object>();
    int k = 0;  // column index
    int kk = 0; // value index
    for (int i = 1; i < compareOpList.length; i++) { // skip first one containing "V2" marker
        Op op = Op.valueOf(new String((byte[])compareOpList[i]));
        switch (op) {
        case EQUAL:
        case NOT_EQUAL:
        case LESS:
        case LESS_OR_EQUAL:
        case GREATER:
        case GREATER_OR_EQUAL:
        case NO_OP:
            // comparison on a non nullable column
            rpn.addLast(SingleColumnValueExcludeOrNotFilter(
                    (byte[])colNamesToFilter[k],
                    toV2CompareOp(op),
                    new BinaryComparator((byte[])colValuesToCompare[kk]),
                    columnsToRemove));
            k++; kk++;
            break;
        case EQUAL_NULL:
        case NOT_EQUAL_NULL:
        case LESS_NULL:
        case LESS_OR_EQUAL_NULL:
        case GREATER_NULL:
        case GREATER_OR_EQUAL_NULL:
        case NO_OP_NULL:
            // comparison on a nullable column: not-null check AND the actual comparison
            rpn.addLast(buildV2NullGuardedFilter(
                    (byte[])colNamesToFilter[k],
                    toV2CompareOp(op),
                    (byte[])colValuesToCompare[kk],
                    columnsToRemove));
            k++; kk++;
            break;
        case IS_NULL:
            // is null on a non nullable column!
            rpn.addLast(SingleColumnValueExcludeOrNotFilter(
                    (byte[])colNamesToFilter[k],
                    CompareOp.NO_OP, //exclude everything
                    new BinaryPrefixComparator(new byte[]{}),
                    columnsToRemove));
            k++;
            break;
        case IS_NULL_NULL:
            // is_null on nullable column: is absent OR has the first byte set to FF indicating NULL.
            rpn.addLast(new FilterList(FilterList.Operator.MUST_PASS_ONE, //OR
                    SingleColumnValueExcludeOrNotFilter(
                            (byte[])colNamesToFilter[k],
                            CompareOp.EQUAL,
                            new NullComparator(), //is absent?
                            columnsToRemove),
                    SingleColumnValueExcludeOrNotFilter(
                            (byte[])colNamesToFilter[k],
                            CompareOp.EQUAL,
                            new BinaryPrefixComparator(new byte[]{-1}), //0xFF has null prefix indicator
                            columnsToRemove)));
            k++;
            break;
        case IS_NOT_NULL:
            // is not null on a non nullable column: always true, contributes no operand
            k++;
            break;
        case IS_NOT_NULL_NULL:
            // is_not_null on nullable column: is not absent AND has the first byte not set to FF indicating NULL.
            rpn.addLast(SingleColumnValueExcludeOrNotFilter(
                    (byte[])colNamesToFilter[k],
                    CompareOp.NOT_EQUAL,
                    new BinaryPrefixComparator(new byte[]{-1}), // 0xFF has null prefix indicator
                    columnsToRemove,
                    true)); //filter if missing (if absent null)
            k++;
            break;
        case AND:
            rpn.addLast("AND");
            break;
        case OR:
            rpn.addLast("OR");
            break;
        default:
        }
    }

    // Evaluate the reverse polish notation list until a single element is left.
    while (rpn.size() > 1) {
        // look for the first operator (AND or OR); everything before it is a Filter
        int j = 0;
        while (j < rpn.size() && !(rpn.get(j) instanceof String)) {
            j++;
        }
        if (j == rpn.size()) {logger.error("j==linkedList.size()"); return null;} // should not happen
        if (j < 2) {
            // An operator needs two operands before it. This can happen when an
            // IS_NOT_NULL token contributed no operand; fail cleanly instead of
            // indexing out of bounds.
            logger.error("constructV2Filter: operator " + rpn.get(j) + " has fewer than two operands");
            return null;
        }
        Filter leftOperand = (Filter)rpn.get(j - 2);
        Filter rightOperand = (Filter)rpn.get(j - 1);
        Filter combined;
        switch (Op.valueOf((String)rpn.get(j))) {
        case AND:
            combined = mergeV2Operands(FilterList.Operator.MUST_PASS_ALL, leftOperand, rightOperand);
            break;
        case OR:
            combined = foldV2Or(leftOperand, rightOperand);
            break;
        default:
            logger.error("operator different than OR or AND???");
            return null; //should never happen
        }
        rpn.set(j, combined); // replace the operator with the constructed filter
        rpn.remove(j - 1);    // remove right operand
        rpn.remove(j - 2);    // remove left operand. warning: order matters
    }

    // After evaluation the list should contain exactly one element: the filter built.
    if (rpn.isEmpty()) {
        logger.error("constructV2Filter: predicate produced no filter");
        return null;
    }
    Object built = rpn.pop();
    if (!(built instanceof Filter)) {
        logger.error("constructV2Filter: predicate reduced to an operator without operands");
        return null;
    }
    return (Filter)built;
}

/** Maps a comparison Op (plain or "_NULL" variant) to the HBase CompareOp of the same meaning. */
private static CompareOp toV2CompareOp(Op op) {
    switch (op) {
    case EQUAL:
    case EQUAL_NULL:
        return CompareOp.EQUAL;
    case NOT_EQUAL:
    case NOT_EQUAL_NULL:
        return CompareOp.NOT_EQUAL;
    case LESS:
    case LESS_NULL:
        return CompareOp.LESS;
    case LESS_OR_EQUAL:
    case LESS_OR_EQUAL_NULL:
        return CompareOp.LESS_OR_EQUAL;
    case GREATER:
    case GREATER_NULL:
        return CompareOp.GREATER;
    case GREATER_OR_EQUAL:
    case GREATER_OR_EQUAL_NULL:
        return CompareOp.GREATER_OR_EQUAL;
    case NO_OP:
    case NO_OP_NULL:
        return CompareOp.NO_OP;
    default:
        throw new IllegalArgumentException("not a comparison operator: " + op);
    }
}

/** Builds "column is not null AND column (op) value" for a nullable column. */
private Filter buildV2NullGuardedFilter(byte[] column,
                                        CompareOp op,
                                        byte[] value,
                                        HashMap<String,Object> columnsToRemove) {
    return new FilterList(FilterList.Operator.MUST_PASS_ALL, //AND between if not null and the actual
            SingleColumnValueExcludeOrNotFilter(
                    column,
                    CompareOp.EQUAL,
                    new BinaryPrefixComparator(new byte[]{0x00}), //check for null indicator = 0 representing non null
                    columnsToRemove,
                    true), //filterIfMissing
            SingleColumnValueExcludeOrNotFilter(
                    column,
                    op,
                    new BinaryComparator(value),
                    columnsToRemove));
}

/**
 * Combines two operands under the given operator, flattening nested lists of
 * the same operator. A left operand that is already such a list is reused
 * (and extended in place) rather than copied.
 */
private static FilterList mergeV2Operands(FilterList.Operator operator, Filter left, Filter right) {
    FilterList merged;
    if (left instanceof FilterList && ((FilterList)left).getOperator() == operator) {
        merged = (FilterList)left; // associativity: extend the existing list
    } else {
        merged = new FilterList(operator);
        merged.addFilter(left);
    }
    if (right instanceof FilterList && ((FilterList)right).getOperator() == operator) {
        for (Filter f : ((FilterList)right).getFilters())
            merged.addFilter(f);
    } else {
        merged.addFilter(right);
    }
    return merged;
}

/** True when the filter is an AND list of exactly two members whose first member is a single-column filter. */
private static boolean isV2GuardedPair(Filter f) {
    if (!(f instanceof FilterList)) return false;
    FilterList fl = (FilterList)f;
    return fl.getOperator() == FilterList.Operator.MUST_PASS_ALL
        && fl.getFilters().size() == 2
        && fl.getFilters().get(0) instanceof SingleColumnValueFilter;
}

/** True when both filters are the same test: operator, column, comparator value and filterIfMissing. */
private static boolean isSameV2Guard(SingleColumnValueFilter a, SingleColumnValueFilter b) {
    return a.getOperator() == b.getOperator()
        && Arrays.equals(a.getQualifier(), b.getQualifier())
        && Arrays.equals(a.getFamily(), b.getFamily())
        && Arrays.equals(a.getComparator().getValue(), b.getComparator().getValue())
        && a.getFilterIfMissing() == b.getFilterIfMissing();
}

/** True when left is "col < v" and right is "col > v" for the same column and value. */
private static boolean isV2LessGreaterPair(Filter left, Filter right) {
    if (!(left instanceof SingleColumnValueFilter) || !(right instanceof SingleColumnValueFilter)) {
        return false;
    }
    SingleColumnValueFilter less = (SingleColumnValueFilter)left;
    SingleColumnValueFilter greater = (SingleColumnValueFilter)right;
    return less.getOperator() == CompareOp.LESS
        && greater.getOperator() == CompareOp.GREATER
        && Arrays.equals(less.getQualifier(), greater.getQualifier())
        && Arrays.equals(less.getFamily(), greater.getFamily())
        && Arrays.equals(less.getComparator().getValue(), greater.getComparator().getValue());
}

/** Builds "col <> v" from the column and comparator of the given filter. */
private static Filter toV2NotEqual(SingleColumnValueFilter scvf) {
    return new SingleColumnValueFilter(scvf.getFamily(), scvf.getQualifier(), CompareOp.NOT_EQUAL, scvf.getComparator());
}

/** Folds "left OR right", applying the factorization and a<>? optimizations before the general merge. */
private static Filter foldV2Or(Filter leftOperand, Filter rightOperand) {
    // (G and X) or (G and Y) -> G and (X or Y); G is typically the not-null check
    if (isV2GuardedPair(leftOperand) && isV2GuardedPair(rightOperand)) {
        List<Filter> leftMembers = ((FilterList)leftOperand).getFilters();
        List<Filter> rightMembers = ((FilterList)rightOperand).getFilters();
        SingleColumnValueFilter guard = (SingleColumnValueFilter)leftMembers.get(0);
        if (isSameV2Guard(guard, (SingleColumnValueFilter)rightMembers.get(0))) {
            Filter left = leftMembers.get(1);
            Filter right = rightMembers.get(1);
            Filter inner = isV2LessGreaterPair(left, right)
                ? toV2NotEqual((SingleColumnValueFilter)left) // a<? or a>? -> a<>?
                : mergeV2Operands(FilterList.Operator.MUST_PASS_ONE, left, right);
            return new FilterList(FilterList.Operator.MUST_PASS_ALL, guard, inner);
        }
    }
    // a<? or a>? on a bare pair -> a<>?
    if (isV2LessGreaterPair(leftOperand, rightOperand)) {
        return toV2NotEqual((SingleColumnValueFilter)leftOperand);
    }
    // general case
    return mergeV2Operands(FilterList.Operator.MUST_PASS_ONE, leftOperand, rightOperand);
}
public boolean startScan(long transID, byte[] startRow, byte[] stopRow,
Object[] columns, long timestamp,
boolean cacheBlocks, boolean smallScanner, int numCacheRows,
Object[] colNamesToFilter,
Object[] compareOpList,
Object[] colValuesToCompare,
float dopParallelScanner,
float samplePercent,
boolean inPreFetch,
boolean useSnapshotScan,
int snapTimeout,
String snapName,
String tmpLoc,
int espNum,
int versions)
throws IOException {
Scan scan;
if (logger.isTraceEnabled()) logger.trace("Enter startScan() " + tableName + " txid: " + transID + " startRow="
+ ((startRow != null) ? (Bytes.equals(startRow, HConstants.EMPTY_START_ROW) ? "INFINITE" : Hex.encodeHexString(startRow)) : "NULL")
+ " stopRow=" + ((stopRow != null) ? (Bytes.equals(stopRow, HConstants.EMPTY_START_ROW) ? "INFINITE" : Hex.encodeHexString(stopRow)) : "NULL")
+ " CacheBlocks: " + cacheBlocks + " numCacheRows: " + numCacheRows + " Bulkread: " + useSnapshotScan);
if (startRow != null && startRow.toString() == "")
startRow = null;
if (stopRow != null && stopRow.toString() == "")
stopRow = null;
if (startRow != null && stopRow != null)
scan = new Scan(startRow, stopRow);
else
scan = new Scan();
if (versions != 0)
{
if (versions == -1)
scan.setMaxVersions();
else if (versions == -2)
{
scan.setMaxVersions();
scan.setRaw(true);
columns = null;
}
else if (versions > 0)
{
scan.setMaxVersions(versions);
}
}
if (cacheBlocks == true) {
scan.setCacheBlocks(true);
}
else
scan.setCacheBlocks(false);
scan.setSmall(smallScanner);
scan.setCaching(numCacheRows);
numRowsCached = numCacheRows;
if (columns != null) {
numColsInScan = columns.length;
for (int i = 0; i < columns.length ; i++) {
byte[] col = (byte[])columns[i];
scan.addColumn(getFamily(col), getName(col));
}
}
else
numColsInScan = 0;
if (colNamesToFilter != null) {
FilterList list;
boolean narrowDownResultColumns = false; //to check if we need a narrow down column filter (V2 only feature)
if (compareOpList == null)return false;
if (new String((byte[])compareOpList[0]).equals("V2")){ // are we dealing with predicate pushdown V2
list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
HashMap<String,Object> columnsToRemove = new HashMap<String,Object>();
//if columnsToRemove not null, we are narrowing down using the SingleColumnValue[Exclude]Filter method
//else we will use the explicit FamilyFilter and QualifierFilter
//the simplified logic is that we can use the first method if and only if each and every column in the
//pushed down predicate shows up only once.
for (int i = 0; i < colNamesToFilter.length; i++) {
byte[] colName = (byte[])colNamesToFilter[i];
// check if the filter column is already part of the column list, if not add it if we are limiting columns (not *)
if(columns!=null && columns.length > 0){// if not *
boolean columnAlreadyIn = false; //assume column not yet in the scan object
for (int k=0; k<columns.length;k++){
if (Arrays.equals(colName, (byte[])columns[k])){
columnAlreadyIn = true;//found already exist
break;//no need to look further
}
}
if (!columnAlreadyIn){// column was not already in, so add it
scan.addColumn(getFamily(colName),getName(colName));
narrowDownResultColumns = true; //since we added a column for predicate eval, we need to remove it later out of result set
String strColName = new String(colName);
if (columnsToRemove != null && columnsToRemove.containsKey(strColName)){// if we already added this column, it means it shows up more than once
columnsToRemove = null; // therefore, use the FamilyFilter/QualifierFilter method
}else if (columnsToRemove != null)// else
columnsToRemove.put(strColName,null); // add it to the list of column that should be nuked with the Exclude version of the SingleColumnValueFilter
}
}
}
if (columnsToRemove != null)
{ // We are almost done checking whether the Exclude version of SingleColumnValueFilter can be used. The last check is whether there is an IS_NULL_NULL
// operation, which cannot use the Exclude method: it is transformed into a FilterList with OR, so we cannot guarantee that the SingleColumnValueExcludeFilter
// performing the exclusion will be reached.
boolean is_null_nullFound = false;
for (Object o:compareOpList ){
if (new String((byte[])o).equals("IS_NULL_NULL")){
is_null_nullFound = true;
break;
}
}
if (is_null_nullFound){
columnsToRemove = null; // disable Exclude method version of SingleColumnnValueFilter
}else
narrowDownResultColumns = false; // we will use the Exclude version of SingleColumnnValueFilter, so bypass the Family/QualifierFilter method
}
Filter f =constructV2Filter(colNamesToFilter,compareOpList,colValuesToCompare, columnsToRemove);
if (f==null) return false; // error logging done inside constructV2Filter
list.addFilter(f);
}//end V2
else{// deal with V1
list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
for (int i = 0; i < colNamesToFilter.length; i++) {
byte[] colName = (byte[])colNamesToFilter[i];
byte[] coByte = (byte[])compareOpList[i];
byte[] colVal = (byte[])colValuesToCompare[i];
if ((coByte == null) || (colVal == null)) {
return false;
}
String coStr = new String(coByte);
CompareOp co = CompareOp.valueOf(coStr);
SingleColumnValueFilter filter1 =
new SingleColumnValueFilter(getFamily(colName), getName(colName),
co, colVal);
list.addFilter(filter1);
}
}//end V1
// if we added a column for predicate eval, we need to filter down result columns
FilterList resultColumnsOnlyFilter = null;
if (narrowDownResultColumns){
HashMap<String,ArrayList<byte[]>> hm = new HashMap<String,ArrayList<byte[]>>(3);//use to deal with multiple family table
// initialize hm with list of columns requested for output
for (int i=0; i<columns.length; i++){ // if we are here we know columns is not null
if (hm.containsKey(new String(getFamily((byte[])columns[i])))){
hm.get(new String(getFamily((byte[])columns[i]))).add((byte[])columns[i]);
}else{
ArrayList<byte[]> al = new ArrayList<byte[]>();
al.add((byte[])columns[i]);
hm.put(new String(getFamily((byte[])columns[i])), al);
}
}
if (hm.size()==1){//only one column family
resultColumnsOnlyFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if (columns.length == 1){
resultColumnsOnlyFilter.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(getName((byte[])columns[0]))));
}else{// more than one column
FilterList flColumns = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for(int i=0; i<columns.length;i++)
flColumns.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(getName((byte[])columns[i]))));
resultColumnsOnlyFilter.addFilter(flColumns);
}
// note the optimization puting family check at the end
resultColumnsOnlyFilter.addFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(getFamily((byte[])columns[0]))));
}else{//more than one column family
resultColumnsOnlyFilter = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for (Map.Entry<String,ArrayList<byte[]>> entry : hm.entrySet()){//for each column family
ArrayList<byte[]> alb = entry.getValue();
if (alb.size() == 1){// when only one column for the family
resultColumnsOnlyFilter.addFilter(
new FilterList(FilterList.Operator.MUST_PASS_ALL,
new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(getName(alb.get(0)))),
new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(getFamily(alb.get(0)))))
);
}else{// when multiple columns for the family
FamilyFilter familyFilter = null;
FilterList filterListCol = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for(int j = 0; j<alb.size(); j++){
if (familyFilter == null)
familyFilter = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(getFamily(alb.get(0))));
filterListCol.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(getName(alb.get(j)))));
}
resultColumnsOnlyFilter.addFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,filterListCol,familyFilter));
}
}
}
list.addFilter(resultColumnsOnlyFilter); // add column limiting filter
}//end narrowDownResultColumns
if (samplePercent > 0.0f)
list.addFilter(new RandomRowFilter(samplePercent));
// last optimization is making sure we remove top level filter list if it is singleton MUST_PASS_ALL filterlist
if (list.getFilters().size()==1){
scan.setFilter(list.getFilters().get(0));
if (logger.isTraceEnabled()) logger.trace("Pushed down filter:"+list.getFilters().get(0));
}else{
scan.setFilter(list);
if (logger.isTraceEnabled()) logger.trace("Pushed down filter:"+list );
}
} else if (samplePercent > 0.0f) {
scan.setFilter(new RandomRowFilter(samplePercent));
}
if (!useSnapshotScan || transID != 0)
{
if (useTRexScanner && (transID != 0)) {
scanner = table.getScanner(transID, scan);
} else {
scanner = table.getScanner(scan,dopParallelScanner);
}
if (logger.isTraceEnabled()) logger.trace("startScan(). After getScanner. Scanner: " + scanner+ " dop:"+
dopParallelScanner + "TransID " + transID + " " + useTRexScanner + " " + getTableName());
}
else
{
snapHelper = new SnapshotScanHelper(connection.getConfiguration(), tmpLoc,snapName);
if (logger.isTraceEnabled())
logger.trace("[Snapshot Scan] HTableClient.startScan(). useSnapshotScan: " + useSnapshotScan +
" espNumber: " + espNum +
" tmpLoc: " + snapHelper.getTmpLocation() +
" snapshot name: " + snapHelper.getSnapshotName());
if (!snapHelper.snapshotExists())
throw new IOException ("Snapshot " + snapHelper.getSnapshotName() + " does not exist.");
try {
snapHelper.createTableSnapshotScanner(snapTimeout, 5, espNum, scan);
}
catch (InterruptedException ie) {
throw new IOException(ie);
}
}
if (useSnapshotScan)
preFetch = false;
else
preFetch = inPreFetch;
if (preFetch)
{
scanHelper = new ScanHelper();
future = executorService.submit(scanHelper);
}
fetchType = SCAN_FETCH;
if (logger.isTraceEnabled()) logger.trace("Exit startScan().");
return true;
}
/**
 * Reads a single row by key and pushes it to the native layer.
 *
 * <p>Sets {@code fetchType} to {@code GET_ROW} and {@code numColsInScan} to the
 * number of requested columns. The transactional {@code table.get(transID, get)}
 * is used only when {@code useTRex} is set and {@code transID} is non-zero.
 *
 * @param transID   transaction id; a value of 0 always selects the plain get
 * @param rowID     key of the row to read
 * @param columns   requested columns, each element a {@code byte[]} that is split
 *                  with {@code getFamily}/{@code getName}; may be {@code null}
 * @param timestamp not used by this method
 * @return 1 if a non-empty row was pushed, 0 if the row was missing or empty
 * @throws IOException if the underlying get fails
 */
public int startGet(long transID, byte[] rowID,
                    Object[] columns,
                    long timestamp) throws IOException {
    if (logger.isTraceEnabled()) {
        final int colCount = (columns == null) ? 0 : columns.length;
        logger.trace("Enter startGet(" + tableName +
                     " #cols: " + colCount +
                     " rowID: " + new String(rowID));
    }
    fetchType = GET_ROW;
    final Get rowGet = new Get(rowID);
    int requestedCols = 0;
    if (columns != null) {
        for (Object column : columns) {
            final byte[] qualified = (byte[]) column;
            rowGet.addColumn(getFamily(qualified), getName(qualified));
        }
        requestedCols = columns.length;
    }
    numColsInScan = requestedCols;

    final Result row;
    if (useTRex && (transID != 0)) {
        row = table.get(transID, rowGet);
    } else {
        row = table.get(rowGet);
    }

    if (row != null && !row.isEmpty()) {
        if (logger.isTraceEnabled()) logger.trace("startGet, result: " + row);
        pushRowsToJni(row);
        return 1;
    }
    // Nothing found: still notify the native side before reporting zero rows.
    setJavaObject(jniObject);
    return 0;
}
/**
 * Emulates a transactional batch get by issuing one transactional get per
 * request, because the TransactionalTable class is missing the batch get
 * operation.
 *
 * @param transactionID transaction under which every get is executed
 * @param gets          the individual gets, executed in iteration order
 * @return one result per get, in the same order as {@code gets}
 * @throws IOException if any single get fails
 */
private Result[] batchGet(long transactionID, List<Get> gets)
        throws IOException {
    if (logger.isTraceEnabled()) logger.trace("Enter batchGet(multi-row) " + tableName);
    final Result[] fetched = new Result[gets.size()];
    int slot = 0;
    for (Get single : gets) {
        fetched[slot] = table.get(transactionID, single);
        slot++;
    }
    return fetched;
}
/**
 * Reads several rows by key and pushes them to the native layer.
 *
 * <p>With {@code useTRex} set and a non-zero {@code transID} the rows are read
 * one by one through {@code batchGet} and {@code fetchType} becomes
 * {@code GET_ROW}; otherwise a single {@code table.get(list)} is issued and
 * {@code fetchType} becomes {@code BATCH_GET}. The result is kept in
 * {@code getResultSet}.
 *
 * @param transID   transaction id; 0 always selects the non-transactional path
 * @param rows      row keys, each element a {@code byte[]}
 * @param columns   requested columns, each element a {@code byte[]}; may be {@code null}
 * @param timestamp not used by this method
 * @return the number of results pushed, or 0 when the result set is null or empty
 * @throws IOException if the underlying get fails
 */
public int startGet(long transID, Object[] rows,
                    Object[] columns, long timestamp)
        throws IOException {
    if (logger.isTraceEnabled()) logger.trace("Enter startGet(multi-row) " + tableName);

    final List<Get> pendingGets = new ArrayList<Get>();
    for (Object row : rows) {
        final Get rowGet = new Get((byte[]) row);
        pendingGets.add(rowGet);
        if (columns == null) {
            continue;
        }
        for (Object column : columns) {
            final byte[] qualified = (byte[]) column;
            rowGet.addColumn(getFamily(qualified), getName(qualified));
        }
    }
    numColsInScan = (columns == null) ? 0 : columns.length;

    final boolean transactional = useTRex && (transID != 0);
    if (transactional) {
        getResultSet = batchGet(transID, pendingGets);
        fetchType = GET_ROW;
    } else {
        getResultSet = table.get(pendingGets);
        fetchType = BATCH_GET;
    }

    if (getResultSet == null || getResultSet.length == 0) {
        // No rows came back: notify the native side and report zero.
        setJavaObject(jniObject);
        return 0;
    }
    pushRowsToJni(getResultSet);
    return getResultSet.length;
}
/**
 * Reads a batch of rows whose keys are packed into a {@link ByteBuffer}.
 *
 * <p>Buffer layout as consumed here: a leading {@code short} row count, then for
 * each row one suffix byte followed by the key bytes. When the suffix byte is
 * the character {@code '1'} the key is {@code rowIDLen + 1} bytes long,
 * otherwise {@code rowIDLen} bytes.
 *
 * @param transID  transaction id; 0 always selects the non-transactional path
 * @param rowIDLen base length of each row key
 * @param rowIDs   a {@code ByteBuffer} positioned at the packed keys
 * @param columns  requested columns, each element a {@code byte[]}; may be {@code null}
 * @return the number of results pushed to the native layer
 * @throws IOException if the get fails or the number of results differs from
 *                     the number of requested rows
 */
public int getRows(long transID, short rowIDLen, Object rowIDs,
                   Object[] columns)
        throws IOException {
    if (logger.isTraceEnabled()) logger.trace("Enter getRows " + tableName);

    final ByteBuffer packedKeys = (ByteBuffer) rowIDs;
    final List<Get> pendingGets = new ArrayList<Get>();
    final short numRows = packedKeys.getShort();

    for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
        final byte suffixFlag = packedKeys.get();
        final short keyLen = (suffixFlag == '1') ? (short) (rowIDLen + 1) : rowIDLen;
        final byte[] key = new byte[keyLen];
        packedKeys.get(key, 0, keyLen);

        final Get rowGet = new Get(key);
        pendingGets.add(rowGet);
        if (columns == null) {
            continue;
        }
        for (Object column : columns) {
            final byte[] qualified = (byte[]) column;
            rowGet.addColumn(getFamily(qualified), getName(qualified));
        }
    }
    numColsInScan = (columns == null) ? 0 : columns.length;

    final boolean transactional = useTRex && (transID != 0);
    if (transactional) {
        getResultSet = batchGet(transID, pendingGets);
        fetchType = GET_ROW;
    } else {
        getResultSet = table.get(pendingGets);
        fetchType = BATCH_GET;
    }

    if (getResultSet.length != numRows) {
        throw new IOException("Number of rows retunred is not equal to requested number of rows");
    }
    pushRowsToJni(getResultSet);
    return getResultSet.length;
}
/**
 * Pushes the next batch of rows to the native layer.
 *
 * <p>If a get result set is pending it is pushed and cleared. Otherwise rows
 * come from the scanner: either directly via {@code scanner.next(numRowsCached)}
 * or, in pre-fetch mode, from the outstanding {@code future}. In pre-fetch mode
 * a new background fetch is submitted only when the batch just delivered was
 * non-empty and at least {@code numRowsCached} rows long.
 *
 * @return the number of rows pushed
 * @throws IOException          if no scanner is open or the scan fails
 * @throws InterruptedException if waiting for the pre-fetched batch is interrupted
 * @throws ExecutionException   if the pre-fetch task failed
 */
public int fetchRows() throws IOException,
        InterruptedException, ExecutionException {
    if (logger.isTraceEnabled()) logger.trace("Enter fetchRows(). Table: " + tableName);

    if (getResultSet != null) {
        final int pushedFromGet = pushRowsToJni(getResultSet);
        getResultSet = null;
        return pushedFromGet;
    }

    if (scanner == null) {
        throw new IOException("HTableClient.FetchRows() called before scanOpen().");
    }

    if (!preFetch) {
        final Result[] direct = scanner.next(numRowsCached);
        return pushRowsToJni(direct);
    }

    final Result[] prefetched = (Result[]) future.get();
    final int pushed = pushRowsToJni(prefetched);
    future = null;
    // A short or empty batch ends the pre-fetch chain: no further task is queued.
    if (pushed > 0 && pushed >= numRowsCached) {
        future = executorService.submit(scanHelper);
    }
    return pushed;
}
/**
 * Flattens a batch of results into the per-cell metadata arrays and hands them
 * to the native layer through {@code setResultInfo}.
 *
 * <p>The cell arrays are (re)allocated only when they are missing or too small
 * for this batch. When {@code numColsInScan} is 0 the capacity is the exact cell
 * count of the batch; otherwise room for two versions per requested column per
 * row is reserved.
 *
 * <p>A result whose {@code rawCells()} is {@code null} is treated as a row with
 * zero cells, matching the single-row overload.
 *
 * @param result the rows to push; {@code null} or empty pushes nothing
 * @return the number of rows pushed (0 for a {@code null} or empty batch)
 * @throws IOException if the batch holds more cells than were pre-allocated
 */
protected int pushRowsToJni(Result[] result)
        throws IOException {
    if (result == null || result.length == 0)
        return 0;
    final int rowsReturned = result.length;
    int numTotalCells = 0;
    if (numColsInScan == 0) {
        for (int i = 0; i < result.length; i++) {
            numTotalCells += result[i].size();
        }
    } else {
        // There can be maximum of 2 versions per kv
        // So, allocate place holder to keep cell info
        // for that many KVs
        numTotalCells = 2 * rowsReturned * numColsInScan;
    }

    if (kvValLen == null || numTotalCells > kvValLen.length) {
        kvValLen = new int[numTotalCells];
        kvValOffset = new int[numTotalCells];
        kvQualLen = new int[numTotalCells];
        kvQualOffset = new int[numTotalCells];
        kvFamLen = new int[numTotalCells];
        kvFamOffset = new int[numTotalCells];
        kvTimestamp = new long[numTotalCells];
        kvBuffer = new byte[numTotalCells][];
        kvFamArray = new byte[numTotalCells][];
        kvQualArray = new byte[numTotalCells][];
    }
    if (rowIDs == null || rowsReturned > rowIDs.length) {
        rowIDs = new byte[rowsReturned][];
        kvsPerRow = new int[rowsReturned];
    }

    int cellNum = 0;
    for (int rowNum = 0; rowNum < rowsReturned; rowNum++) {
        rowIDs[rowNum] = result[rowNum].getRow();
        final Cell[] kvList = result[rowNum].rawCells();
        // rawCells() may be null for an empty result; count that as zero cells.
        final int numColsReturned = (kvList == null) ? 0 : kvList.length;
        if ((cellNum + numColsReturned) > numTotalCells)
            throw new IOException("Insufficient cell array pre-allocated");
        kvsPerRow[rowNum] = numColsReturned;
        for (int colNum = 0; colNum < numColsReturned; colNum++, cellNum++) {
            final Cell kv = kvList[colNum];
            kvValLen[cellNum] = kv.getValueLength();
            kvValOffset[cellNum] = kv.getValueOffset();
            kvQualLen[cellNum] = kv.getQualifierLength();
            kvQualOffset[cellNum] = kv.getQualifierOffset();
            kvFamLen[cellNum] = kv.getFamilyLength();
            kvFamOffset[cellNum] = kv.getFamilyOffset();
            kvTimestamp[cellNum] = kv.getTimestamp();
            kvBuffer[cellNum] = kv.getValueArray();
            kvFamArray[cellNum] = kv.getFamilyArray();
            kvQualArray[cellNum] = kv.getQualifierArray();
        }
    }

    // cellNum is exactly the number of cells copied (0 when no row had cells).
    final int cellsReturned = cellNum;
    if (cellsReturned == 0)
        setResultInfo(jniObject, null, null,
                      null, null, null, null,
                      null, null, null, null, rowIDs, kvsPerRow, cellsReturned, rowsReturned);
    else
        setResultInfo(jniObject, kvValLen, kvValOffset,
                      kvQualLen, kvQualOffset, kvFamLen, kvFamOffset,
                      kvTimestamp, kvBuffer, kvFamArray, kvQualArray, rowIDs, kvsPerRow, cellsReturned, rowsReturned);
    return rowsReturned;
}
/**
 * Flattens a single result into the per-cell metadata arrays and hands it to
 * the native layer through {@code setResultInfo}.
 *
 * <p>The cell arrays are (re)allocated only when missing or too small. A
 * {@code null} cell array from {@code rawCells()} is treated as zero cells, in
 * which case the cell arrays are passed to the native side as {@code null}.
 *
 * @param result the row to push
 * @return always 1
 * @throws IOException if the row holds more cells than were pre-allocated
 */
protected int pushRowsToJni(Result result)
        throws IOException {
    final int rowsReturned = 1;

    // There can be maximum of 2 versions per kv, so reserve that many slots
    // per requested column; with no column list, size by the actual cell count.
    final int capacity = (numColsInScan == 0)
            ? result.size()
            : 2 * rowsReturned * numColsInScan;

    final boolean mustAllocate = (kvValLen == null) || (capacity > kvValLen.length);
    if (mustAllocate) {
        kvValLen = new int[capacity];
        kvValOffset = new int[capacity];
        kvQualLen = new int[capacity];
        kvQualOffset = new int[capacity];
        kvFamLen = new int[capacity];
        kvFamOffset = new int[capacity];
        kvTimestamp = new long[capacity];
        kvBuffer = new byte[capacity][];
        kvFamArray = new byte[capacity][];
        kvQualArray = new byte[capacity][];
    }
    if (rowIDs == null) {
        rowIDs = new byte[rowsReturned][];
        kvsPerRow = new int[rowsReturned];
    }

    final Cell[] cells = result.rawCells();
    final int cellCount = (cells == null) ? 0 : cells.length;
    if (cellCount > capacity) {
        throw new IOException("Insufficient cell array pre-allocated");
    }

    rowIDs[0] = result.getRow();
    kvsPerRow[0] = cellCount;

    int idx = 0;
    while (idx < cellCount) {
        final Cell cell = cells[idx];
        kvValLen[idx] = cell.getValueLength();
        kvValOffset[idx] = cell.getValueOffset();
        kvQualLen[idx] = cell.getQualifierLength();
        kvQualOffset[idx] = cell.getQualifierOffset();
        kvFamLen[idx] = cell.getFamilyLength();
        kvFamOffset[idx] = cell.getFamilyOffset();
        kvTimestamp[idx] = cell.getTimestamp();
        kvBuffer[idx] = cell.getValueArray();
        kvFamArray[idx] = cell.getFamilyArray();
        kvQualArray[idx] = cell.getQualifierArray();
        idx++;
    }

    if (cellCount != 0) {
        setResultInfo(jniObject, kvValLen, kvValOffset,
                      kvQualLen, kvQualOffset, kvFamLen, kvFamOffset,
                      kvTimestamp, kvBuffer, kvFamArray, kvQualArray, rowIDs, kvsPerRow, cellCount, rowsReturned);
    } else {
        setResultInfo(jniObject, null, null,
                      null, null, null, null,
                      null, null, null, null, rowIDs, kvsPerRow, cellCount, rowsReturned);
    }
    return rowsReturned;
}
/**
 * Deletes a row, or selected columns of a row, synchronously or in the
 * background.
 *
 * <p>The delete is routed, in priority order, to the transactional delete
 * (when {@code useTRex} is set and {@code transID} is non-zero), to the
 * region-transaction delete with auto-commit (when {@code useRegionXn}), or to
 * the plain delete. In asynchronous mode the work is submitted to
 * {@code executorService} and tracked in {@code future}.
 *
 * @param transID        transaction id; 0 never selects the transactional delete
 * @param rowID          key of the row to delete
 * @param columns        columns to delete, each element a {@code byte[]};
 *                       {@code null} leaves the delete without explicit columns
 * @param timestamp      delete timestamp, or -1 to construct the delete without one
 * @param asyncOperation whether to run the delete on the executor
 * @param useRegionXn    whether to use the region-transaction delete
 * @return always {@code true}
 * @throws IOException if a synchronous delete fails
 */
public boolean deleteRow(final long transID, byte[] rowID,
                         Object[] columns,
                         long timestamp,
                         boolean asyncOperation,
                         final boolean useRegionXn) throws IOException {
    if (logger.isTraceEnabled()) logger.trace("Enter deleteRow transID " + transID
            + " (" + new String(rowID) + ", " + timestamp + ") " + tableName);

    final Delete del = (timestamp == -1)
            ? new Delete(rowID)
            : new Delete(rowID, timestamp);
    if (columns != null) {
        for (Object column : columns) {
            final byte[] qualified = (byte[]) column;
            del.deleteColumns(getFamily(qualified), getName(qualified));
        }
    }

    // One task definition serves both modes: submitted when asynchronous,
    // invoked inline otherwise.
    final class RowDeleteTask implements Callable<Object> {
        public Object call() throws IOException {
            if (useTRex && (transID != 0)) {
                table.delete(transID, del);
            } else if (useRegionXn) {
                table.deleteRegionTx(del, /* auto-commit */ true);
            } else {
                table.delete(del);
            }
            return Boolean.TRUE;
        }
    }

    if (asyncOperation) {
        future = executorService.submit(new RowDeleteTask());
        return true;
    }

    new RowDeleteTask().call();
    if (logger.isTraceEnabled()) logger.trace("Exit deleteRow");
    return true;
}
/**
 * Deletes a batch of rows whose keys are packed into a {@link ByteBuffer}.
 *
 * <p>Buffer layout as consumed here: a leading {@code short} row count, then for
 * each row one suffix byte followed by the key bytes. When the suffix byte is
 * the character {@code '1'} the key is {@code rowIDLen + 1} bytes long,
 * otherwise {@code rowIDLen} bytes.
 *
 * @param transID        transaction id; the transactional delete is used only
 *                       when {@code useTRex} is set and this is non-zero
 * @param rowIDLen       base length of each row key
 * @param rowIDs         a {@code ByteBuffer} positioned at the packed keys
 * @param timestamp      delete timestamp, or -1 to construct deletes without one
 * @param asyncOperation whether to run the deletes on the executor
 * @return always {@code true}
 * @throws IOException if a synchronous delete fails
 */
public boolean deleteRows(final long transID, short rowIDLen, Object rowIDs,
                          long timestamp,
                          boolean asyncOperation) throws IOException {
    if (logger.isTraceEnabled()) logger.trace("Enter deleteRowsInt() transID "
            + transID + " " + tableName);

    final List<Delete> listOfDeletes = new ArrayList<Delete>();
    final ByteBuffer packedKeys = (ByteBuffer) rowIDs;
    final short numRows = packedKeys.getShort();

    for (short rowNum = 0; rowNum < numRows; rowNum++) {
        final byte suffixFlag = packedKeys.get();
        final short keyLen = (suffixFlag == '1') ? (short) (rowIDLen + 1) : rowIDLen;
        final byte[] key = new byte[keyLen];
        packedKeys.get(key, 0, keyLen);
        listOfDeletes.add((timestamp == -1)
                ? new Delete(key)
                : new Delete(key, timestamp));
    }

    // One task definition serves both modes: submitted when asynchronous,
    // invoked inline otherwise.
    final class BatchDeleteTask implements Callable<Object> {
        public Object call() throws IOException {
            if (useTRex && (transID != 0)) {
                table.delete(transID, listOfDeletes);
            } else {
                table.delete(listOfDeletes);
            }
            return Boolean.TRUE;
        }
    }

    if (asyncOperation) {
        future = executorService.submit(new BatchDeleteTask());
        return true;
    }

    new BatchDeleteTask().call();
    if (logger.isTraceEnabled()) logger.trace("Exit deleteRows");
    return true;
}
/**
 * Encodes an {@code int} as four bytes, most significant byte first.
 *
 * @param value the integer to encode
 * @return a new 4-byte array holding {@code value} in big-endian order
 */
public byte[] intToByteArray(int value) {
    final byte[] bigEndian = new byte[4];
    int remaining = value;
    // Fill from the least significant end, shifting one byte at a time.
    for (int idx = bigEndian.length - 1; idx >= 0; idx--) {
        bigEndian[idx] = (byte) remaining;
        remaining >>>= 8;
    }
    return bigEndian;
}
/**
 * Deletes a row only if a given column currently holds a given value.
 *
 * <p>The check-and-delete is routed, in priority order, to the transactional
 * variant (when {@code useTRex} is set and {@code transID} is non-zero), to the
 * region-transaction variant with auto-commit (when {@code useRegionXn}), or to
 * the plain variant.
 *
 * @param transID       transaction id; 0 never selects the transactional variant
 * @param rowID         key of the row to delete
 * @param columnToCheck column to test, split with {@code getFamily}/{@code getName};
 *                      must not be {@code null}; when empty, family and qualifier
 *                      are passed as {@code null}
 * @param colValToCheck value the column is compared against
 * @param timestamp     delete timestamp, or -1 to construct the delete without one
 * @param useRegionXn   whether to use the region-transaction variant
 * @return the outcome reported by the underlying check-and-delete call
 * @throws IOException if the underlying call fails
 */
public boolean checkAndDeleteRow(long transID, byte[] rowID,
                                 byte[] columnToCheck, byte[] colValToCheck,
                                 long timestamp, final boolean useRegionXn) throws IOException {
    if (logger.isTraceEnabled()) logger.trace("Enter checkAndDeleteRow transID " + transID
            + " (" + new String(rowID) + ", " + new String(columnToCheck) + ", " + new String(colValToCheck) + ", " + timestamp + ") " + tableName);

    final Delete del = (timestamp == -1)
            ? new Delete(rowID)
            : new Delete(rowID, timestamp);

    byte[] family = null;
    byte[] qualifier = null;
    if (columnToCheck.length > 0) {
        family = getFamily(columnToCheck);
        qualifier = getName(columnToCheck);
    }

    final boolean deleted;
    if (useTRex && (transID != 0)) {
        deleted = table.checkAndDelete(transID, rowID, family, qualifier, colValToCheck, del);
    } else if (useRegionXn) {
        deleted = table.checkAndDeleteRegionTx(rowID, family, qualifier, colValToCheck,
                                               del, /* autoCommit */ true);
    } else {
        deleted = table.checkAndDelete(rowID, family, qualifier, colValToCheck, del);
    }
    return deleted;
}
/**
 * Writes one row, optionally as a check-and-put, synchronously or in the
 * background.
 *
 * <p>Row layout as consumed here: a leading {@code short} column count, then
 * for each column a {@code short} name length, the name bytes, an {@code int}
 * value length and the value bytes.
 *
 * <p>For a check-and-put the column to test is the one at
 * {@code colIndexToCheck} in the row, unless {@code columnToCheck} is non-empty,
 * in which case {@code columnToCheck} takes precedence.
 *
 * <p>Each operation is routed, in priority order, to the transactional call
 * (when {@code useTRex} is set and {@code transID} is non-zero), to the
 * region-transaction call with auto-commit (when {@code useRegionXn}), or to
 * the plain call.
 *
 * @param transID         transaction id; 0 never selects the transactional call
 * @param rowID           key of the row to write
 * @param row             a {@code ByteBuffer} positioned at the packed columns
 * @param columnToCheck   explicit column to test; may be {@code null} or empty
 * @param colValToCheck   value the tested column is compared against
 * @param colIndexToCheck index, within the row, of the column to test
 * @param checkAndPut     whether to perform a conditional put
 * @param asyncOperation  whether to run the write on the executor
 * @param useRegionXn     whether to use the region-transaction calls
 * @return {@code true} when the write was submitted asynchronously or was an
 *         unconditional put; otherwise the outcome of the check-and-put
 * @throws IOException if a synchronous write fails
 */
public boolean putRow(final long transID, final byte[] rowID, Object row,
                      byte[] columnToCheck, final byte[] colValToCheck,
                      final short colIndexToCheck,
                      final boolean checkAndPut, boolean asyncOperation,
                      final boolean useRegionXn) throws IOException, InterruptedException,
                      ExecutionException
{
    if (logger.isTraceEnabled()) logger.trace("Enter putRow() " + tableName +
                                 " transID: " + transID +
                                 " useTRex: " + useTRex +
                                 " useRegionXn: " + useRegionXn);

    final ByteBuffer packedRow = (ByteBuffer) row;
    final Put put = new Put(rowID);
    byte[] checkFamily = null;
    byte[] checkQualifier = null;

    final short columnCount = packedRow.getShort();
    for (short colIndex = 0; colIndex < columnCount; colIndex++) {
        final short nameLen = packedRow.getShort();
        final byte[] colName = new byte[nameLen];
        packedRow.get(colName, 0, nameLen);

        final int valueLen = packedRow.getInt();
        final byte[] colValue = new byte[valueLen];
        packedRow.get(colValue, 0, valueLen);

        put.add(getFamily(colName), getName(colName), colValue);
        if (checkAndPut && colIndex == colIndexToCheck) {
            checkFamily = getFamily(colName);
            checkQualifier = getName(colName);
        }
    }
    // An explicit column overrides the one selected by index.
    if (columnToCheck != null && columnToCheck.length > 0) {
        checkFamily = getFamily(columnToCheck);
        checkQualifier = getName(columnToCheck);
    }
    final byte[] familyToCheck = checkFamily;
    final byte[] qualifierToCheck = checkQualifier;

    if (asyncOperation) {
        future = executorService.submit(new Callable<Object>() {
            public Object call() throws IOException {
                if (!checkAndPut) {
                    if (useTRex && (transID != 0)) {
                        table.put(transID, put);
                    } else if (useRegionXn) {
                        if (logger.isTraceEnabled()) logger.trace("putRow using putRegionTx");
                        table.putRegionTx(put, /* auto-commit */ true);
                    } else {
                        table.put(put);
                    }
                    return Boolean.TRUE;
                }
                final boolean applied;
                if (useTRex && (transID != 0)) {
                    applied = table.checkAndPut(transID, rowID,
                            familyToCheck, qualifierToCheck, colValToCheck, put);
                } else if (useRegionXn) {
                    if (logger.isTraceEnabled()) logger.trace("checkAndPutRegionTx with regionTX ");
                    applied = table.checkAndPutRegionTx(rowID,
                            familyToCheck, qualifierToCheck, colValToCheck, put, /* auto-commit */ true);
                } else {
                    applied = table.checkAndPut(rowID,
                            familyToCheck, qualifierToCheck, colValToCheck, put);
                }
                return Boolean.valueOf(applied);
            }
        });
        return true;
    }

    if (!checkAndPut) {
        if (useTRex && (transID != 0)) {
            table.put(transID, put);
        } else if (useRegionXn) {
            if (logger.isTraceEnabled()) logger.trace("putRow using putRegionTx");
            table.putRegionTx(put, true /* also commit */);
        } else {
            table.put(put);
        }
        return true;
    }

    final boolean applied;
    if (useTRex && (transID != 0)) {
        applied = table.checkAndPut(transID, rowID,
                familyToCheck, qualifierToCheck, colValToCheck, put);
    } else if (useRegionXn) {
        if (logger.isTraceEnabled()) logger.trace("checkAndPutRegionTx using regionTX ");
        applied = table.checkAndPutRegionTx(rowID, familyToCheck, qualifierToCheck,
                colValToCheck, put, /* auto-commit */ true);
    } else {
        applied = table.checkAndPut(rowID,
                familyToCheck, qualifierToCheck, colValToCheck, put);
    }
    return applied;
}
/* public boolean insertRow(long transID, byte[] rowID,
Object row,
long timestamp,
boolean asyncOperation) throws IOException, InterruptedException, ExecutionException {
return putRow(transID, rowID, row, null, null, 0,
false, asyncOperation, false);
} */
/**
 * Writes a batch of rows whose keys and column data are packed into two
 * {@link ByteBuffer}s, synchronously or in the background.
 *
 * <p>Key buffer layout as consumed here: a leading {@code short} row count,
 * then for each row one suffix byte followed by the key bytes; a suffix byte of
 * {@code '1'} means the key is {@code rowIDLen + 1} bytes long. Row buffer
 * layout, per row: a {@code short} column count, then for each column a
 * {@code short} name length, the name bytes, an {@code int} value length and
 * the value bytes.
 *
 * @param transID        transaction id; the transactional put is used only when
 *                       {@code useTRex} is set and this is non-zero
 * @param rowIDLen       base length of each row key
 * @param rowIDs         a {@code ByteBuffer} positioned at the packed keys
 * @param rows           a {@code ByteBuffer} positioned at the packed column data
 * @param timestamp      not used by this method
 * @param asyncOperation whether to run the puts on the executor
 * @return always {@code true}
 * @throws IOException if a synchronous put fails
 */
public boolean putRows(final long transID, short rowIDLen, Object rowIDs,
                       Object rows,
                       long timestamp, boolean asyncOperation)
        throws IOException, InterruptedException, ExecutionException {
    if (logger.isTraceEnabled()) logger.trace("Enter putRows() " + tableName +
                                 " transID: " + transID +
                                 " useTRex: " + useTRex);

    final ByteBuffer packedKeys = (ByteBuffer) rowIDs;
    final ByteBuffer packedRows = (ByteBuffer) rows;
    final List<Put> listOfPuts = new ArrayList<Put>();

    final short numRows = packedKeys.getShort();
    for (short rowNum = 0; rowNum < numRows; rowNum++) {
        final byte suffixFlag = packedKeys.get();
        final short keyLen = (suffixFlag == '1') ? (short) (rowIDLen + 1) : rowIDLen;
        final byte[] key = new byte[keyLen];
        packedKeys.get(key, 0, keyLen);

        final Put put = new Put(key);
        final short numCols = packedRows.getShort();
        for (short colIndex = 0; colIndex < numCols; colIndex++) {
            final short nameLen = packedRows.getShort();
            final byte[] colName = new byte[nameLen];
            packedRows.get(colName, 0, nameLen);

            final int valueLen = packedRows.getInt();
            final byte[] colValue = new byte[valueLen];
            packedRows.get(colValue, 0, valueLen);

            put.add(getFamily(colName), getName(colName), colValue);
        }
        // NOTE(review): the WAL flag is only ever passed when it is true, so a
        // false writeToWAL is never forwarded to the Put - confirm this is intended.
        if (writeToWAL) {
            put.setWriteToWAL(writeToWAL);
        }
        listOfPuts.add(put);
    }

    // One task definition serves both modes: submitted when asynchronous,
    // invoked inline otherwise.
    final class BatchPutTask implements Callable<Object> {
        public Object call() throws IOException {
            if (useTRex && (transID != 0)) {
                table.put(transID, listOfPuts);
            } else {
                table.put(listOfPuts);
            }
            return Boolean.TRUE;
        }
    }

    if (asyncOperation) {
        future = executorService.submit(new BatchPutTask());
    } else {
        new BatchPutTask().call();
    }
    return true;
}
/**
 * Collects the outcome of the pending asynchronous operation.
 *
 * <p>With a timeout of -1 the call only polls: it returns {@code false}
 * immediately unless the operation has already finished. Otherwise it waits up
 * to {@code timeout} milliseconds. On success every element of
 * {@code resultArray} receives the same single outcome and {@code future} is
 * cleared.
 *
 * @param timeout     wait limit in milliseconds, or -1 to poll without waiting
 * @param resultArray receives the outcome in every element
 * @return {@code true} if the operation completed and its outcome was stored,
 *         {@code false} if it is still running or the wait timed out
 * @throws InterruptedException if the wait is interrupted
 * @throws ExecutionException   if the asynchronous operation failed
 */
public boolean completeAsyncOperation(int timeout, boolean resultArray[])
        throws InterruptedException, ExecutionException
{
    if (timeout == -1 && !future.isDone()) {
        return false;
    }

    final Boolean outcome;
    try {
        outcome = (Boolean) future.get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException te) {
        return false;
    }

    // Need to enhance to return the result
    // for each Put object
    for (int slot = 0; slot < resultArray.length; slot++) {
        resultArray[slot] = outcome.booleanValue();
    }
    future = null;
    return true;
}
/* public boolean checkAndInsertRow(long transID, byte[] rowID,
Object row,
long timestamp,
boolean asyncOperation) throws IOException, InterruptedException, ExecutionException {
return putRow(transID, rowID, row, null, null, 0,
true, asyncOperation, false);
} */
/**
 * Conditionally updates a row by delegating to {@code putRow} with
 * check-and-put enabled and region transactions disabled.
 *
 * @param transID        transaction id, forwarded unchanged
 * @param rowID          key of the row to update
 * @param columns        packed column data, forwarded as the row argument
 * @param columnToCheck  column whose current value is tested
 * @param colValToCheck  value the tested column is compared against
 * @param timestamp      not used by this method
 * @param asyncOperation whether to run the update on the executor
 * @return whatever {@code putRow} returns
 */
public boolean checkAndUpdateRow(long transID, byte[] rowID,
                                 Object columns, byte[] columnToCheck, byte[] colValToCheck,
                                 long timestamp, boolean asyncOperation) throws IOException, InterruptedException,
                                 ExecutionException, Throwable {
    // The index is a placeholder: putRow lets a non-empty columnToCheck override it.
    final short placeholderIndex = 0;
    final boolean conditional = true;
    final boolean regionTransaction = false;
    return putRow(transID, rowID, columns, columnToCheck,
                  colValToCheck, placeholderIndex,
                  conditional, asyncOperation, regionTransaction);
}
/**
 * Counts the rows of this table through an aggregation coprocessor client.
 *
 * <p>Only a row count over {@code colFamily} is computed, with block caching
 * disabled on the scan. A positive {@code transID} uses the transactional
 * aggregation client after registering the transaction for {@code startRowID};
 * any other value uses the plain aggregation client.
 *
 * <p>NOTE(review): the client objects and the transactional table created here
 * are not closed by this method - confirm whether they need explicit release.
 *
 * @param transID      transaction id; values greater than 0 select the
 *                     transactional client
 * @param aggrType     not used by this method
 * @param startRowID   row used when registering the transaction
 * @param stopRowID    not used by this method
 * @param colFamily    column family added to the scan
 * @param colName      not used by this method
 * @param cacheBlocks  not used by this method
 * @param numCacheRows not used by this method
 * @return the row count as 8 bytes in little-endian order
 * @throws IOException if the aggregation call fails
 */
public byte[] coProcAggr(long transID, int aggrType,
                         byte[] startRowID,
                         byte[] stopRowID, byte[] colFamily, byte[] colName,
                         boolean cacheBlocks, int numCacheRows)
        throws IOException, Throwable {
    Configuration customConf = connection.getConfiguration();

    // The scan and the interpreter are identical for both client flavours.
    final Scan scan = new Scan();
    scan.addFamily(colFamily);
    scan.setCacheBlocks(false);
    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
        new LongColumnInterpreter();

    final long rowCount;
    if (transID > 0) {
        TransactionalAggregationClient aggregationClient =
            new TransactionalAggregationClient(customConf, connection);
        TransactionalTable lv_ttable = new TransactionalTable(getTableName(), connection);
        TransactionState ts = table.registerTransaction(lv_ttable, transID, startRowID);
        rowCount = aggregationClient.rowCount(transID, ts.getStartId(),
            org.apache.hadoop.hbase.TableName.valueOf(getTableName()),
            ci,
            scan);
    } else {
        AggregationClient aggregationClient =
            new AggregationClient(customConf);
        rowCount = aggregationClient.rowCount(
            org.apache.hadoop.hbase.TableName.valueOf(getTableName()),
            ci,
            scan);
    }

    return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(rowCount).array();
}
/**
 * Releases the per-operation state of this client.
 *
 * <p>Waits up to 30 seconds for a pending asynchronous operation, cancelling it
 * on timeout or interruption; then closes the scanner, releases the snapshot
 * helper, resets scan buffers, optionally cleans the native object, and closes
 * the table if one is still open.
 *
 * <p>If the wait was interrupted, the thread's interrupt status is restored
 * once the cleanup has finished, so the remaining close calls still run with
 * the status cleared. A failure of the pending operation is logged rather than
 * silently dropped.
 *
 * @param cleanJniObject whether to call the native {@code cleanup} (when the
 *                       JNI handle is non-zero) and clear {@code tableName}
 * @return {@code true} if the pending operation had to be cancelled because of
 *         a timeout or an interrupt, {@code false} otherwise
 * @throws IOException if closing the scanner, snapshot helper or table fails
 */
public boolean release(boolean cleanJniObject) throws IOException {
    boolean retcode = false;
    boolean interrupted = false;
    try {
        // Complete the pending IO
        if (future != null) {
            try {
                future.get(30, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                logger.error("Asynchronous Thread is Cancelled (timeout), " + e);
                retcode = true;
                future.cancel(true); // Interrupt the thread
            } catch (InterruptedException e) {
                logger.error("Asynchronous Thread is Cancelled (interrupt), " + e);
                retcode = true;
                interrupted = true; // restored in the finally block below
                future.cancel(true); // Interrupt the thread
            } catch (ExecutionException ee) {
                // The operation itself failed; nothing to cancel, but do not hide it.
                logger.error("Asynchronous operation failed before release, " + ee);
            }
            future = null;
        }
        if (scanner != null) {
            if (logger.isTraceEnabled()) logger.trace("scanner.close() " + tableName + " " + scanner + " "
                    + retcode );
            scanner.close();
            scanner = null;
        }
        if (snapHelper != null) {
            snapHelper.release();
            snapHelper = null;
        }
        cleanScan();
        getResultSet = null;
        if (cleanJniObject) {
            if (jniObject != 0)
                cleanup(jniObject);
            tableName = null;
        }
        scanHelper = null;
        jniObject = 0;
        // close() sets table to null, so guard against release() after close().
        if (table != null)
            table.close();
        return retcode;
    } finally {
        if (interrupted)
            Thread.currentThread().interrupt();
    }
}
/**
 * Closes the underlying table, if one is open, and drops the reference to it.
 *
 * @param clearRegionCache not used; the region-cache clearing call
 *                         ({@code connection.clearRegionCache(tableName.getBytes())})
 *                         is currently disabled
 * @param cleanJniObject   not used by this method
 * @return always {@code true}
 * @throws IOException if closing the table fails
 */
public boolean close(boolean clearRegionCache, boolean cleanJniObject) throws IOException {
    if (logger.isTraceEnabled()) logger.trace("Enter close() " + tableName);
    if (table == null) {
        return true;
    }
    table.close();
    table = null;
    return true;
}
/**
 * Returns the start keys reported by the underlying table.
 *
 * @return the value of {@code table.getStartKeys()}
 * @throws IOException if the underlying call fails
 */
public byte[][] getStartKeys() throws IOException
{
    final byte[][] startKeys = table.getStartKeys();
    return startKeys;
}
/**
 * Returns the end keys reported by the underlying table.
 *
 * @return the value of {@code table.getEndKeys()}
 * @throws IOException if the underlying call fails
 */
public byte[][] getEndKeys() throws IOException
{
    final byte[][] endKeys = table.getEndKeys();
    return endKeys;
}
/**
 * Resets the scan-related buffers so they can be garbage collected.
 *
 * <p>Nothing is reset when the last fetch was a {@code GET_ROW} or
 * {@code BATCH_GET}. Otherwise the cache size and column count return to their
 * defaults and every per-cell and per-row array is dropped, including the
 * family and qualifier backing-array references, which are allocated together
 * with the other cell arrays and would otherwise keep stale cell data
 * reachable.
 */
private void cleanScan()
{
    if (fetchType == GET_ROW || fetchType == BATCH_GET)
        return;
    numRowsCached = 1;
    numColsInScan = 0;
    kvValLen = null;
    kvValOffset = null;
    kvQualLen = null;
    kvQualOffset = null;
    kvFamLen = null;
    kvFamOffset = null;
    kvTimestamp = null;
    kvBuffer = null;
    // Dropped together with kvValLen, whose null check triggers the joint
    // re-allocation of all cell arrays on the next push.
    kvFamArray = null;
    kvQualArray = null;
    rowIDs = null;
    kvsPerRow = null;
}
/**
 * Stores the handle that is passed to the native methods as {@code jniObject}.
 *
 * @param inJniObject the handle value to remember
 */
protected void setJniObject(long inJniObject) {
    this.jniObject = inJniObject;
}
/**
 * Native hand-off of a fetched batch. The cell arrays are parallel: index
 * {@code i} in each describes the same cell. Callers in this class pass
 * {@code null} for all cell arrays when the batch contains no cells.
 *
 * @param jniObject        handle identifying the native-side object
 * @param kvValLen         per-cell value length
 * @param kvValOffset      per-cell value offset within {@code kvBuffer[i]}
 * @param kvQualLen        per-cell qualifier length
 * @param kvQualOffset     per-cell qualifier offset within {@code kvQualArray[i]}
 * @param kvFamLen         per-cell family length
 * @param kvFamOffset      per-cell family offset within {@code kvFamArray[i]}
 * @param timestamp        per-cell timestamp
 * @param kvBuffer         per-cell value backing array
 * @param kvFamArray       per-cell family backing array
 * @param kvQualArray      per-cell qualifier backing array
 * @param rowIDs           per-row key
 * @param kvsPerRow        per-row cell count
 * @param numCellsReturned number of valid entries in the cell arrays
 * @param rowsReturned     number of valid entries in the row arrays
 * @return a native status value; callers in this class ignore it
 */
private native int setResultInfo(long jniObject,
int[] kvValLen, int[] kvValOffset,
int[] kvQualLen, int[] kvQualOffset,
int[] kvFamLen, int[] kvFamOffset,
long[] timestamp,
byte[][] kvBuffer,
byte[][] kvFamArray,
byte[][] kvQualArray,
byte[][] rowIDs,
int[] kvsPerRow, int numCellsReturned,
int rowsReturned);
/**
 * Native cleanup for the given handle. In this class it is invoked from
 * {@code release} only when cleaning was requested and the handle is non-zero.
 *
 * @param jniObject handle identifying the native-side object
 */
private native void cleanup(long jniObject);
/**
 * Native notification for the given handle. In this class it is invoked when a
 * get returns no rows, in place of pushing result data.
 * NOTE(review): the exact native-side effect is not visible here - confirm
 * against the JNI implementation.
 *
 * @param jniObject handle identifying the native-side object
 * @return a native status value; callers in this class ignore it
 */
protected native int setJavaObject(long jniObject);
// Class initialisation: read the transaction switches from the environment,
// create the shared executor, and load the native "executor" library.
// Each switch defaults to enabled and is turned off only when its environment
// variable parses to the integer 0.
// NOTE(review): a non-numeric value makes Integer.parseInt throw inside this
// initializer, which aborts class loading - confirm that fail-fast is intended.
static {
    final String txnSetting = System.getenv("USE_TRANSACTIONS");
    envUseTRex = (txnSetting == null) || (Integer.parseInt(txnSetting) != 0);

    final String txnScannerSetting = System.getenv("USE_TRANSACTIONS_SCANNER");
    envUseTRexScanner = (txnScannerSetting == null) || (Integer.parseInt(txnScannerSetting) != 0);

    executorService = Executors.newCachedThreadPool();
    System.loadLibrary("executor");
}
}