blob: 0e0cd9e443e8c7d20adaee1a5eb520de1a554667 [file] [log] [blame]
/**
* @@@ START COPYRIGHT @@@
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @@@ END COPYRIGHT @@@
**/
package org.apache.hadoop.hbase.client.transactional;
import java.io.File;
import java.util.Collection;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.GetTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.GetTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.OpenScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.OpenScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.regionserver.transactional.SingleVersionDeleteNotSupported;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.fs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.ServiceException;
/**
* Table with transactional support.
*/
public class TransactionalTable extends HTable implements TransactionalTableClient {
    // Fixed: the logger was created against RMInterface.class, which stamped
    // every log line from this class with the wrong category.
    static final Log LOG = LogFactory.getLog(TransactionalTable.class);
    // Shared across all TransactionalTable instances; created once below.
    static private HConnection connection = null;
    static Configuration config = HBaseConfiguration.create();
    static ExecutorService threadPool;
    // Coprocessor-call retry policy: up to 'retries' attempts, 'delay' ms apart.
    static int retries = 15;
    static int delay = 1000;
    private String retryErrMsg = "Coprocessor result is null, retries exhausted";

    static {
        config.set("hbase.hregion.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion");
        try {
            connection = HConnectionManager.createConnection(config);
        }
        catch (IOException ioe) {
            // Fixed: log the exception itself, not just a message, so the root
            // cause of a failed connection setup is visible in the log.
            LOG.error("Exception on TransactionTable anonymous static method. IOException while creating HConnection", ioe);
        }
        threadPool = Executors.newCachedThreadPool();
    }
/**
 * Constructs a transactional table for the given table name.
 *
 * @param tableName name of the table as a String
 * @throws IOException if the underlying HTable cannot be created
 */
public TransactionalTable(final String tableName) throws IOException {
this(Bytes.toBytes(tableName));
}
/**
 * Constructs a transactional table for the given table name, using the
 * class-wide shared HConnection and thread pool created in the static
 * initializer.
 *
 * @param tableName name of the table as bytes
 * @throws IOException if the underlying HTable cannot be created
 */
public TransactionalTable(final byte[] tableName) throws IOException {
super(tableName, connection, threadPool);
}
/**
 * Re-creates the shared HConnection used to construct TransactionalTable
 * instances.
 *
 * Fixed: the previous implementation assigned the new connection to a local
 * variable only — a dead store that leaked one HConnection per call and
 * reset nothing. The new connection is now stored in the shared static
 * field.
 *
 * NOTE(review): tables constructed before this call keep their original
 * connection (HTable holds its own reference) — confirm that replacing the
 * connection only for subsequently created tables is the intended behavior.
 *
 * @throws IOException if a new connection cannot be created
 */
public void resetConnection() throws IOException {
    if (LOG.isDebugEnabled()) LOG.debug("Resetting connection for " + this.getTableDescriptor().getTableName());
    connection = HConnectionManager.createConnection(this.getConfiguration());
}
/**
 * Registers the given region with the transaction state so it participates
 * in commit processing; logs when the region is newly added.
 */
private void addLocation(final TransactionState transactionState, HRegionLocation location) {
    if (LOG.isTraceEnabled()) LOG.trace("addLocation ENTRY");
    boolean newlyAdded = transactionState.addRegion(location);
    if (newlyAdded && LOG.isTraceEnabled()) {
        LOG.trace("addLocation added region [" + location.getRegionInfo().getRegionNameAsString() + " endKey: "
            + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " to TS. Beginning txn " + transactionState.getTransactionId() + " on server");
    }
    if (LOG.isTraceEnabled()) LOG.trace("addLocation EXIT");
}
/**
 * Method for getting data from a row within a transaction.
 *
 * @param transactionState transaction the read participates in
 * @param get the Get to fetch
 * @return the result
 * @throws IOException
 * @since 0.20.0
 */
public Result get(final TransactionState transactionState, final Get get) throws IOException {
// Delegates to the three-argument overload; 'true' registers the row's
// region with the transaction state (addLocation).
return get(transactionState, get, true);
}
/**
 * Performs a transactional Get by invoking the TrxRegionService coprocessor
 * on the region owning the row, retrying while the region reports it is
 * closing (up to {@code retries} attempts, {@code delay} ms apart).
 *
 * @param transactionState transaction the read participates in
 * @param get the Get to fetch
 * @param bool_addLocation when true, register the row's region with the
 *        transaction state before issuing the call
 * @return the result returned by the coprocessor
 * @throws IOException if retries are exhausted, the coprocessor reports an
 *         exception, or the call itself fails
 */
public Result get(final TransactionState transactionState, final Get get, final boolean bool_addLocation) throws IOException {
    if (LOG.isTraceEnabled()) LOG.trace("Enter TransactionalTable.get");
    if (bool_addLocation) addLocation(transactionState, super.getRegionLocation(get.getRow()));
    final String regionName = super.getRegionLocation(get.getRow()).getRegionInfo().getRegionNameAsString();
    Batch.Call<TrxRegionService, GetTransactionalResponse> callable =
        new Batch.Call<TrxRegionService, GetTransactionalResponse>() {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<GetTransactionalResponse> rpcCallback =
                new BlockingRpcCallback<GetTransactionalResponse>();
            @Override
            public GetTransactionalResponse call(TrxRegionService instance) throws IOException {
                GetTransactionalRequest.Builder builder = GetTransactionalRequest.newBuilder();
                builder.setGet(ProtobufUtil.toGet(get));
                builder.setTransactionId(transactionState.getTransactionId());
                builder.setRegionName(ByteString.copyFromUtf8(regionName));
                instance.get(controller, builder.build(), rpcCallback);
                return rpcCallback.get();
            }
        };
    GetTransactionalResponse result = null;
    try {
        int retryCount = 0;
        boolean retry = false;
        do {
            Iterator<Map.Entry<byte[], GetTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class,
                get.getRow(), get.getRow(), callable).entrySet().iterator();
            if (it.hasNext()) {
                result = it.next().getValue();
                retry = false;
            }
            // Retry when the region returned nothing or is mid-close.
            if (result == null || result.getException().contains("closing region")) {
                Thread.sleep(TransactionalTable.delay);
                retry = true;
                transactionState.setRetried(true);
                retryCount++;
            }
        } while (retryCount < TransactionalTable.retries && retry);
    } catch (InterruptedException ie) {
        // Restore interrupt status so callers can observe the interruption.
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while retrying transactional get");
    } catch (Throwable e) {
        // Fixed: preserve the original failure as the cause instead of
        // printing the stack trace and discarding it.
        throw new IOException("ERROR while calling coprocessor", e);
    }
    if (result == null)
        throw new IOException(retryErrMsg);
    else if (result.hasException())
        throw new IOException(result.getException());
    return ProtobufUtil.toResult(result.getResult());
}
/**
 * Deletes a row within a transaction.
 *
 * @param transactionState transaction the delete participates in
 * @param delete the Delete to apply
 * @throws IOException
 * @since 0.20.0
 */
