blob: 44f0708efd5b2d4ed285efd78f484da147e3d1a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.transaction;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link
* Transaction} object. It mimics the behavior in {@link org.apache.hadoop.hbase.client.Table}
*/
public class TTable implements Closeable {
private static Logger LOG = LoggerFactory.getLogger(TTable.class);
private Table table;
private SnapshotFilter snapshotFilter;
private boolean serverSideFilter;
private final List<Mutation> mutations;
private boolean autoFlush = true;
private final boolean conflictFree;
// ----------------------------------------------------------------------------------------------------------------
// Construction
// ----------------------------------------------------------------------------------------------------------------
public TTable(Connection connection, byte[] tableName) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), false);
}
public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
}
public TTable(Connection connection, String tableName) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), false);
}
public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
}
public TTable(Table hTable) throws IOException {
this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), false);
}
public TTable(Connection connection, byte[] tableName, boolean conflictFree) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
}
public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
}
public TTable(Connection connection, String tableName, boolean conflictFree) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
}
public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
}
public TTable(Table hTable, boolean conflictFree) throws IOException {
this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), conflictFree);
}
public TTable(Table hTable, SnapshotFilter snapshotFilter) throws IOException {
this(hTable, snapshotFilter, false);
}
public TTable(Table hTable, CommitTable.Client commitTableClient) throws IOException {
this(hTable, commitTableClient, false);
}
public TTable(Table hTable, boolean serverSideFilter, boolean conflictFree) throws IOException {
this.table = hTable;
this.conflictFree = conflictFree;
this.mutations = new ArrayList<Mutation>();
this.serverSideFilter = serverSideFilter;
this.snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) :
new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable));
}
public TTable(Table hTable, SnapshotFilter snapshotFilter, boolean conflictFree) throws IOException {
this.table = hTable;
this.conflictFree = conflictFree;
this.mutations = new ArrayList<Mutation>();
this.snapshotFilter = snapshotFilter;
}
public TTable(Table hTable, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
this.table = hTable;
this.conflictFree = conflictFree;
this.mutations = new ArrayList<Mutation>();
this.serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false);
this.snapshotFilter = (serverSideFilter) ? new AttributeSetSnapshotFilter(hTable) :
new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable), commitTableClient);
}
// ----------------------------------------------------------------------------------------------------------------
// Closeable implementation
// ----------------------------------------------------------------------------------------------------------------
/**
* Releases any resources held or pending changes in internal buffers.
*
* @throws IOException if a remote or network exception occurs.
*/
@Override
public void close() throws IOException {
table.close();
}
// ----------------------------------------------------------------------------------------------------------------
// Transactional operations
// ----------------------------------------------------------------------------------------------------------------
/**
* Transactional version of {@link Table#get(Get get)}
*
* @param get an instance of Get
* @param tx an instance of transaction to be used
* @return Result an instance of Result
* @throws IOException if a remote or network exception occurs.
*/
public Result get(Transaction tx, final Get get) throws IOException {
throwExceptionIfOpSetsTimerange(get);
HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
final long readTimestamp = transaction.getReadTimestamp();
final Get tsget = new Get(get.getRow()).setFilter(get.getFilter());
propagateAttributes(get, tsget);
TimeRange timeRange = get.getTimeRange();
long startTime = timeRange.getMin();
long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
tsget.setTimeRange(startTime, endTime).setMaxVersions(1);
Map<byte[], NavigableSet<byte[]>> kvs = get.getFamilyMap();
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
byte[] family = entry.getKey();
NavigableSet<byte[]> qualifiers = entry.getValue();
if (qualifiers == null || qualifiers.isEmpty()) {
tsget.addFamily(family);
} else {
for (byte[] qualifier : qualifiers) {
tsget.addColumn(family, qualifier);
tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
}
tsget.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER);
tsget.addColumn(family, CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
}
}
LOG.trace("Initial Get = {}", tsget);
return snapshotFilter.get(tsget, transaction);
}
static private void propagateAttributes(OperationWithAttributes from, OperationWithAttributes to) {
Map<String,byte[]> attributeMap = from.getAttributesMap();
for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
to.setAttribute(entry.getKey(), entry.getValue());
}
}
private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get deleteG) throws IOException {
Result result = this.get(tx, deleteG);
if (!result.isEmpty()) {
for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
.entrySet()) {
byte[] family = entryF.getKey();
for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
byte[] qualifier = entryQ.getKey();
addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, qualifier,
tx.getWriteTimestamp()));
}
deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
tx.getWriteTimestamp()));
}
}
}
private void familyQualifierBasedDeletionWithOutRead(HBaseTransaction tx, Put deleteP, Get deleteG) {
Set<byte[]> fset = deleteG.getFamilyMap().keySet();
for (byte[] family : fset) {
deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
tx.getWriteTimestamp()));
}
}
/**
* Transactional version of {@link Table#delete(Delete delete)}
*
* @param delete an instance of Delete
* @param tx an instance of transaction to be used
* @throws IOException if a remote or network exception occurs.
*/
public void delete(Transaction tx, Delete delete) throws IOException {
Put deleteP = deleteInternal(tx, delete);
if (!deleteP.isEmpty()) {
addMutation(deleteP);
}
}
private Put deleteInternal(Transaction tx, Delete delete) throws IOException {
throwExceptionIfOpSetsTimerange(delete);
HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
final long writeTimestamp = transaction.getWriteTimestamp();
boolean deleteFamily = false;
final Put deleteP = new Put(delete.getRow(), writeTimestamp);
final Get deleteG = new Get(delete.getRow());
propagateAttributes(delete, deleteP);
propagateAttributes(delete, deleteG);
Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap();
if (fmap.isEmpty()) {
familyQualifierBasedDeletion(transaction, deleteP, deleteG);
}
for (List<Cell> cells : fmap.values()) {
for (Cell cell : cells) {
CellUtils.validateCell(cell, writeTimestamp);
switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
case DeleteColumn:
deleteP.addColumn(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
writeTimestamp,
CellUtils.DELETE_TOMBSTONE);
addWriteSetElement(transaction,
new HBaseCellId(this,
delete.getRow(),
CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
writeTimestamp));
break;
case DeleteFamily:
deleteG.addFamily(CellUtil.cloneFamily(cell));
deleteFamily = true;
break;
case Delete:
if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
deleteP.addColumn(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
writeTimestamp,
CellUtils.DELETE_TOMBSTONE);
addWriteSetElement(transaction,
new HBaseCellId(this,
delete.getRow(),
CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell),
writeTimestamp));
break;
} else {
throw new UnsupportedOperationException(
"Cannot delete specific versions on Snapshot Isolation.");
}
default:
break;
}
}
}
if (deleteFamily) {
if (enforceHBaseTransactionManagerAsParam(transaction.getTransactionManager()).
getConflictDetectionLevel() == ConflictDetectionLevel.ROW) {
familyQualifierBasedDeletionWithOutRead(transaction, deleteP, deleteG);
} else {
familyQualifierBasedDeletion(transaction, deleteP, deleteG);
}
}
return deleteP;
}
/**
* Transactional version of {@link Table#put(Put put)}
*
* @param put an instance of Put
* @param tx an instance of transaction to be used
* @throws IOException if a remote or network exception occurs.
*/
public void put(Transaction tx, Put put) throws IOException {
put(tx, put, false);
}
/**
* @param put an instance of Put
* @param timestamp timestamp to be used as cells version
* @param commitTimestamp timestamp to be used as commit timestamp
* @throws IOException if a remote or network exception occurs.
*/
static public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
final Put tsput = new Put(put.getRow(), timestamp);
propagateAttributes(put, tsput);
Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
for (List<Cell> kvl : kvs.values()) {
for (Cell c : kvl) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), timestamp);
try {
tsput.add(kv);
} catch (IOException e) {
// The existing Put has this Cell, so the cloned one
// will never throw an IOException when it's added.
throw new RuntimeException(e);
}
tsput.addColumn(CellUtil.cloneFamily(kv),
CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
kv.getTimestamp(),
Bytes.toBytes(commitTimestamp));
}
}
return tsput;
}
/**
* @param put an instance of Put
* @param tx an instance of transaction to be used
* @param addShadowCell denotes whether to add the shadow cell
* @throws IOException if a remote or network exception occurs.
*/
public void put(Transaction tx, Put put, boolean addShadowCell) throws IOException {
Put tsput = putInternal(tx, put, addShadowCell);
addMutation(tsput);
}
private Put putInternal(Transaction tx, Put put, boolean addShadowCell) throws IOException {
throwExceptionIfOpSetsTimerange(put);
HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
final long writeTimestamp = transaction.getWriteTimestamp();
// create put with correct ts
final Put tsput = new Put(put.getRow(), writeTimestamp);
propagateAttributes(put, tsput);
Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
for (List<Cell> kvl : kvs.values()) {
for (Cell c : kvl) {
CellUtils.validateCell(c, writeTimestamp);
// Reach into keyvalue to update timestamp.
// It's not nice to reach into keyvalue internals,
// but we want to avoid having to copy the whole thing
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp);
tsput.add(kv);
if (addShadowCell) {
tsput.addColumn(CellUtil.cloneFamily(kv),
CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
kv.getTimestamp(),
Bytes.toBytes(kv.getTimestamp()));
} else {
HBaseCellId cellId = new HBaseCellId(this,
CellUtil.cloneRow(kv),
CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv),
kv.getTimestamp());
addWriteSetElement(transaction, cellId);
}
}
}
return tsput;
}
private void addWriteSetElement(HBaseTransaction transaction, HBaseCellId cellId) {
if (conflictFree) {
transaction.addConflictFreeWriteSetElement(cellId);
} else {
transaction.addWriteSetElement(cellId);
}
}
private void addMutation(Mutation m) throws IOException {
this.mutations.add(m);
if (autoFlush) {
flushCommits();
}
}
private void addMutations(List<Mutation> mutations) throws IOException {
this.mutations.addAll(mutations);
if (autoFlush) {
flushCommits();
}
}
/**
* Transactional version of {@link Table#getScanner(Scan scan)}
*
* @param scan an instance of Scan
* @param tx an instance of transaction to be used
* @return ResultScanner an instance of ResultScanner
* @throws IOException if a remote or network exception occurs.
*/
public ResultScanner getScanner(Transaction tx, Scan scan) throws IOException {
throwExceptionIfOpSetsTimerange(scan);
HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
Scan tsscan = new Scan(scan);
tsscan.setMaxVersions(1);
tsscan.setTimeRange(0, transaction.getReadTimestamp() + 1);
propagateAttributes(scan, tsscan);
Map<byte[], NavigableSet<byte[]>> kvs = scan.getFamilyMap();
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
byte[] family = entry.getKey();
NavigableSet<byte[]> qualifiers = entry.getValue();
if (qualifiers == null) {
continue;
}
for (byte[] qualifier : qualifiers) {
tsscan.addColumn(family, CellUtils.addShadowCellSuffixPrefix(qualifier));
}
if (!qualifiers.isEmpty()) {
tsscan.addColumn(entry.getKey(), CellUtils.FAMILY_DELETE_QUALIFIER);
}
}
return snapshotFilter.getScanner(tsscan, transaction);
}
/**
*
* @return array of byte
*/
public byte[] getTableName() {
return table.getName().getName();
}
/**
* Delegates to {@link Table#getConfiguration()}
*
* @return standard configuration object
*/
public Configuration getConfiguration() {
return table.getConfiguration();
}
/**
* Delegates to {@link Table#getTableDescriptor()}
*
* @return HTableDescriptor an instance of HTableDescriptor
* @throws IOException if a remote or network exception occurs.
*/
public HTableDescriptor getTableDescriptor() throws IOException {
return table.getTableDescriptor();
}
/**
* Transactional version of {@link Table#exists(Get get)}
*
* @param transaction an instance of transaction to be used
* @param get an instance of Get
* @return true if cell exists
* @throws IOException if a remote or network exception occurs.
*/
public boolean exists(Transaction transaction, Get get) throws IOException {
Result result = get(transaction, get);
return !result.isEmpty();
}
/* TODO What should we do with this methods???
* @Override public void batch(Transaction transaction, List<? extends Row>
* actions, Object[] results) throws IOException, InterruptedException {}
*
* @Override public Object[] batch(Transaction transaction, List<? extends
* Row> actions) throws IOException, InterruptedException {}
*
* @Override public <R> void batchCallback(Transaction transaction, List<?
* extends Row> actions, Object[] results, Callback<R> callback) throws
* IOException, InterruptedException {}
*
* @Override public <R> Object[] batchCallback(List<? extends Row> actions,
* Callback<R> callback) throws IOException, InterruptedException {}
*/
/**
* Transactional version of {@link Table#get(List gets)}
*
* @param transaction an instance of transaction to be used
* @param gets list of Get instances
* @return array of Results
* @throws IOException if a remote or network exception occurs
*/
public Result[] get(Transaction transaction, List<Get> gets) throws IOException {
Result[] results = new Result[gets.size()];
int i = 0;
for (Get get : gets) {
results[i++] = get(transaction, get);
}
return results;
}
/**
* Transactional version of {@link Table#getScanner(byte[] family)}
*
* @param transaction an instance of transaction to be used
* @param family column family
* @return an instance of ResultScanner
* @throws IOException if a remote or network exception occurs
*/
public ResultScanner getScanner(Transaction transaction, byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
return getScanner(transaction, scan);
}
/**
* Transactional version of {@link Table#getScanner(byte[] family, byte[] qualifier)}
*
* @param transaction an instance of transaction to be used
* @param family column family
* @param qualifier column name
* @return an instance of ResultScanner
* @throws IOException if a remote or network exception occurs
*/
public ResultScanner getScanner(Transaction transaction, byte[] family, byte[] qualifier)
throws IOException {
Scan scan = new Scan();
scan.addColumn(family, qualifier);
return getScanner(transaction, scan);
}
/**
* Transactional version of {@link Table#put(List puts)}
*
* @param transaction an instance of transaction to be used
* @param puts List of puts
* @param addShadowCell denotes whether to add the shadow cell
* @throws IOException if a remote or network exception occurs
*/
public void put(Transaction transaction, List<Put> puts, boolean addShadowCells) throws IOException {
List<Mutation> mutations = new ArrayList<>(puts.size());
for (Put put : puts) {
mutations.add(putInternal(transaction, put, addShadowCells));
}
addMutations(mutations);
}
public void put(Transaction transaction, List<Put> puts) throws IOException {
put(transaction, puts, false);
}
/**
* Transactional version of {@link Table#batch(List<? extends Row> rows)}
*
* @param transaction an instance of transaction to be used
* @param rows List of rows that must be instances of Put or Delete
* @param addShadowCell denotes whether to add the shadow cell
* @throws IOException if a remote or network exception occurs
*/
public void batch(Transaction transaction, List<? extends Row> rows, boolean addShadowCells) throws IOException {
List<Mutation> mutations = new ArrayList<>(rows.size());
for (Row row : rows) {
if (row instanceof Put) {
mutations.add(putInternal(transaction, (Put)row, addShadowCells));
} else if (row instanceof Delete) {
Put deleteP = deleteInternal(transaction, (Delete)row);
if (!deleteP.isEmpty()) {
mutations.add(deleteP);
}
} else {
throw new UnsupportedOperationException("Unsupported mutation: " + row);
}
}
addMutations(mutations);
}
public void batch(Transaction transaction, List<? extends Row> rows) throws IOException {
batch(transaction, rows, false);
}
/**
* Transactional version of {@link Table#delete(List deletes)}
*
* @param transaction an instance of transaction to be used
* @param deletes List of deletes
* @throws IOException if a remote or network exception occurs
*/
public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
List<Mutation> mutations = new ArrayList<>(deletes.size());
for (Delete delete : deletes) {
Put deleteP = deleteInternal(transaction, delete);
if (!deleteP.isEmpty()) {
mutations.add(deleteP);
}
}
addMutations(mutations);
}
/**
* Provides access to the underliying Table in order to configure it or to perform unsafe (non-transactional)
* operations. The latter would break the transactional guarantees of the whole system.
*
* @return The underlying Table object
*/
public Table getHTable() {
return table;
}
public void setAutoFlush(boolean autoFlush) {
this.autoFlush = autoFlush;
}
public boolean isAutoFlush() {
return autoFlush;
}
public void flushCommits() throws IOException {
try {
table.batch(this.mutations, new Object[mutations.size()]);
} catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException(e);
} finally {
this.mutations.clear();
}
}
// ----------------------------------------------------------------------------------------------------------------
// Helper methods
// ----------------------------------------------------------------------------------------------------------------
private void throwExceptionIfOpSetsTimerange(Get getOperation) {
TimeRange tr = getOperation.getTimeRange();
checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
}
private void throwExceptionIfOpSetsTimerange(Scan scanOperation) {
TimeRange tr = scanOperation.getTimeRange();
checkTimerangeIsSetToDefaultValuesOrThrowException(tr);
}
private void checkTimerangeIsSetToDefaultValuesOrThrowException(TimeRange tr) {
if (tr.getMin() != 0L || tr.getMax() != Long.MAX_VALUE) {
throw new IllegalArgumentException(
"Timestamp/timerange not allowed in transactional user operations");
}
}
private void throwExceptionIfOpSetsTimerange(Mutation userOperation) {
if (userOperation.getTimeStamp() != HConstants.LATEST_TIMESTAMP) {
throw new IllegalArgumentException(
"Timestamp not allowed in transactional user operations");
}
}
private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
if (tx instanceof HBaseTransaction) {
return (HBaseTransaction) tx;
} else {
throw new IllegalArgumentException(
String.format("The transaction object passed %s is not an instance of HBaseTransaction",
tx.getClass().getName()));
}
}
private HBaseTransactionManager enforceHBaseTransactionManagerAsParam(TransactionManager tm) {
if (tm instanceof HBaseTransactionManager) {
return (HBaseTransactionManager) tm;
} else {
throw new IllegalArgumentException(
String.format("The transaction manager object passed %s is not an instance of HBaseTransactionManager ",
tm.getClass().getName()));
}
}
}