PR 275 Merge vanilla HBase 0.98.10 support
diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh
index e367326..96f3698 100644
--- a/core/sqf/sqenvcom.sh
+++ b/core/sqf/sqenvcom.sh
@@ -603,7 +603,7 @@
export HIVE_JAR_DIRS="$APACHE_HIVE_HOME/lib"
- export HBASE_TRX_JAR=hbase-trx-hbase_98_4-${TRAFODION_VER}.jar
+ export HBASE_TRX_JAR=hbase-trx-hbase_98_10-${TRAFODION_VER}.jar
# end of code for Apache Hadoop/HBase installation w/o distro
else
diff --git a/core/sqf/sql/scripts/install_apache_hadoop b/core/sqf/sql/scripts/install_apache_hadoop
index 8d766ea..84b317b 100755
--- a/core/sqf/sql/scripts/install_apache_hadoop
+++ b/core/sqf/sql/scripts/install_apache_hadoop
@@ -554,10 +554,10 @@
#HBASE_MIRROR_URL=http://psg.mtu.edu/pub/apache/hbase/hbase-0.98.3
#HBASE_MIRROR_URL=http://archive.cloudera.com/cdh5/cdh/5
-HBASE_MIRROR_URL=http://archive.apache.org/dist/hbase/hbase-0.98.6/
+HBASE_MIRROR_URL=http://archive.apache.org/dist/hbase/hbase-0.98.10/
#HBASE_TAR=hbase-0.98.6-cdh5.3.0.tar.gz
-HBASE_TAR=hbase-0.98.6-hadoop2-bin.tar.gz
+HBASE_TAR=hbase-0.98.10-hadoop2-bin.tar.gz
#HBASE_TAR=hbase-0.98.4-hadoop2-bin.tar.gz
if [[ "$SQ_HBASE_DISTRO" = "HDP" ]]; then
HBASE_TAR=hbase-0.98.4.2.2.0.0-2041-hadoop2.tar.gz
diff --git a/core/sqf/sql/scripts/install_hadoop_regr_test_env b/core/sqf/sql/scripts/install_hadoop_regr_test_env
index b3eacc0..17da726 100755
--- a/core/sqf/sql/scripts/install_hadoop_regr_test_env
+++ b/core/sqf/sql/scripts/install_hadoop_regr_test_env
@@ -200,17 +200,17 @@
./dsdgen -force $FORCE -dir $MY_TPCDS_DATA_DIR -scale $SCALE -table store >>${MY_LOG_FILE} 2>&1
./dsdgen -force $FORCE -dir $MY_TPCDS_DATA_DIR -scale $SCALE -table promotion >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/date_dim >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/time_dim >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/item >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/customer >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/customer_demographics >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/household_demographics >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/customer_address >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/store >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/promotion >>${MY_LOG_FILE} 2>&1
- $MY_HDFS_CMD dfs -mkdir /hive/tpcds/store_sales >>${MY_LOG_FILE} 2>&1
+ # -p behaves like Unix mkdir -p: missing parent directories are created and an already-existing directory is not an error
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/date_dim >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/time_dim >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/item >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/customer >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/customer_demographics >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/household_demographics >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/customer_address >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/store >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/promotion >>${MY_LOG_FILE} 2>&1
+ $MY_HDFS_CMD dfs -mkdir -p /hive/tpcds/store_sales >>${MY_LOG_FILE} 2>&1
cd $MY_TPCDS_DATA_DIR
diff --git a/core/sqf/src/seatrans/hbase-trx/Makefile b/core/sqf/src/seatrans/hbase-trx/Makefile
index 5cab6dd..e5e25f9 100644
--- a/core/sqf/src/seatrans/hbase-trx/Makefile
+++ b/core/sqf/src/seatrans/hbase-trx/Makefile
@@ -21,7 +21,7 @@
# This Makefile is just a thin shell to Maven, which is used to do the real build
-BLD_HBASE_APACHE_TRX_JARNAME =hbase-trx-hbase_98_4-$(TRAFODION_VER).jar
+BLD_HBASE_APACHE_TRX_JARNAME =hbase-trx-hbase_98_10-$(TRAFODION_VER).jar
BLD_HBASE_CDH_TRX_JARNAME =hbase-trx-cdh5_3-$(TRAFODION_VER).jar
BLD_HBASE_MAPR_TRX_JARNAME =hbase-trx-mapr4_0-$(TRAFODION_VER).jar
BLD_HBASE_HDP_TRX_JARNAME =hbase-trx-hdp2_2-$(TRAFODION_VER).jar
diff --git a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache
index c717479..f936c1b 100755
--- a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache
+++ b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache
@@ -58,7 +58,7 @@
<properties>
<hadoop.version>2.4.0</hadoop.version>
- <hbase.version>0.98.4-hadoop2</hbase.version>
+ <hbase.version>0.98.10-hadoop2</hbase.version>
<!--<hbase.version>0.98.0.2.1.4.0-632-hadoop2</hbase.version>-->
<!--<hbase.version>0.98.3-hadoop2</hbase.version>-->
<!--<hbase.version>0.98.3-hadoop1</hbase.version>-->
@@ -70,7 +70,7 @@
<groupId>org.apache</groupId>
<modelVersion>4.0.0</modelVersion>
- <artifactId>hbase-trx-hbase_98_4</artifactId>
+ <artifactId>hbase-trx-hbase_98_10</artifactId>
<version>${env.TRAFODION_VER}</version>
<name>HBase - Trx</name>
<description>Trx of HBase usage</description>
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
index 7bc51cb..559e9e3 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
@@ -41,9 +41,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
@@ -671,25 +673,24 @@
}
}
+
+    /**
+     * Checks that a Put is well-formed.
+     *
+     * @param put the mutation to check
+     * @throws IllegalArgumentException if the Put is empty, or if maxKeyValueSize is
+     *         positive and the serialized length of any cell exceeds it
+     */
+    public void validatePut(final Put put) throws IllegalArgumentException {
+        if (put.isEmpty()) {
+            throw new IllegalArgumentException("No columns to insert");
+        }
+        if (maxKeyValueSize <= 0) {
+            // A non-positive limit means no per-cell size check is performed.
+            return;
+        }
+        for (List<Cell> familyCells : put.getFamilyCellMap().values()) {
+            for (Cell cell : familyCells) {
+                final int cellLength = KeyValueUtil.length(cell);
+                if (cellLength > maxKeyValueSize) {
+                    throw new IllegalArgumentException("KeyValue size too large");
+                }
+            }
+        }
+    }
- // validate for well-formedness
- public void validatePut(final Put put) throws IllegalArgumentException {
- if (put.isEmpty()) {
- throw new IllegalArgumentException("No columns to insert");
- }
- if (maxKeyValueSize > 0) {
- for (List<KeyValue> list : put.getFamilyMap().values()) {
- for (KeyValue kv : list) {
- if (kv.getLength() > maxKeyValueSize) {
- throw new IllegalArgumentException(
- "KeyValue size too large");
- }
- }
- }
- }
- }
- public HRegionLocation getRegionLocation(byte[] row, boolean f)
- throws IOException {
+ public HRegionLocation getRegionLocation(byte[] row, boolean f) throws IOException {
return super.getRegionLocation(row, f);
}
public void close() throws IOException { super.close(); }
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
index 20450d1..0e0cd9e 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
@@ -47,9 +47,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -744,27 +746,26 @@
throw new IOException(result.getException());
}
}
-
- // validate for well-formedness
- public void validatePut(final Put put) throws IllegalArgumentException {
- if (put.isEmpty()) {
- throw new IllegalArgumentException("No columns to insert");
- }
- if (maxKeyValueSize > 0) {
- for (List<KeyValue> list : put.getFamilyMap().values()) {
- for (KeyValue kv : list) {
- if (kv.getLength() > maxKeyValueSize) {
- throw new IllegalArgumentException(
- "KeyValue size too large");
- }
- }
- }
- }
- }
-
- private int maxKeyValueSize;
-public HRegionLocation getRegionLocation(byte[] row, boolean f)
- throws IOException {
+
+    /**
+     * Verifies that a Put is well-formed.
+     *
+     * @param put the mutation to inspect
+     * @throws IllegalArgumentException when the Put carries no columns, or when
+     *         maxKeyValueSize is positive and some cell's serialized length is larger
+     */
+    public void validatePut(final Put put) throws IllegalArgumentException {
+        if (put.isEmpty()) {
+            throw new IllegalArgumentException("No columns to insert");
+        }
+        if (maxKeyValueSize <= 0) {
+            // Size checking only applies when a positive limit is set.
+            return;
+        }
+        for (List<Cell> cellsOfFamily : put.getFamilyCellMap().values()) {
+            for (Cell candidate : cellsOfFamily) {
+                final boolean tooLarge = KeyValueUtil.length(candidate) > maxKeyValueSize;
+                if (tooLarge) {
+                    throw new IllegalArgumentException("KeyValue size too large");
+                }
+            }
+        }
+    }
+
+ private int maxKeyValueSize;
+
+ public HRegionLocation getRegionLocation(byte[] row, boolean f) throws IOException {
return super.getRegionLocation(row,f);
}
public void close() throws IOException { super.close(); }
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java
index 7e7c1c1..976803e 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java
@@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
@@ -53,10 +54,10 @@
* mechansim will currently treat DeleteColumn the same as Delete which could cause confusion.
*/
public static void validateDelete(final Delete delete) throws SingleVersionDeleteNotSupported {
- Collection<List<KeyValue>> values = delete.getFamilyMap().values();
- for (List<KeyValue> value : values) {
- for (KeyValue kv : value) {
- if (Type.Delete.getCode() == kv.getType()) {
+ Collection<List<Cell>> values = delete.getFamilyCellMap().values();
+ for (List<Cell> value : values) {
+ for (Cell cell : value) {
+ if (cell.getTypeByte() == Type.Delete.getCode()) {
throw new SingleVersionDeleteNotSupported();
}
}
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java
index 79816d7..4e22cd9 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java
@@ -24,20 +24,15 @@
package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
-
-import java.lang.Class;
-
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.ListIterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
@@ -45,31 +40,27 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.Hex;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.io.DataInputBuffer;
/**
@@ -78,53 +69,96 @@
*/
public class TrxTransactionState extends TransactionState{
- static boolean sb_sqm_98_1;
- static boolean sb_sqm_98_4;
+ static boolean sb_sqm_98_1 = false;
+ static boolean sb_sqm_98_4 = false;
+ static boolean sb_sqm_98_9 = false;
+
static java.lang.reflect.Constructor c98_1 = null;
static java.lang.reflect.Constructor c98_4 = null;
+ static java.lang.reflect.Constructor c98_9 = null;
+
+ static Class keepDeletedCellsClazz = null;
+ static Class scaninfoClazz = null;
+ static Constructor scaninfoConstructor = null;
+ static Object[] scaninfoArgs = null;
static {
- sb_sqm_98_1 = true;
- try {
- NavigableSet<byte[]> lv_nvg = (NavigableSet<byte[]>) null;
- c98_1 = ScanQueryMatcher.class.getConstructor(
- new Class [] {
- Scan.class,
- ScanInfo.class,
- java.util.NavigableSet.class,
- ScanType.class,
- long.class,
- long.class,
- long.class
- });
- }
- catch (NoSuchMethodException exc_nsm) {
- sb_sqm_98_1 = false;
- sb_sqm_98_4 = true;
- try {
- c98_4 = ScanQueryMatcher.class.getConstructor(
- new Class [] {
- Scan.class,
- ScanInfo.class,
- java.util.NavigableSet.class,
- ScanType.class,
- long.class,
- long.class,
- long.class,
- RegionCoprocessorHost.class
- });
- }
- catch (NoSuchMethodException exc_nsm2) {
- sb_sqm_98_4 = false;
- }
- }
+ String version = VersionInfo.getVersion();// the hbase version string, eg. "0.6.3-dev"
+ LOG.info("Got info of Class ScanQueryMatcher for HBase version :" + version);
- if (sb_sqm_98_1) {
- LOG.info("Got info of Class ScanQueryMatcher for HBase 98.1");
- }
- if (sb_sqm_98_4) {
- LOG.info("Got info of Class ScanQueryMatcher for HBase 98.4");
- }
+ // Probe ScanQueryMatcher reflectively for the constructor signature available at runtime.
+ // Candidates are tried in order (7-arg, then 8-arg, then 9-arg); the first one found is
+ // cached in c98_1 / c98_4 / c98_9 and its sb_sqm_* flag is set, so at most one flag is true.
+ // If none of the three signatures exists, class initialization fails with a RuntimeException.
+ try {
+ c98_1 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class,
+ ScanInfo.class,
+ java.util.NavigableSet.class,
+ ScanType.class,
+ long.class,
+ long.class,
+ long.class });
+ LOG.info("Got info of Class ScanQueryMatcher for HBase 98.1");
+ sb_sqm_98_1 = true;
+ } catch (NoSuchMethodException e) {
+ // 7-arg form is absent: try the variant that additionally takes a RegionCoprocessorHost
+ try {
+ c98_4 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class,
+ ScanInfo.class,
+ java.util.NavigableSet.class,
+ ScanType.class,
+ long.class,
+ long.class,
+ long.class,
+ RegionCoprocessorHost.class });
+ LOG.info("Got info of Class ScanQueryMatcher for HBase 98.4");
+ sb_sqm_98_4 = true;
+ } catch (NoSuchMethodException e1) {
+ // 8-arg form is absent: try the variant with a fourth long before the RegionCoprocessorHost
+ try {
+ c98_9 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class,
+ ScanInfo.class,
+ java.util.NavigableSet.class,
+ ScanType.class,
+ long.class,
+ long.class,
+ long.class,
+ long.class,
+ RegionCoprocessorHost.class });
+ LOG.info("Got info of Class ScanQueryMatcher for HBase 98.9");
+ sb_sqm_98_9 = true;
+ } catch (NoSuchMethodException e2) {
+ // No known signature matched: abort static initialization rather than run without a matcher constructor
+ throw new RuntimeException("HBase version :" + version + ". No matcher ScanQueryMatcher.");
+ }
+ }
+ }
+
+
+        // Resolve the ScanInfo constructor reflectively. Two signatures are tried: one whose
+        // fifth parameter is a boolean and, as a fallback, one whose fifth parameter is the
+        // KeepDeletedCells enum. The constructor that is found and a matching argument array
+        // are cached in scaninfoConstructor / scaninfoArgs for later newInstance() calls.
+        try {
+            scaninfoClazz = Class.forName("org.apache.hadoop.hbase.regionserver.ScanInfo");
+        } catch (ClassNotFoundException e) {
+            // Chain the original exception so its stack trace is not lost.
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        Class[] types = null;
+
+        try {
+            types = new Class[] { byte[].class, int.class, int.class, long.class, boolean.class, long.class, KVComparator.class };
+            scaninfoConstructor = scaninfoClazz.getConstructor(types);
+            scaninfoArgs = new Object[] { null, 0, 1, HConstants.FOREVER, false, 0, KeyValue.COMPARATOR };
+            if (LOG.isTraceEnabled())
+                LOG.trace("Created ScanInfo instance before HBase 98.8");
+        } catch (Exception e) {
+            // The boolean-flavoured constructor is unavailable: fall back to the
+            // KeepDeletedCells-flavoured one.
+            try {
+                keepDeletedCellsClazz = Class.forName("org.apache.hadoop.hbase.KeepDeletedCells");
+            } catch (ClassNotFoundException e1) {
+                throw new RuntimeException(e1.getMessage(), e1);
+            }
+            types = new Class[] { byte[].class, int.class, int.class, long.class, keepDeletedCellsClazz, long.class, KVComparator.class };
+            try {
+                scaninfoConstructor = scaninfoClazz.getConstructor(types);
+                scaninfoArgs = new Object[] { null, 0, 1, HConstants.FOREVER, Enum.valueOf(keepDeletedCellsClazz, "FALSE"), 0, KeyValue.COMPARATOR };
+                if (LOG.isTraceEnabled())
+                    LOG.trace("Created ScanInfo instance after HBase 98.8");
+            } catch (Exception e1) {
+                // Log the throwable itself so the root cause appears in the region server log.
+                LOG.error("Created ScanInfo instance ERROR", e1);
+                throw new RuntimeException(e1.getMessage(), e1);
+            }
+        }
}
/**
@@ -199,14 +233,14 @@
e.add(kv);
}
try {
- long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
- e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
- this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
- //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid " + txid + " ts flush id " + this.flushTxId);
- if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
+ long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
+ e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
+ this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid " + txid + " ts flush id " + this.flushTxId);
+ if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
}
catch (IOException exp1) {
- LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception");
+ LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception");
//throw exp1;
}
}
@@ -222,7 +256,11 @@
public boolean hasWrite() {
return writeOrdering.size() > 0;
}
-
+
+ /**
+ * Returns the number of entries currently held in writeOrdering.
+ * NOTE(review): this read is not synchronized although addDelete is - confirm callers tolerate a stale value.
+ *
+ * @return writeOrdering.size()
+ */
+ public int writeSize() {
+ return writeOrdering.size();
+ }
+
public synchronized void addDelete(final Delete delete) {
if (LOG.isTraceEnabled()) LOG.trace("addDelete -- ENTRY: delete: " + delete.toString());
@@ -245,14 +283,13 @@
e.add(kv);
}
try {
- long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
+ long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
//if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y00 write edit to HLOG during delete with txid " + txid + " ts flush id " + this.flushTxId);
- if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
- }
- catch (IOException exp1) {
- LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception");
+ if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
+ } catch (IOException exp1) {
+ LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception");
}
}
else {
@@ -649,61 +686,87 @@
super.setSequenceID(Long.MAX_VALUE);
//Store.ScanInfo scaninfo = new Store.ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, Cell.COMPARATOR);
- ScanInfo scaninfo = new ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, KeyValue.COMPARATOR);
+        // From HBase 0.98.8 on, ScanInfo takes a KeepDeletedCells value instead of a boolean,
+        // so the instance is built through the constructor and argument array that the static
+        // initializer resolved (scaninfoConstructor / scaninfoArgs).
+        ScanInfo scaninfo = null;
+        try {
+            scaninfo = (ScanInfo) scaninfoConstructor.newInstance(scaninfoArgs);
+        } catch (Exception e) {
+            // Chain the cause: a reflective InvocationTargetException usually has a null
+            // message, so rethrowing only e.getMessage() would discard all diagnostics.
+            throw new RuntimeException("Unable to instantiate ScanInfo via reflection", e);
+        }
try {
- if (sb_sqm_98_1) {
- try {
- matcher = (ScanQueryMatcher) c98_1.newInstance(scan,
- scaninfo,
- null,
- ScanType.USER_SCAN,
- Long.MAX_VALUE,
- HConstants.LATEST_TIMESTAMP,
- 0);
- if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.1");
- }
- catch (InstantiationException exc_ins) {
- LOG.error("InstantiationException: " + exc_ins);
- }
- catch (IllegalAccessException exc_ill_acc) {
- LOG.error("IllegalAccessException: " + exc_ill_acc);
- }
- catch (InvocationTargetException exc_inv_tgt) {
- LOG.error("InvocationTargetException: " + exc_inv_tgt);
- }
-
- }
- else {
- try {
- matcher = (ScanQueryMatcher) c98_4.newInstance(scan,
- scaninfo,
- null,
- ScanType.USER_SCAN,
- Long.MAX_VALUE,
- HConstants.LATEST_TIMESTAMP,
- (long) 0,
- null);
- if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.4");
- }
- catch (InstantiationException exc_ins) {
- LOG.error("InstantiationException: " + exc_ins);
- }
- catch (IllegalAccessException exc_ill_acc) {
- LOG.error("IllegalAccessException: " + exc_ill_acc);
- }
- catch (InvocationTargetException exc_inv_tgt) {
- LOG.error("InvocationTargetException: " + exc_inv_tgt);
- }
-
- }
+            // Build the matcher through whichever ScanQueryMatcher constructor the static
+            // initializer located (c98_1, c98_4 or c98_9). Reflection failures are logged and
+            // not rethrown here, in which case matcher is left unassigned by this block.
+            // The throwable is passed to LOG.error so that the wrapped cause (notably the
+            // target of an InvocationTargetException) shows up in the log.
+            if (sb_sqm_98_1) {
+                try {
+                    matcher = (ScanQueryMatcher) c98_1.newInstance(scan,
+                            scaninfo,
+                            null,
+                            ScanType.USER_SCAN,
+                            Long.MAX_VALUE,
+                            HConstants.LATEST_TIMESTAMP,
+                            0);
+                    if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.1");
+                }
+                catch (InstantiationException exc_ins) {
+                    LOG.error("InstantiationException: " + exc_ins, exc_ins);
+                }
+                catch (IllegalAccessException exc_ill_acc) {
+                    LOG.error("IllegalAccessException: " + exc_ill_acc, exc_ill_acc);
+                }
+                catch (InvocationTargetException exc_inv_tgt) {
+                    LOG.error("InvocationTargetException: " + exc_inv_tgt, exc_inv_tgt);
+                }
+
+            }
+            else if (sb_sqm_98_4) {
+                try {
+                    // Same as above plus a trailing RegionCoprocessorHost, passed as null.
+                    matcher = (ScanQueryMatcher) c98_4.newInstance(scan,
+                            scaninfo,
+                            null,
+                            ScanType.USER_SCAN,
+                            Long.MAX_VALUE,
+                            HConstants.LATEST_TIMESTAMP,
+                            (long) 0,
+                            null);
+                    if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.4");
+                }
+                catch (InstantiationException exc_ins) {
+                    LOG.error("InstantiationException: " + exc_ins, exc_ins);
+                }
+                catch (IllegalAccessException exc_ill_acc) {
+                    LOG.error("IllegalAccessException: " + exc_ill_acc, exc_ill_acc);
+                }
+                catch (InvocationTargetException exc_inv_tgt) {
+                    LOG.error("InvocationTargetException: " + exc_inv_tgt, exc_inv_tgt);
+                }
+            }
+            else {
+                try {
+                    // 9-arg form: an additional long (the current time) precedes the
+                    // RegionCoprocessorHost, which is again passed as null.
+                    matcher = (ScanQueryMatcher) c98_9.newInstance(scan,
+                            scaninfo,
+                            null,
+                            ScanType.USER_SCAN,
+                            Long.MAX_VALUE,
+                            HConstants.LATEST_TIMESTAMP,
+                            0L,
+                            EnvironmentEdgeManager.currentTimeMillis(),
+                            null);
+                    if (LOG.isTraceEnabled())
+                        LOG.trace("Created matcher using reflection for HBase 98.9");
+                } catch (InstantiationException exc_ins) {
+                    LOG.error("InstantiationException: " + exc_ins, exc_ins);
+                } catch (IllegalAccessException exc_ill_acc) {
+                    LOG.error("IllegalAccessException: " + exc_ill_acc, exc_ill_acc);
+                } catch (InvocationTargetException exc_inv_tgt) {
+                    LOG.error("InvocationTargetException: " + exc_inv_tgt, exc_inv_tgt);
+                }
+            }
}
catch (Exception e) {
LOG.error("error while instantiating the ScanQueryMatcher()" + e);
}
-
}
+
/**
* Get the next row of values from this transaction.
*
@@ -918,45 +981,43 @@
}
synchronized List<KeyValue> getKeyValues() {
- List<KeyValue> edits = new ArrayList<KeyValue>();
- Collection<List<KeyValue>> kvsList = null;
+ List<KeyValue> edits = new ArrayList<KeyValue>();
+ Collection<List<Cell>> cellList = null;
- if (put != null) {
- if (!put.getFamilyMap().isEmpty()) {
- kvsList = put.getFamilyMap().values();
- }
- } else if (delete != null) {
- if (delete.getFamilyCellMap().isEmpty()) {
- // If whole-row delete then we need to expand for each
- // family
- kvsList = new ArrayList<List<KeyValue>>(1);
- for (byte[] family : tabledescriptor.getFamiliesKeys()) {
- KeyValue familyDelete = new KeyValue(delete.getRow(), family, null, delete.getTimeStamp(),
- KeyValue.Type.DeleteFamily);
- kvsList.add(Collections.singletonList(familyDelete));
- }
- } else {
- kvsList = delete.getFamilyMap().values();
- }
- } else {
- throw new IllegalStateException("WriteAction is invalid");
- }
+ if (put != null) {
+ if (!put.getFamilyCellMap().isEmpty()) {
+ cellList = put.getFamilyCellMap().values();
+ }
+ } else if (delete != null) {
+ if (delete.getFamilyCellMap().isEmpty()) {
+ // If whole-row delete then we need to expand for each family
+ cellList = new ArrayList<List<Cell>>(1);
+ for (byte[] family : tabledescriptor.getFamiliesKeys()) {
+ Cell familyDelete = new KeyValue(delete.getRow(), family, null, delete.getTimeStamp(),
+ KeyValue.Type.DeleteFamily);
+ cellList.add(Collections.singletonList(familyDelete));
+ }
+ } else {
+ cellList = delete.getFamilyCellMap().values();
+ }
+ } else {
+ throw new IllegalStateException("WriteAction is invalid");
+ }
- if (kvsList != null) {
- for (List<KeyValue> kvs : kvsList) {
- for (KeyValue kv : kvs) {
- edits.add(kv);
- //if (LOG.isDebugEnabled()) LOG.debug("Trafodion getKeyValues: " + regionInfo.getRegionNameAsString() + " create edits for transaction: "
- // + transactionId + " with Op " + kv.getType());
- }
- }
- }
- else
- if (LOG.isTraceEnabled()) LOG.trace("Trafodion getKeyValues: "
- + regionInfo.getRegionNameAsString() + " kvsList was null");
- return edits;
- }
+ if (cellList != null) {
+ for (List<Cell> cells : cellList) {
+ for (Cell cell : cells) {
+ edits.add(new KeyValue(cell));
+ // if (LOG.isDebugEnabled()) LOG.debug("Trafodion getKeyValues: " + regionInfo.getRegionNameAsString() + " create edits for transaction: "
+ // + transactionId + " with Op " + kv.getType());
+ }
+ }
+ } else if (LOG.isTraceEnabled())
+ LOG.trace("Trafodion getKeyValues: " + regionInfo.getRegionNameAsString() + " kvsList was null");
+ return edits;
+ }
}
+
public Set<TrxTransactionState> getTransactionsToCheck() {
return transactionsToCheck;
}