public void delete(final TransactionState transactionState, final Delete delete) throws IOException {
// Delegates to the three-argument overload; 'true' registers the row's
// region with the transaction state (addLocation).
delete(transactionState, delete, true);
}
/**
 * Performs a transactional Delete by invoking the TrxRegionService
 * coprocessor on the region owning the row, retrying while the region
 * reports it is closing.
 *
 * @param transactionState transaction the delete participates in
 * @param delete the Delete to apply
 * @param bool_addLocation when true, register the row's region with the
 *        transaction state before issuing the call
 * @throws IOException if retries are exhausted, the coprocessor reports an
 *         exception, or the call itself fails
 */
public void delete(final TransactionState transactionState, final Delete delete, final boolean bool_addLocation) throws IOException {
    SingleVersionDeleteNotSupported.validateDelete(delete);
    if (bool_addLocation) addLocation(transactionState, super.getRegionLocation(delete.getRow()));
    final String regionName = super.getRegionLocation(delete.getRow()).getRegionInfo().getRegionNameAsString();
    Batch.Call<TrxRegionService, DeleteTransactionalResponse> callable =
        new Batch.Call<TrxRegionService, DeleteTransactionalResponse>() {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<DeleteTransactionalResponse> rpcCallback =
                new BlockingRpcCallback<DeleteTransactionalResponse>();
            @Override
            public DeleteTransactionalResponse call(TrxRegionService instance) throws IOException {
                DeleteTransactionalRequest.Builder builder = DeleteTransactionalRequest.newBuilder();
                builder.setTransactionId(transactionState.getTransactionId());
                builder.setRegionName(ByteString.copyFromUtf8(regionName));
                builder.setDelete(ProtobufUtil.toMutation(MutationType.DELETE, delete));
                instance.delete(controller, builder.build(), rpcCallback);
                return rpcCallback.get();
            }
        };
    byte[] row = delete.getRow();
    DeleteTransactionalResponse result = null;
    try {
        int retryCount = 0;
        boolean retry = false;
        do {
            Iterator<Map.Entry<byte[], DeleteTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class,
                row, row, callable).entrySet().iterator();
            if (it.hasNext()) {
                result = it.next().getValue();
                retry = false;
            }
            // Retry when the region returned nothing or is mid-close.
            if (result == null || result.getException().contains("closing region")) {
                Thread.sleep(TransactionalTable.delay);
                retry = true;
                transactionState.setRetried(true);
                retryCount++;
            }
        } while (retryCount < TransactionalTable.retries && retry);
    } catch (InterruptedException ie) {
        // Restore interrupt status so callers can observe the interruption.
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while retrying transactional delete");
    } catch (ServiceException e) {
        // Fixed: was rethrown as a bare IOException() with no message or cause.
        throw new IOException("ERROR while calling delete coprocessor", e);
    } catch (Throwable t) {
        // Fixed: was rethrown as a bare IOException() with no message or cause.
        throw new IOException("ERROR while calling delete coprocessor", t);
    }
    if (result == null)
        throw new IOException(retryErrMsg);
    else if (result.hasException())
        throw new IOException(result.getException());
}
/**
 * Commit a Put to the table within a transaction.
 * <p>
 * If autoFlush is false, the update is buffered.
 *
 * @param transactionState transaction the put participates in
 * @param put
 * @throws IOException
 * @since 0.20.0
 */
public synchronized void put(final TransactionState transactionState, final Put put) throws IOException {
// Delegates to the three-argument overload; 'true' registers the row's
// region with the transaction state (addLocation).
put(transactionState, put, true);
}
/**
 * Performs a transactional Put by invoking the TrxRegionService coprocessor
 * on the region owning the row, retrying while the region reports it is
 * closing.
 *
 * @param transactionState transaction the put participates in
 * @param put the Put to apply (validated for well-formedness first)
 * @param bool_addLocation when true, register the row's region with the
 *        transaction state before issuing the call
 * @throws IOException if retries are exhausted, the coprocessor reports an
 *         exception, or the call itself fails
 */
public synchronized void put(final TransactionState transactionState, final Put put, final boolean bool_addLocation) throws IOException {
    validatePut(put);
    if (LOG.isTraceEnabled()) LOG.trace("TransactionalTable.put ENTRY");
    if (bool_addLocation) addLocation(transactionState, super.getRegionLocation(put.getRow()));
    final String regionName = super.getRegionLocation(put.getRow()).getRegionInfo().getRegionNameAsString();
    Batch.Call<TrxRegionService, PutTransactionalResponse> callable =
        new Batch.Call<TrxRegionService, PutTransactionalResponse>() {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<PutTransactionalResponse> rpcCallback =
                new BlockingRpcCallback<PutTransactionalResponse>();
            @Override
            public PutTransactionalResponse call(TrxRegionService instance) throws IOException {
                PutTransactionalRequest.Builder builder = PutTransactionalRequest.newBuilder();
                builder.setTransactionId(transactionState.getTransactionId());
                builder.setRegionName(ByteString.copyFromUtf8(regionName));
                builder.setPut(ProtobufUtil.toMutation(MutationType.PUT, put));
                instance.put(controller, builder.build(), rpcCallback);
                return rpcCallback.get();
            }
        };
    PutTransactionalResponse result = null;
    try {
        int retryCount = 0;
        boolean retry = false;
        do {
            Iterator<Map.Entry<byte[], PutTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class,
                put.getRow(), put.getRow(), callable).entrySet().iterator();
            if (it.hasNext()) {
                result = it.next().getValue();
                retry = false;
            }
            // Retry when the region returned nothing or is mid-close.
            if (result == null || result.getException().contains("closing region")) {
                Thread.sleep(TransactionalTable.delay);
                retry = true;
                transactionState.setRetried(true);
                retryCount++;
            }
        } while (retryCount < TransactionalTable.retries && retry);
    } catch (InterruptedException ie) {
        // Restore interrupt status so callers can observe the interruption.
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while retrying transactional put");
    } catch (Throwable e) {
        // Fixed: preserve the original failure as the cause instead of
        // printing the stack trace and discarding it.
        throw new IOException("ERROR while calling coprocessor", e);
    }
    if (result == null)
        throw new IOException(retryErrMsg);
    else if (result.hasException())
        throw new IOException(result.getException());
    if (LOG.isTraceEnabled()) LOG.trace("TransactionalTable.put EXIT");
}
/**
 * Opens a transactional scanner over the table for the given Scan,
 * defaulting the caching size when the caller did not set one.
 */
public synchronized ResultScanner getScanner(final TransactionState transactionState, final Scan scan) throws IOException {
    if (LOG.isTraceEnabled()) LOG.trace("Enter TransactionalTable.getScanner");
    if (scan.getCaching() <= 0) {
        scan.setCaching(getScannerCaching());
    }
    return new TransactionalScanner(this, transactionState, scan, Long.valueOf(0));
}
/**
 * Atomically checks the current value of row/family/qualifier and, if it
 * matches {@code value}, applies the Delete — all within the transaction,
 * via the TrxRegionService coprocessor. Retries while the region reports it
 * is closing.
 *
 * @return the coprocessor's result flag
 * @throws IOException if the row does not match the Delete's row, the region
 *         name is null, retries are exhausted, or the call fails
 */
public boolean checkAndDelete(final TransactionState transactionState,
    final byte[] row, final byte[] family, final byte[] qualifier, final byte[] value,
    final Delete delete) throws IOException {
    // NOTE(review): byte[] operands concatenated with '+' log identity hashes,
    // not contents — consider Bytes.toStringBinary (left as-is to preserve
    // existing log output).
    if (LOG.isTraceEnabled()) LOG.trace("Enter TransactionalTable.checkAndDelete row: " + row + " family: " + family + " qualifier: " + qualifier + " value: " + value);
    if (!Bytes.equals(row, delete.getRow())) {
        throw new IOException("Action's getRow must match the passed row");
    }
    final String regionName = super.getRegionLocation(delete.getRow()).getRegionInfo().getRegionNameAsString();
    if (regionName == null)
        throw new IOException("Null regionName");
    Batch.Call<TrxRegionService, CheckAndDeleteResponse> callable =
        new Batch.Call<TrxRegionService, CheckAndDeleteResponse>() {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<CheckAndDeleteResponse> rpcCallback =
                new BlockingRpcCallback<CheckAndDeleteResponse>();
            @Override
            public CheckAndDeleteResponse call(TrxRegionService instance) throws IOException {
                CheckAndDeleteRequest.Builder builder = CheckAndDeleteRequest.newBuilder();
                builder.setTransactionId(transactionState.getTransactionId());
                builder.setRegionName(ByteString.copyFromUtf8(regionName));
                builder.setRow(HBaseZeroCopyByteString.wrap(row));
                // Null family/qualifier/value are sent as empty byte strings.
                builder.setFamily(HBaseZeroCopyByteString.wrap(family != null ? family : new byte[]{}));
                builder.setQualifier(HBaseZeroCopyByteString.wrap(qualifier != null ? qualifier : new byte[]{}));
                builder.setValue(HBaseZeroCopyByteString.wrap(value != null ? value : new byte[]{}));
                builder.setDelete(ProtobufUtil.toMutation(MutationType.DELETE, delete));
                instance.checkAndDelete(controller, builder.build(), rpcCallback);
                return rpcCallback.get();
            }
        };
    CheckAndDeleteResponse result = null;
    try {
        int retryCount = 0;
        boolean retry = false;
        do {
            Iterator<Map.Entry<byte[], CheckAndDeleteResponse>> it = super.coprocessorService(TrxRegionService.class,
                delete.getRow(), delete.getRow(), callable).entrySet().iterator();
            if (it.hasNext()) {
                result = it.next().getValue();
                retry = false;
            }
            // Retry when the region returned nothing or is mid-close.
            if (result == null || result.getException().contains("closing region")) {
                Thread.sleep(TransactionalTable.delay);
                retry = true;
                transactionState.setRetried(true);
                retryCount++;
            }
        } while (retryCount < TransactionalTable.retries && retry);
    } catch (InterruptedException ie) {
        // Restore interrupt status so callers can observe the interruption.
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while retrying transactional checkAndDelete");
    } catch (Throwable e) {
        // Fixed: preserve the original failure as the cause instead of
        // printing the stack trace and discarding it.
        throw new IOException("ERROR while calling coprocessor", e);
    }
    if (result == null)
        throw new IOException(retryErrMsg);
    else if (result.hasException())
        throw new IOException(result.getException());
    return result.getResult();
}
/**
 * Atomically checks the current value of row/family/qualifier and, if it
 * matches {@code value}, applies the Put — all within the transaction, via
 * the TrxRegionService coprocessor. Retries while the region reports it is
 * closing.
 *
 * @return the coprocessor's result flag
 * @throws IOException if the row does not match the Put's row, retries are
 *         exhausted, or the call fails
 */
public boolean checkAndPut(final TransactionState transactionState,
    final byte[] row, final byte[] family, final byte[] qualifier,
    final byte[] value, final Put put) throws IOException {
    // NOTE(review): byte[] operands concatenated with '+' log identity hashes,
    // not contents — consider Bytes.toStringBinary (left as-is to preserve
    // existing log output).
    if (LOG.isTraceEnabled()) LOG.trace("Enter TransactionalTable.checkAndPut row: " + row
        + " family: " + family + " qualifier: " + qualifier
        + " value: " + value);
    if (!Bytes.equals(row, put.getRow())) {
        throw new IOException("Action's getRow must match the passed row");
    }
    final String regionName = super.getRegionLocation(put.getRow()).getRegionInfo().getRegionNameAsString();
    if (LOG.isTraceEnabled()) LOG.trace("checkAndPut, region name: " + regionName);
    Batch.Call<TrxRegionService, CheckAndPutResponse> callable =
        new Batch.Call<TrxRegionService, CheckAndPutResponse>() {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<CheckAndPutResponse> rpcCallback =
                new BlockingRpcCallback<CheckAndPutResponse>();
            @Override
            public CheckAndPutResponse call(TrxRegionService instance) throws IOException {
                CheckAndPutRequest.Builder builder = CheckAndPutRequest.newBuilder();
                builder.setTransactionId(transactionState.getTransactionId());
                builder.setRegionName(ByteString.copyFromUtf8(regionName));
                builder.setRow(HBaseZeroCopyByteString.wrap(row));
                // Null family/qualifier/value are sent as empty byte strings.
                builder.setFamily(HBaseZeroCopyByteString.wrap(family != null ? family : new byte[]{}));
                builder.setQualifier(HBaseZeroCopyByteString.wrap(qualifier != null ? qualifier : new byte[]{}));
                builder.setValue(HBaseZeroCopyByteString.wrap(value != null ? value : new byte[]{}));
                builder.setPut(ProtobufUtil.toMutation(MutationType.PUT, put));
                instance.checkAndPut(controller, builder.build(), rpcCallback);
                return rpcCallback.get();
            }
        };
    CheckAndPutResponse result = null;
    try {
        int retryCount = 0;
        boolean retry = false;
        do {
            Iterator<Map.Entry<byte[], CheckAndPutResponse>> it = super.coprocessorService(TrxRegionService.class,
                put.getRow(), put.getRow(), callable).entrySet().iterator();
            if (it.hasNext()) {
                result = it.next().getValue();
                retry = false;
            }
            // Retry when the region returned nothing or is mid-close.
            if (result == null || result.getException().contains("closing region")) {
                Thread.sleep(TransactionalTable.delay);
                retry = true;
                transactionState.setRetried(true);
                retryCount++;
            }
        } while (retryCount < TransactionalTable.retries && retry);
    } catch (InterruptedException ie) {
        // Restore interrupt status so callers can observe the interruption.
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while retrying transactional checkAndPut");
    } catch (Throwable e) {
        // Fixed: chain the failure as the cause instead of flattening its
        // stack trace into the message text.
        throw new IOException("ERROR while calling coprocessor", e);
    }
    if (result == null)
        throw new IOException(retryErrMsg);
    else if (result.hasException())
        throw new IOException(result.getException());
    return result.getResult();
}
/**
 * Deletes a set of rows within the given transaction. The deletes are
 * grouped by owning region and each group is sent to that region's
 * TrxRegionService coprocessor as one batch, retrying while the region
 * reports it is closing.
 *
 * @param transactionState transaction the deletes participate in
 * @param deletes rows to delete
 * @throws IOException if retries are exhausted, a coprocessor reports an
 *         exception, or a call fails
 */
public void delete(final TransactionState transactionState,
    List<Delete> deletes) throws IOException {
    long transactionId = transactionState.getTransactionId();
    if (LOG.isTraceEnabled()) LOG.trace("Enter TransactionalTable.delete[] <List> size: " + deletes.size() + ", transid: " + transactionId);
    // Group the deletes by the region that owns each row.
    final Map<TransactionRegionLocation, List<Delete>> rows = new HashMap<TransactionRegionLocation, List<Delete>>();
    int size = 0;
    for (Delete del : deletes) {
        HRegionLocation hlocation = this.getRegionLocation(del.getRow(), false);
        TransactionRegionLocation location = new TransactionRegionLocation(hlocation.getRegionInfo(), hlocation.getServerName());
        if (LOG.isTraceEnabled()) LOG.trace("delete <List> with trRegion [" + location.getRegionInfo().getEncodedName() + "], endKey: "
            + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " and transaction [" + transactionId + "], delete number: " + size);
        // Single map lookup instead of containsKey + get.
        List<Delete> list = rows.get(location);
        if (list == null) {
            if (LOG.isTraceEnabled()) LOG.trace("delete adding new <List> for region [" + location.getRegionInfo().getRegionNameAsString() + "], endKey: "
                + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " and transaction [" + transactionId + "], delete number: " + size);
            list = new ArrayList<Delete>();
            rows.put(location, list);
        }
        list.add(del);
        size++;
    }
    // Issue one batched coprocessor call per region.
    for (Map.Entry<TransactionRegionLocation, List<Delete>> entry : rows.entrySet()) {
        final List<Delete> rowsInSameRegion = new ArrayList<Delete>(entry.getValue());
        final String regionName = entry.getKey().getRegionInfo().getRegionNameAsString();
        Batch.Call<TrxRegionService, DeleteMultipleTransactionalResponse> callable =
            new Batch.Call<TrxRegionService, DeleteMultipleTransactionalResponse>() {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<DeleteMultipleTransactionalResponse> rpcCallback =
                    new BlockingRpcCallback<DeleteMultipleTransactionalResponse>();
                @Override
                public DeleteMultipleTransactionalResponse call(TrxRegionService instance) throws IOException {
                    DeleteMultipleTransactionalRequest.Builder builder = DeleteMultipleTransactionalRequest.newBuilder();
                    builder.setTransactionId(transactionState.getTransactionId());
                    builder.setRegionName(ByteString.copyFromUtf8(regionName));
                    for (Delete delete : rowsInSameRegion) {
                        builder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, delete));
                    }
                    instance.deleteMultiple(controller, builder.build(), rpcCallback);
                    return rpcCallback.get();
                }
            };
        DeleteMultipleTransactionalResponse result = null;
        try {
            int retryCount = 0;
            boolean retry = false;
            do {
                Iterator<Map.Entry<byte[], DeleteMultipleTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class,
                    entry.getValue().get(0).getRow(), entry.getValue().get(0).getRow(), callable).entrySet().iterator();
                if (it.hasNext()) {
                    result = it.next().getValue();
                    retry = false;
                }
                // Retry when the region returned nothing or is mid-close.
                if (result == null || result.getException().contains("closing region")) {
                    Thread.sleep(TransactionalTable.delay);
                    retry = true;
                    transactionState.setRetried(true);
                    retryCount++;
                }
            } while (retryCount < TransactionalTable.retries && retry);
        } catch (InterruptedException ie) {
            // Restore interrupt status so callers can observe the interruption.
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("Interrupted while retrying transactional delete <List>");
        } catch (Throwable e) {
            // Fixed: preserve the original failure as the cause instead of
            // printing the stack trace and discarding it.
            throw new IOException("ERROR while calling coprocessor", e);
        }
        if (result == null)
            throw new IOException(retryErrMsg);
        else if (result.hasException())
            throw new IOException(result.getException());
    }
}
/**
 * Puts a set of rows within the given transaction. The puts are grouped by
 * owning region and each group is sent to that region's TrxRegionService
 * coprocessor as one batch, retrying while the region reports it is closing.
 *
 * @param transactionState transaction the puts participate in
 * @param puts rows to put (each validated for well-formedness first)
 * @throws IOException if retries are exhausted, a coprocessor reports an
 *         exception, or a call fails
 */
public void put(final TransactionState transactionState,
    final List<Put> puts) throws IOException {
    long transactionId = transactionState.getTransactionId();
    if (LOG.isTraceEnabled()) LOG.trace("Enter TransactionalTable.put[] <List> size: " + puts.size() + ", transid: " + transactionId);
    // Group the puts by the region that owns each row.
    final Map<TransactionRegionLocation, List<Put>> rows = new HashMap<TransactionRegionLocation, List<Put>>();
    int size = 0;
    for (Put put : puts) {
        validatePut(put);
        HRegionLocation hlocation = this.getRegionLocation(put.getRow(), false);
        TransactionRegionLocation location = new TransactionRegionLocation(hlocation.getRegionInfo(), hlocation.getServerName());
        if (LOG.isTraceEnabled()) LOG.trace("put <List> with trRegion [" + location.getRegionInfo().getEncodedName() + "], endKey: "
            + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " and transaction [" + transactionId + "], put number: " + size);
        // Single map lookup instead of containsKey + get.
        List<Put> list = rows.get(location);
        if (list == null) {
            if (LOG.isTraceEnabled()) LOG.trace("put adding new <List> for region [" + location.getRegionInfo().getRegionNameAsString() + "], endKey: "
                + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " and transaction [" + transactionId + "], put number: " + size);
            list = new ArrayList<Put>();
            rows.put(location, list);
        }
        list.add(put);
        size++;
    }
    // Issue one batched coprocessor call per region.
    for (Map.Entry<TransactionRegionLocation, List<Put>> entry : rows.entrySet()) {
        final List<Put> rowsInSameRegion = new ArrayList<Put>(entry.getValue());
        final String regionName = entry.getKey().getRegionInfo().getRegionNameAsString();
        Batch.Call<TrxRegionService, PutMultipleTransactionalResponse> callable =
            new Batch.Call<TrxRegionService, PutMultipleTransactionalResponse>() {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<PutMultipleTransactionalResponse> rpcCallback =
                    new BlockingRpcCallback<PutMultipleTransactionalResponse>();
                @Override
                public PutMultipleTransactionalResponse call(TrxRegionService instance) throws IOException {
                    PutMultipleTransactionalRequest.Builder builder = PutMultipleTransactionalRequest.newBuilder();
                    builder.setTransactionId(transactionState.getTransactionId());
                    builder.setRegionName(ByteString.copyFromUtf8(regionName));
                    for (Put put : rowsInSameRegion) {
                        builder.addPut(ProtobufUtil.toMutation(MutationType.PUT, put));
                    }
                    instance.putMultiple(controller, builder.build(), rpcCallback);
                    return rpcCallback.get();
                }
            };
        PutMultipleTransactionalResponse result = null;
        try {
            int retryCount = 0;
            boolean retry = false;
            do {
                Iterator<Map.Entry<byte[], PutMultipleTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class,
                    entry.getValue().get(0).getRow(), entry.getValue().get(0).getRow(), callable).entrySet().iterator();
                if (it.hasNext()) {
                    result = it.next().getValue();
                    retry = false;
                }
                // Retry when the region returned nothing or is mid-close.
                if (result == null || result.getException().contains("closing region")) {
                    Thread.sleep(TransactionalTable.delay);
                    retry = true;
                    transactionState.setRetried(true);
                    retryCount++;
                }
            } while (retryCount < TransactionalTable.retries && retry);
        } catch (InterruptedException ie) {
            // Restore interrupt status so callers can observe the interruption.
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("Interrupted while retrying transactional put <List>");
        } catch (Throwable e) {
            // Fixed: preserve the original failure as the cause instead of
            // printing the stack trace and discarding it.
            throw new IOException("ERROR while calling coprocessor", e);
        }
        if (result == null)
            throw new IOException(retryErrMsg);
        else if (result.hasException())
            throw new IOException(result.getException());
    }
}
/**
 * Validates a Put for well-formedness: it must contain at least one column,
 * and when a maximum KeyValue size is configured, no cell may exceed it.
 *
 * @throws IllegalArgumentException if the Put is empty or a cell is too large
 */
public void validatePut(final Put put) throws IllegalArgumentException {
    if (put.isEmpty()) {
        throw new IllegalArgumentException("No columns to insert");
    }
    if (maxKeyValueSize <= 0) {
        return; // size limit disabled
    }
    for (List<Cell> cells : put.getFamilyCellMap().values()) {
        for (Cell cell : cells) {
            if (KeyValueUtil.length(cell) > maxKeyValueSize) {
                throw new IllegalArgumentException("KeyValue size too large");
            }
        }
    }
}
// NOTE(review): never assigned anywhere in this file, so it defaults to 0
// and the size check above is effectively disabled — confirm intended.
private int maxKeyValueSize;
// ---------------------------------------------------------------------------
// Plain (non-transactional) HTable pass-throughs. Each method below simply
// delegates to the superclass and does NOT go through the transactional
// coprocessor path used by the TransactionState overloads above.
// ---------------------------------------------------------------------------
public HRegionLocation getRegionLocation(byte[] row, boolean f) throws IOException {
return super.getRegionLocation(row,f);
}
public void close() throws IOException { super.close(); }
public void setAutoFlush(boolean autoFlush, boolean b)
{
super.setAutoFlush(autoFlush, b);
}
public org.apache.hadoop.conf.Configuration getConfiguration()
{
return super.getConfiguration();
}
public void flushCommits()
throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
super.flushCommits();
}
public HConnection getConnection()
{
// Returns the connection held by the underlying HTable (set at
// construction), not necessarily the current static 'connection' field.
return super.getConnection();
}
public byte[][] getEndKeys()
throws IOException
{
return super.getEndKeys();
}
public byte[][] getStartKeys() throws IOException
{
return super.getStartKeys();
}
public void setWriteBufferSize(long writeBufferSize) throws IOException
{
super.setWriteBufferSize(writeBufferSize);
}
public long getWriteBufferSize()
{
return super.getWriteBufferSize();
}
public byte[] getTableName()
{
return super.getTableName();
}
public ResultScanner getScanner(Scan scan) throws IOException
{
return super.getScanner(scan);
}
public Result get(Get g) throws IOException
{
return super.get(g);
}
public Result[] get( List<Get> g) throws IOException
{
return super.get(g);
}
public void delete(Delete d) throws IOException
{
super.delete(d);
}
public void delete(List<Delete> deletes) throws IOException
{
super.delete(deletes);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException
{
return super.checkAndPut(row,family,qualifier,value,put);
}
public void put(Put p) throws InterruptedIOException,RetriesExhaustedWithDetailsException
{
super.put(p);
}
public void put(List<Put> p) throws InterruptedIOException,RetriesExhaustedWithDetailsException
{
super.put(p);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException
{
return super.checkAndDelete(row,family,qualifier,value,delete);
}
}