blob: 82552881482dca906501c8725985a43dac942826 [file] [log] [blame]
/**
* @@@ START COPYRIGHT @@@
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @@@ END COPYRIGHT @@@
**/
// SsccTableClientUtils.java
package org.apache.hadoop.hbase.coprocessor.transactional;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.DtmConst;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccAbortTransactionRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccAbortTransactionResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccBeginTransactionRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccBeginTransactionResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCloseScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCloseScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitIfPossibleRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitIfPossibleResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccGetTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccGetTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccOpenScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccOpenScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPerformScanRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPerformScanResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccRecoveryRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccRecoveryRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccRegionService;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.regionserver.transactional.IdTm;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
public class SsccTableClientUtils {
// Logger shared by every helper in this class.
protected final static Log log = LogFactory.getLog(SsccTableClientUtils.class);
// Timeout handed to idServer.id(...) when a start id is requested.
// NOTE(review): units are not visible here; presumably milliseconds -- confirm against IdTm.
private static final int ID_TM_SERVER_TIMEOUT = 1000;
// Region name placed in every request; initialize() replaces this placeholder
// with the name of the first region it lists.
static String regionname = "RegionName";
// Table handle opened by initialize() and used for all coprocessor calls.
static HTable ht = null;
// Next transaction id handed out by genTransId()/getTransId(); incremented without synchronization.
private static long i = 1L;
// Not referenced in the visible part of this file.
private static AtomicLong startId = new AtomicLong(10);
// Source of the start ids fetched in genTransId()/getStartId().
private static IdTm idServer = new IdTm(false);
// Transaction state keyed by the name of the thread that created it.
// NOTE(review): this is a plain HashMap; confirm whether callers run concurrently.
static Map<String, TransactionState> transMap = new HashMap<String, TransactionState>();
// Scanner id sent by testSsccCloseScanner(), which post-increments it on each invocation.
static long scannerId = 0L;
// Not referenced in the visible part of this file.
static int returnStatus = 0;
// Result flag copied from the most recent checkAndPut/checkAndDelete response.
static boolean checkResult = false;
// The next three fields are not referenced in the visible part of this file.
static boolean hasMore = false;
static long totalRows = 0L;
static boolean continuePerform = true;
// Region start keys of the test table, filled in by initialize().
static byte[][] startKeys = null;
// Index into startKeys used by initialize() to pick startRow.
static int startPos = 0;
// startKeys[startPos], set by initialize().
static byte[] startRow = null;
// Not referenced in the visible part of this file.
static byte[] lastRow = null;
// Region locations of the test table, filled in by initialize().
static List<HRegionLocation> regionsList = null;
// The next two fields are not referenced in the visible part of this file.
static int regionCount = 0;
static Scan scan = null;
// Start/end key pairs of the test table's regions, filled in by initialize().
static Pair<byte[][], byte[][]> startEndKeys = null;
// Name of the table that initialize() drops and re-creates.
private static final String TABLE_NAME = "table1";
// Column family holding the test data.
private static final byte[] FAMILY = Bytes.toBytes("family");
// Not referenced in the visible part of this file.
private static final byte[] FAMILYBAD = Bytes.toBytes("familybad");
// Column qualifiers used by the checkAndPut/checkAndDelete helpers.
private static final byte[] QUAL_A = Bytes.toBytes("a");
private static final byte[] QUAL_B = Bytes.toBytes("b");
// Row keys used by the test helpers.
static final String ROW1 = "row1";
static final String ROW2 = "row2";
static final String ROW3 = "row3";
static final String ROW4 = "row4";
static final String ROW5 = "row5";
static final String ROW6 = "row6";
// Cell values used by the test helpers.
static final int VALUE1 = 1;
static final int VALUE2 = 2;
static final int VALUE3 = 3;
static final int VALUE4 = 4;
static final int VALUE5 = 5;
// Status codes; not referenced in the visible part of this file.
// NOTE(review): presumably these mirror the codes returned by the region endpoint -- confirm.
private static final int COMMIT_OK = 1;
private static final int COMMIT_OK_READ_ONLY = 2;
private static final int COMMIT_UNSUCCESSFUL = 3;
private static final int COMMIT_CONFLICT = 5;
private static final int STATEFUL_UPDATE_OK = 1;
private static final int STATEFUL_UPDATE_CONFLICT = 2;
private static final int STATELESS_UPDATE_OK = 3;
private static final int STATELESS_UPDATE_CONFLICT = 5;
// Admin handle created by initialize().
private static HBaseAdmin admin;
/**
 * Registers a TransactionState for the calling thread unless one already exists.
 *
 * The state is keyed by the thread's name in transMap. A new state takes the
 * next value of the counter i as its transaction id and the value obtained
 * from idServer as its start id. A failure to obtain the start id is printed
 * and the state is still registered with whatever value the IdTmId holds.
 */
private static void genTransId() {
    final String key = Thread.currentThread().getName();
    if (transMap.get(key) != null) {
        // This thread already owns a transaction; nothing to do.
        return;
    }
    TransactionState state = new TransactionState(i++);
    IdTmId startSeq = new IdTmId();
    try {
        idServer.id(ID_TM_SERVER_TIMEOUT, startSeq);
    } catch (IdTmException ex) {
        ex.printStackTrace();
    }
    state.setStartId(startSeq.val);
    transMap.put(key, state);
}
/**
 * Drops the calling thread's entry (keyed by thread name) from transMap.
 */
private static void destoryTransId() {
    transMap.remove(Thread.currentThread().getName());
}
/**
 * Returns the transaction id registered for the calling thread.
 *
 * When the thread has no entry in transMap, the next value of the counter i
 * is returned instead (and the counter advances); nothing is registered.
 *
 * @return the thread's transaction id, or a freshly taken counter value
 */
private static long getTransId() {
    final TransactionState current = transMap.get(Thread.currentThread().getName());
    return (current != null) ? current.getTransactionId() : i++;
}
/**
 * Returns the start id registered for the calling thread.
 *
 * When the thread has no entry in transMap, a new id is requested from
 * idServer and returned without being stored. A failure of that request is
 * printed and the value currently held by the IdTmId is returned.
 *
 * @return the thread's start id, or a newly requested one
 */
private static long getStartId() {
    final TransactionState current = transMap.get(Thread.currentThread().getName());
    if (current != null) {
        return current.getStartId();
    }
    IdTmId fresh = new IdTmId();
    try {
        idServer.id(ID_TM_SERVER_TIMEOUT, fresh);
    } catch (IdTmException ex) {
        ex.printStackTrace();
    }
    return fresh.val;
}
/**
 * Drops and re-creates the test table, opens the shared table handle and
 * records the table's region layout in the static fields of this class.
 *
 * Steps, in order:
 * 1. Build a descriptor for TABLE_NAME with the data family, the transaction
 *    meta family and the SsccRegionEndpoint coprocessor.
 * 2. Disable and delete any existing table of that name, then create it.
 * 3. Open ht and log the start keys, start/end key pairs and region names.
 *    regionname is set to the name of the first region listed.
 *
 * @throws Exception if an HBase call fails with an error not handled below
 */
public static void initialize() throws Exception {
    Configuration config = HBaseConfiguration.create();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
    desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(3));
    desc.addFamily(new HColumnDescriptor(DtmConst.TRANSACTION_META_FAMILY).setMaxVersions(3));
    desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.transactional.SsccRegionEndpoint");
    admin = new HBaseAdmin(config);
    try {
        log.info("Cleaning up the table " + TABLE_NAME);
        try {
            admin.disableTable(TABLE_NAME);
        } catch (TableNotEnabledException n) {
            // An already-disabled table must still be deleted; otherwise the
            // stale table survives and createTable below reports it as existing.
            log.info("Table " + TABLE_NAME + " is not enabled");
        }
        admin.deleteTable(TABLE_NAME);
    } catch (TableNotFoundException e) {
        log.info("Table " + TABLE_NAME + " was not found");
    }
    try {
        log.info("Creating the table " + TABLE_NAME);
        admin.createTable(desc);
    } catch (TableExistsException e) {
        log.info("Table " + TABLE_NAME + " already exists");
    }
    ht = new HTable(config, desc.getName());
    try {
        startKeys = ht.getStartKeys();
        startRow = startKeys[startPos];
        // Keys are byte arrays; render their content rather than the array identity.
        log.info("Table " + TABLE_NAME + " startRow is " + Bytes.toStringBinary(startRow));
    } catch (IOException e) {
        log.info("Table " + TABLE_NAME + " unable to get start keys" + e);
    }
    // startKeys is still null when the lookup above failed on a first call.
    if (startKeys != null) {
        for (int k = 0; k < startKeys.length; k++) {
            String regionLocation = ht.getRegionLocation(startKeys[k]).getHostname();
            log.info("Table " + TABLE_NAME + " region location " + regionLocation + ", startKey is "
                    + Bytes.toStringBinary(startKeys[k]));
        }
    }
    try {
        startEndKeys = ht.getStartEndKeys();
        byte[][] firstKeys = startEndKeys.getFirst();
        byte[][] secondKeys = startEndKeys.getSecond();
        for (int k = 0; k < firstKeys.length; k++) {
            log.info("First key: " + Bytes.toStringBinary(firstKeys[k]) + ", Second key: "
                    + Bytes.toStringBinary(secondKeys[k]));
        }
    } catch (Exception e) {
        log.info("Table " + TABLE_NAME + " unable to get start and endkeys" + e);
    }
    regionsList = ht.getRegionsInRange(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    boolean firstRegionSeen = false;
    for (HRegionLocation regionLocation : regionsList) {
        HRegionInfo region = regionLocation.getRegionInfo();
        if (!firstRegionSeen) {
            // Every request built by this class carries the first region's name.
            regionname = region.getRegionNameAsString();
            firstRegionSeen = true;
        }
        log.info(region.getRegionNameAsString());
    }
}
/**
 * Sends an abort request for the calling thread's transaction through the
 * SsccRegionService coprocessor and removes the thread's transaction state.
 *
 * The transaction id is read before the state is removed. The request is
 * issued with null start and end keys and carries the static regionname.
 *
 * @throws IOException if the coprocessor call fails, or if any response
 *         reports an exception (its text becomes the message)
 */
static public void testSsccAbortTransaction() throws IOException {
    final String threadName = Thread.currentThread().getName();
    log.info(" " + threadName + " Starting testSsccAbortTransaction");
    final long id = getTransId();
    destoryTransId();
    Batch.Call<SsccRegionService, SsccAbortTransactionResponse> callable = new Batch.Call<SsccRegionService, SsccAbortTransactionResponse>() {
        @Override
        public SsccAbortTransactionResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccAbortTransactionResponse> rpcCallback = new BlockingRpcCallback<SsccAbortTransactionResponse>();
            SsccAbortTransactionRequest.Builder builder = SsccAbortTransactionRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            instance.abortTransaction(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccAbortTransactionResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccAbortTransaction coprocessor call failed", e);
    }
    for (SsccAbortTransactionResponse aresponse : result.values()) {
        if (aresponse.getHasException()) {
            String exception = aresponse.getException();
            log.info(" " + threadName + " SsccAbortTransactionResponse exception "
                    + exception);
            throw new IOException(exception);
        }
    }
    log.info(" " + threadName + " Finished testSsccAbortTransaction");
}
/**
 * Registers a transaction for the calling thread (if it has none) and sends
 * a begin request carrying its transaction id and start id through the
 * SsccRegionService coprocessor.
 *
 * The call is best effort: a failure of the coprocessor call is logged and
 * not rethrown, and the responses are not inspected.
 *
 * @throws IOException declared for callers; not thrown by the visible code path
 */
static public void testSsccBeginTransaction() throws IOException {
    final String threadName = Thread.currentThread().getName();
    log.info(" " + threadName + " Starting testSsccBeginTransaction");
    genTransId();
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccBeginTransactionResponse> callable = new Batch.Call<SsccRegionService, SsccBeginTransactionResponse>() {
        @Override
        public SsccBeginTransactionResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccBeginTransactionResponse> rpcCallback = new BlockingRpcCallback<SsccBeginTransactionResponse>();
            SsccBeginTransactionRequest.Builder builder = SsccBeginTransactionRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            instance.beginTransaction(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    try {
        ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Still best effort, but report through the class logger with the stack trace.
        log.error(" " + threadName + " testSsccBeginTransaction coprocessor call failed", e);
    }
    log.info(" " + threadName + " Finished testSsccBeginTransaction with transId: " + id);
}
/**
 * Issues a checkAndDelete through the SsccRegionService coprocessor: the
 * check is on ROW1 / FAMILY / QUAL_A against VALUE1, and the delete removes
 * column FAMILY:QUAL_A of ROW1.
 *
 * Each response's result flag is stored in checkResult; a response that
 * reports an exception is logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCheckAndDelete() throws IOException {
    final String threadName = Thread.currentThread().getName();
    log.info(" " + threadName + " Starting testSsccCheckAndDelete");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccCheckAndDeleteResponse> callable = new Batch.Call<SsccRegionService, SsccCheckAndDeleteResponse>() {
        @Override
        public SsccCheckAndDeleteResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCheckAndDeleteResponse> rpcCallback = new BlockingRpcCallback<SsccCheckAndDeleteResponse>();
            SsccCheckAndDeleteRequest.Builder builder = SsccCheckAndDeleteRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setRow(HBaseZeroCopyByteString.wrap(Bytes.toBytes(ROW1)));
            builder.setFamily(HBaseZeroCopyByteString.wrap(FAMILY));
            builder.setQualifier(HBaseZeroCopyByteString.wrap(QUAL_A));
            builder.setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(VALUE1)));
            Delete d = new Delete(Bytes.toBytes(ROW1));
            d.deleteColumns(FAMILY, QUAL_A);
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.DELETE, d);
            builder.setDelete(m1);
            instance.checkAndDelete(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCheckAndDeleteResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCheckAndDelete coprocessor call failed", e);
    }
    for (SsccCheckAndDeleteResponse cresponse : result.values()) {
        checkResult = cresponse.getResult();
        String exception = cresponse.getException();
        if (cresponse.getHasException()) {
            log.info(" " + threadName + " testSsccCheckAndDeleteResponse exception "
                    + exception);
        } else {
            log.info(" " + threadName + " testSsccCheckAndDeleteResponse result is "
                    + checkResult);
        }
    }
    log.info(" " + threadName + " Finished testSsccCheckAndDelete");
}
/**
 * Issues a checkAndDelete through the SsccRegionService coprocessor: the
 * check is on ROW1 / FAMILY / QUAL_B against VALUE2, and the delete removes
 * column FAMILY:QUAL_B of ROW1.
 *
 * Each response's result flag is stored in checkResult; a response that
 * reports an exception is logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCheckAndDelete2() throws IOException {
    log.info("Starting testSsccCheckAndDelete2");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccCheckAndDeleteResponse> callable = new Batch.Call<SsccRegionService, SsccCheckAndDeleteResponse>() {
        @Override
        public SsccCheckAndDeleteResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCheckAndDeleteResponse> rpcCallback = new BlockingRpcCallback<SsccCheckAndDeleteResponse>();
            SsccCheckAndDeleteRequest.Builder builder = SsccCheckAndDeleteRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setRow(HBaseZeroCopyByteString.wrap(Bytes.toBytes(ROW1)));
            builder.setFamily(HBaseZeroCopyByteString.wrap(FAMILY));
            builder.setQualifier(HBaseZeroCopyByteString.wrap(QUAL_B));
            builder.setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(VALUE2)));
            Delete d = new Delete(Bytes.toBytes(ROW1));
            d.deleteColumns(FAMILY, QUAL_B);
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.DELETE, d);
            builder.setDelete(m1);
            instance.checkAndDelete(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCheckAndDeleteResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCheckAndDelete2 coprocessor call failed", e);
    }
    for (SsccCheckAndDeleteResponse cresponse : result.values()) {
        checkResult = cresponse.getResult();
        String exception = cresponse.getException();
        if (cresponse.getHasException()) {
            log.info(" testSsccCheckAndDelete2Response exception " + exception);
        } else {
            log.info(" testSsccCheckAndDelete2Response result is " + checkResult);
        }
    }
    log.info("Finished testSsccCheckAndDelete2");
}
/**
 * Issues a checkAndDelete through the SsccRegionService coprocessor: the
 * check is on ROW2 / FAMILY / QUAL_A against VALUE2, and the delete removes
 * column FAMILY:QUAL_A of ROW2.
 *
 * Each response's result flag is stored in checkResult; a response that
 * reports an exception is logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCheckAndDelete4() throws IOException {
    // Log messages name this method; they previously named testSsccCheckAndDelete.
    log.info("Starting testSsccCheckAndDelete4");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccCheckAndDeleteResponse> callable = new Batch.Call<SsccRegionService, SsccCheckAndDeleteResponse>() {
        @Override
        public SsccCheckAndDeleteResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCheckAndDeleteResponse> rpcCallback = new BlockingRpcCallback<SsccCheckAndDeleteResponse>();
            SsccCheckAndDeleteRequest.Builder builder = SsccCheckAndDeleteRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setRow(HBaseZeroCopyByteString.wrap(Bytes.toBytes(ROW2)));
            builder.setFamily(HBaseZeroCopyByteString.wrap(FAMILY));
            builder.setQualifier(HBaseZeroCopyByteString.wrap(QUAL_A));
            builder.setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(VALUE2)));
            Delete d = new Delete(Bytes.toBytes(ROW2));
            d.deleteColumns(FAMILY, QUAL_A);
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.DELETE, d);
            builder.setDelete(m1);
            instance.checkAndDelete(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCheckAndDeleteResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCheckAndDelete4 coprocessor call failed", e);
    }
    for (SsccCheckAndDeleteResponse cresponse : result.values()) {
        checkResult = cresponse.getResult();
        String exception = cresponse.getException();
        if (cresponse.getHasException()) {
            log.info(" testSsccCheckAndDelete4Response exception " + exception);
        } else {
            log.info(" testSsccCheckAndDelete4Response result is " + checkResult);
        }
    }
    log.info("Finished testSsccCheckAndDelete4");
}
/**
 * Issues a checkAndPut through the SsccRegionService coprocessor: the check
 * is on ROW1 / FAMILY / QUAL_A against an empty value, and the put writes
 * the int 1 to FAMILY:QUAL_A of ROW1.
 *
 * Each response's result flag is stored in checkResult; a response that
 * reports an exception is logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCheckAndPut() throws IOException {
    final String threadName = Thread.currentThread().getName();
    log.info(" " + threadName + " Starting testSsccCheckAndPut");
    final byte[] emptyVal = new byte[] {};
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccCheckAndPutResponse> callable = new Batch.Call<SsccRegionService, SsccCheckAndPutResponse>() {
        @Override
        public SsccCheckAndPutResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCheckAndPutResponse> rpcCallback = new BlockingRpcCallback<SsccCheckAndPutResponse>();
            SsccCheckAndPutRequest.Builder builder = SsccCheckAndPutRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setRow(HBaseZeroCopyByteString.wrap(Bytes.toBytes(ROW1)));
            builder.setFamily(HBaseZeroCopyByteString.wrap(FAMILY));
            builder.setQualifier(HBaseZeroCopyByteString.wrap(QUAL_A));
            builder.setValue(HBaseZeroCopyByteString.wrap(emptyVal));
            Put p = new Put(Bytes.toBytes(ROW1)).add(FAMILY, QUAL_A, Bytes.toBytes(1));
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
            builder.setPut(m1);
            instance.checkAndPut(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCheckAndPutResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCheckAndPut coprocessor call failed", e);
    }
    for (SsccCheckAndPutResponse cresponse : result.values()) {
        checkResult = cresponse.getResult();
        String exception = cresponse.getException();
        if (cresponse.getHasException()) {
            log.info(" " + threadName + " testSsccCheckAndPutResponse exception "
                    + exception);
        } else {
            log.info(" " + threadName + " testSsccCheckAndPutResponse result is "
                    + checkResult);
        }
    }
    log.info(" " + threadName + " Finished testSsccCheckAndPut");
}
/**
 * Issues a checkAndPut through the SsccRegionService coprocessor: the check
 * is on ROW1 / FAMILY / QUAL_A against VALUE1, and the put writes the int 2
 * to FAMILY:QUAL_B of ROW1.
 *
 * Each response's result flag is stored in checkResult; a response that
 * reports an exception is logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCheckAndPut2() throws IOException {
    log.info("Starting testSsccCheckAndPut2");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccCheckAndPutResponse> callable = new Batch.Call<SsccRegionService, SsccCheckAndPutResponse>() {
        @Override
        public SsccCheckAndPutResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCheckAndPutResponse> rpcCallback = new BlockingRpcCallback<SsccCheckAndPutResponse>();
            SsccCheckAndPutRequest.Builder builder = SsccCheckAndPutRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setRow(HBaseZeroCopyByteString.wrap(Bytes.toBytes(ROW1)));
            builder.setFamily(HBaseZeroCopyByteString.wrap(FAMILY));
            builder.setQualifier(HBaseZeroCopyByteString.wrap(QUAL_A));
            builder.setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(VALUE1)));
            Put p = new Put(Bytes.toBytes(ROW1)).add(FAMILY, QUAL_B, Bytes.toBytes(2));
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
            builder.setPut(m1);
            instance.checkAndPut(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCheckAndPutResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCheckAndPut2 coprocessor call failed", e);
    }
    for (SsccCheckAndPutResponse cresponse : result.values()) {
        checkResult = cresponse.getResult();
        String exception = cresponse.getException();
        if (cresponse.getHasException()) {
            log.info(" testSsccCheckAndPut2Response exception " + exception);
        } else {
            log.info(" testSsccCheckAndPut2Response result is " + checkResult);
        }
    }
    log.info("Finished testSsccCheckAndPut2");
}
/**
 * Issues a checkAndPut through the SsccRegionService coprocessor: the check
 * is on ROW1 / FAMILY / QUAL_A against VALUE1, while the put targets a
 * different row, writing the int 1 to FAMILY:QUAL_A of ROW2.
 *
 * Each response's result flag is stored in checkResult; a response that
 * reports an exception is logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCheckAndPut3() throws IOException {
    log.info("Starting testSsccCheckAndPut3");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccCheckAndPutResponse> callable = new Batch.Call<SsccRegionService, SsccCheckAndPutResponse>() {
        @Override
        public SsccCheckAndPutResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCheckAndPutResponse> rpcCallback = new BlockingRpcCallback<SsccCheckAndPutResponse>();
            SsccCheckAndPutRequest.Builder builder = SsccCheckAndPutRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setRow(HBaseZeroCopyByteString.wrap(Bytes.toBytes(ROW1)));
            builder.setFamily(HBaseZeroCopyByteString.wrap(FAMILY));
            builder.setQualifier(HBaseZeroCopyByteString.wrap(QUAL_A));
            builder.setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(VALUE1)));
            Put p = new Put(Bytes.toBytes(ROW2)).add(FAMILY, QUAL_A, Bytes.toBytes(1));
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
            builder.setPut(m1);
            instance.checkAndPut(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCheckAndPutResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCheckAndPut3 coprocessor call failed", e);
    }
    for (SsccCheckAndPutResponse cresponse : result.values()) {
        checkResult = cresponse.getResult();
        String exception = cresponse.getException();
        if (cresponse.getHasException()) {
            log.info(" testSsccCheckAndPut3Response exception " + exception);
        } else {
            log.info(" testSsccCheckAndPut3Response result is " + checkResult);
        }
    }
    log.info("Finished testSsccCheckAndPut3");
}
/**
 * Issues a checkAndPut through the SsccRegionService coprocessor: the check
 * is on ROW2 / FAMILY / QUAL_A against VALUE2, and the put writes the int 2
 * to FAMILY:QUAL_A of ROW2.
 *
 * Each response's result flag is stored in checkResult; a response that
 * reports an exception is logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCheckAndPut4() throws IOException {
    log.info("Starting testSsccCheckAndPut4");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccCheckAndPutResponse> callable = new Batch.Call<SsccRegionService, SsccCheckAndPutResponse>() {
        @Override
        public SsccCheckAndPutResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCheckAndPutResponse> rpcCallback = new BlockingRpcCallback<SsccCheckAndPutResponse>();
            SsccCheckAndPutRequest.Builder builder = SsccCheckAndPutRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setRow(HBaseZeroCopyByteString.wrap(Bytes.toBytes(ROW2)));
            builder.setFamily(HBaseZeroCopyByteString.wrap(FAMILY));
            builder.setQualifier(HBaseZeroCopyByteString.wrap(QUAL_A));
            builder.setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(VALUE2)));
            Put p = new Put(Bytes.toBytes(ROW2)).add(FAMILY, QUAL_A, Bytes.toBytes(2));
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
            builder.setPut(m1);
            instance.checkAndPut(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCheckAndPutResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCheckAndPut4 coprocessor call failed", e);
    }
    for (SsccCheckAndPutResponse cresponse : result.values()) {
        checkResult = cresponse.getResult();
        String exception = cresponse.getException();
        if (cresponse.getHasException()) {
            log.info(" testSsccCheckAndPut4Response exception " + exception);
        } else {
            log.info(" testSsccCheckAndPut4Response result is " + checkResult);
        }
    }
    log.info("Finished testSsccCheckAndPut4");
}
/**
 * Sends a close-scanner request for the calling thread's transaction through
 * the SsccRegionService coprocessor.
 *
 * Each invocation of the callable sends the current value of the static
 * scannerId and then increments it. A response that reports an exception is
 * logged, not thrown.
 *
 * @throws IOException if the coprocessor call itself fails
 */
static public void testSsccCloseScanner() throws IOException {
    final String threadName = Thread.currentThread().getName();
    // Start message corrected from the misspelled "testSsccClosecanner".
    log.info(" " + threadName + " Starting testSsccCloseScanner");
    final long id = getTransId();
    Batch.Call<SsccRegionService, SsccCloseScannerResponse> callable = new Batch.Call<SsccRegionService, SsccCloseScannerResponse>() {
        @Override
        public SsccCloseScannerResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCloseScannerResponse> rpcCallback = new BlockingRpcCallback<SsccCloseScannerResponse>();
            SsccCloseScannerRequest.Builder builder = SsccCloseScannerRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setScannerId(scannerId++);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            instance.closeScanner(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCloseScannerResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCloseScanner coprocessor call failed", e);
    }
    for (SsccCloseScannerResponse cresponse : result.values()) {
        if (cresponse.getHasException()) {
            log.info(" " + threadName + " testSsccCloseScannerResponse exception "
                    + cresponse.getException());
        }
    }
    log.info(" " + threadName + " Finished testSsccCloseScanner");
}
/**
 * Sends a commit request for the calling thread's transaction through the
 * SsccRegionService coprocessor and removes the thread's transaction state.
 *
 * The transaction id is read before the state is removed. The request is
 * issued with null start and end keys and carries the static regionname.
 *
 * @throws IOException if the coprocessor call fails, or if any response
 *         reports an exception (its text becomes the message)
 */
static public void testSsccCommit() throws IOException {
    final String threadName = Thread.currentThread().getName();
    final long id = getTransId();
    log.info(" " + threadName + " Starting testSsccCommit with transId: " + id);
    destoryTransId();
    Batch.Call<SsccRegionService, SsccCommitResponse> callable = new Batch.Call<SsccRegionService, SsccCommitResponse>() {
        @Override
        public SsccCommitResponse call(SsccRegionService instance) throws IOException {
            // One controller/callback per invocation: the same callable object
            // is invoked once per region and a blocking callback holds one reply.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCommitResponse> rpcCallback = new BlockingRpcCallback<SsccCommitResponse>();
            SsccCommitRequest.Builder builder = SsccCommitRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            instance.commit(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCommitResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Surface the failure with its cause instead of continuing with a null map.
        throw new IOException("testSsccCommit coprocessor call failed", e);
    }
    for (SsccCommitResponse cresponse : result.values()) {
        if (cresponse.getHasException()) {
            String exception = cresponse.getException();
            log.info(" " + threadName + " SsccCommitResponse exception " + exception);
            throw new IOException(exception);
        }
    }
    log.info(" " + threadName + " Finished testSsccCommit");
}
/**
 * Sends a commitRequest for the current transaction through the
 * SsccRegionService coprocessor endpoint of table {@code ht} and logs the
 * symbolic name of the result code returned in each response.
 *
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccCommitRequest() throws IOException {
    final long id = getTransId();
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccCommitRequest with transId: " + id);
    Batch.Call<SsccRegionService, SsccCommitRequestResponse> callable = new Batch.Call<SsccRegionService, SsccCommitRequestResponse>() {
        @Override
        public SsccCommitRequestResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCommitRequestResponse> rpcCallback = new BlockingRpcCallback<SsccCommitRequestResponse>();
            SsccCommitRequestRequest.Builder builder = SsccCommitRequestRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            instance.commitRequest(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCommitRequestResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccCommitRequest coprocessor call failed", e);
    }
    for (SsccCommitRequestResponse cresponse : result.values()) {
        int value = cresponse.getResult();
        // Translate the numeric result code into its constant name for the log.
        String returnString;
        switch (value) {
        case COMMIT_OK:
            returnString = "COMMIT_OK";
            break;
        case COMMIT_OK_READ_ONLY:
            returnString = "COMMIT_OK_READ_ONLY";
            break;
        case COMMIT_UNSUCCESSFUL:
            returnString = "COMMIT_UNSUCCESSFUL";
            break;
        case COMMIT_CONFLICT:
            returnString = "COMMIT_CONFLICT";
            break;
        default:
            returnString = "Unknown return value: " + value;
            break;
        }
        log.info(" " + Thread.currentThread().getName() + " SsccCommitRequestResponse value " + returnString);
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccCommitRequest");
}
/**
 * Sends a commitIfPossible request for the current transaction through the
 * SsccRegionService coprocessor endpoint of table {@code ht}. The
 * transaction id is read and then released with {@code destoryTransId()}
 * before the request is issued. Each response is logged as either its
 * exception text or its wasSuccessful flag.
 *
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccCommitIfPossible() throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccCommitIfPossible");
    final long id = getTransId();
    destoryTransId();
    Batch.Call<SsccRegionService, SsccCommitIfPossibleResponse> callable = new Batch.Call<SsccRegionService, SsccCommitIfPossibleResponse>() {
        @Override
        public SsccCommitIfPossibleResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccCommitIfPossibleResponse> rpcCallback = new BlockingRpcCallback<SsccCommitIfPossibleResponse>();
            SsccCommitIfPossibleRequest.Builder builder = SsccCommitIfPossibleRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            instance.commitIfPossible(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccCommitIfPossibleResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccCommitIfPossible coprocessor call failed", e);
    }
    for (SsccCommitIfPossibleResponse cipresponse : result.values()) {
        if (cipresponse.getHasException()) {
            String exception = cipresponse.getException();
            log.info(" " + Thread.currentThread().getName() + " testSsccCommitIfPossible exception " + exception);
        } else {
            log.info(" " + Thread.currentThread().getName() + " testSsccCommitIfPossible result is "
                    + cipresponse.getWasSuccessful());
        }
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccCommitIfPossible");
}
/**
 * Sends a transactional delete of column FAMILY:QUAL_A in row ROW1 through
 * the SsccRegionService coprocessor endpoint of table {@code ht}. For each
 * response without an exception, the status is stored in the shared
 * {@code returnStatus} field and logged by its constant name.
 *
 * @param value not used by this method; retained for existing callers
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccDelete(final int value) throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccDelete");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccDeleteTransactionalResponse> callable = new Batch.Call<SsccRegionService, SsccDeleteTransactionalResponse>() {
        @Override
        public SsccDeleteTransactionalResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccDeleteTransactionalResponse> rpcCallback = new BlockingRpcCallback<SsccDeleteTransactionalResponse>();
            SsccDeleteTransactionalRequest.Builder builder = SsccDeleteTransactionalRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            // Build the delete marker cell by hand (timestamped "now") and
            // install it in the Delete's family-to-cells map.
            Delete d = new Delete(Bytes.toBytes(ROW1));
            NavigableMap<byte[], List<Cell>> map = d.getFamilyCellMap();
            List<Cell> list = new ArrayList<Cell>();
            Cell c = new KeyValue(Bytes.toBytes(ROW1), FAMILY, QUAL_A, System.currentTimeMillis(), Type.Delete);
            list.add(KeyValueUtil.ensureKeyValue(c));
            map.put(FAMILY, list);
            d.setFamilyCellMap(map);
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.DELETE, d);
            builder.setDelete(m1);
            instance.delete(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccDeleteTransactionalResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccDelete coprocessor call failed", e);
    }
    for (SsccDeleteTransactionalResponse dresponse : result.values()) {
        if (dresponse.getHasException()) {
            String exception = dresponse.getException();
            log.info(" " + Thread.currentThread().getName() + " testSsccDelete exception " + exception);
        } else {
            returnStatus = dresponse.getStatus();
            // Translate the numeric status into its constant name for the log.
            String returnString;
            switch (returnStatus) {
            case STATEFUL_UPDATE_OK:
                returnString = "STATEFUL_UPDATE_OK";
                break;
            case STATEFUL_UPDATE_CONFLICT:
                returnString = "STATEFUL_UPDATE_CONFLICT";
                break;
            case STATELESS_UPDATE_OK:
                returnString = "STATELESS_UPDATE_OK";
                break;
            case STATELESS_UPDATE_CONFLICT:
                returnString = "STATELESS_UPDATE_CONFLICT";
                break;
            default:
                returnString = "Unknown return value: " + returnStatus;
                break;
            }
            log.info(" " + Thread.currentThread().getName() + " testSsccDelete returnStatus is " + returnString);
        }
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccDelete");
}
/**
 * Sends one deleteMultiple request containing whole-row deletes for ROW1
 * through ROW6 via the SsccRegionService coprocessor endpoint of table
 * {@code ht}. For each response without an exception, the status is stored
 * in the shared {@code returnStatus} field and logged by its constant name.
 *
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccDeleteMultiple() throws IOException {
    log.info("Starting testSsccDeleteMultiple");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccDeleteMultipleTransactionalResponse> callable = new Batch.Call<SsccRegionService, SsccDeleteMultipleTransactionalResponse>() {
        @Override
        public SsccDeleteMultipleTransactionalResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccDeleteMultipleTransactionalResponse> rpcCallback = new BlockingRpcCallback<SsccDeleteMultipleTransactionalResponse>();
            SsccDeleteMultipleTransactionalRequest.Builder builder = SsccDeleteMultipleTransactionalRequest
                    .newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            // One row-level delete per test row, added in row order.
            builder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(Bytes.toBytes(ROW1))));
            builder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(Bytes.toBytes(ROW2))));
            builder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(Bytes.toBytes(ROW3))));
            builder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(Bytes.toBytes(ROW4))));
            builder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(Bytes.toBytes(ROW5))));
            builder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(Bytes.toBytes(ROW6))));
            instance.deleteMultiple(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccDeleteMultipleTransactionalResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccDeleteMultiple coprocessor call failed", e);
    }
    for (SsccDeleteMultipleTransactionalResponse dmresponse : result.values()) {
        if (dmresponse.getHasException()) {
            String exception = dmresponse.getException();
            log.info(" testSsccDeleteMultiple exception " + exception);
        } else {
            returnStatus = dmresponse.getStatus();
            // Translate the numeric status into its constant name for the log.
            String returnString;
            switch (returnStatus) {
            case STATEFUL_UPDATE_OK:
                returnString = "STATEFUL_UPDATE_OK";
                break;
            case STATEFUL_UPDATE_CONFLICT:
                returnString = "STATEFUL_UPDATE_CONFLICT";
                break;
            case STATELESS_UPDATE_OK:
                returnString = "STATELESS_UPDATE_OK";
                break;
            case STATELESS_UPDATE_CONFLICT:
                returnString = "STATELESS_UPDATE_CONFLICT";
                break;
            default:
                returnString = "Unknown return value: " + returnStatus;
                break;
            }
            log.info(" testSsccDeleteMultiple returnStatus is " + returnString);
        }
    }
    log.info("Finished testSsccDeleteMultiple");
}
/**
 * Issues a transactional get of FAMILY:QUAL_A in row ROW1 through the
 * SsccRegionService coprocessor endpoint of table {@code ht} and collects
 * every returned cell value decoded as an int.
 *
 * @return the decoded cell values from all responses, in iteration order;
 *         empty when no response contained any cell
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
public static Integer[] testSsccGet() throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccGet");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccGetTransactionalResponse> callable = new Batch.Call<SsccRegionService, SsccGetTransactionalResponse>() {
        @Override
        public SsccGetTransactionalResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccGetTransactionalResponse> rpcCallback = new BlockingRpcCallback<SsccGetTransactionalResponse>();
            SsccGetTransactionalRequest.Builder builder = SsccGetTransactionalRequest.newBuilder();
            Get get = new Get(Bytes.toBytes(ROW1)).addColumn(FAMILY, QUAL_A);
            builder.setGet(ProtobufUtil.toGet(get));
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            instance.get(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccGetTransactionalResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccGet coprocessor call failed", e);
    }
    List<Integer> results = new ArrayList<Integer>();
    for (SsccGetTransactionalResponse gresponse : result.values()) {
        Result resultFromGet = ProtobufUtil.toResult(gresponse.getResult());
        log.info(" " + Thread.currentThread().getName()
                + " SsccGetTransactionalResponse Get result count before action is committed: "
                + resultFromGet.size());
        // Skip empty results before touching listCells().
        if (resultFromGet.size() == 0) {
            log.info(" " + Thread.currentThread().getName() + " can't get any value. ");
            continue;
        }
        for (Cell c : resultFromGet.listCells()) {
            // Decode once and reuse for both the log line and the result list.
            int cellValue = Bytes.toInt(CellUtil.cloneValue(c));
            log.info(" " + Thread.currentThread().getName() + " get value is : " + cellValue);
            results.add(cellValue);
        }
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccGet");
    return results.toArray(new Integer[results.size()]);
}
/**
 * Sends a transactional put of {@code value} into FAMILY:QUAL_A of row ROW1
 * through the SsccRegionService coprocessor endpoint of table {@code ht}.
 * For each response without an exception, the status is stored in the
 * shared {@code returnStatus} field and logged by its constant name.
 *
 * @param value the int written as the cell value
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccPut(final int value) throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccPut, value is :" + value);
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccPutTransactionalResponse> callable = new Batch.Call<SsccRegionService, SsccPutTransactionalResponse>() {
        @Override
        public SsccPutTransactionalResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccPutTransactionalResponse> rpcCallback = new BlockingRpcCallback<SsccPutTransactionalResponse>();
            SsccPutTransactionalRequest.Builder builder = SsccPutTransactionalRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            Put p = new Put(Bytes.toBytes(ROW1)).add(FAMILY, QUAL_A, Bytes.toBytes(value));
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
            builder.setPut(m1);
            instance.put(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccPutTransactionalResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccPut coprocessor call failed, value is :" + value, e);
    }
    for (SsccPutTransactionalResponse presponse : result.values()) {
        if (presponse.getHasException()) {
            String exception = presponse.getException();
            log.info(" " + Thread.currentThread().getName() + " testSsccPut, value is :" + value + "; exception "
                    + exception);
        } else {
            returnStatus = presponse.getStatus();
            // Translate the numeric status into its constant name for the log.
            String returnString;
            switch (returnStatus) {
            case STATEFUL_UPDATE_OK:
                returnString = "STATEFUL_UPDATE_OK";
                break;
            case STATEFUL_UPDATE_CONFLICT:
                returnString = "STATEFUL_UPDATE_CONFLICT";
                break;
            case STATELESS_UPDATE_OK:
                returnString = "STATELESS_UPDATE_OK";
                break;
            case STATELESS_UPDATE_CONFLICT:
                returnString = "STATELESS_UPDATE_CONFLICT";
                break;
            default:
                returnString = "Unknown return value: " + returnStatus;
                break;
            }
            log.info(" " + Thread.currentThread().getName() + " testSsccPut, value is :" + value
                    + "; returnStatus is " + returnString);
        }
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccPut, value is :" + value);
}
/**
 * Sends a transactional put of VALUE1 into FAMILY:QUAL_A of the given row
 * through the SsccRegionService coprocessor endpoint of table {@code ht}.
 * The responses are discarded, and any Throwable raised by the coprocessor
 * invocation is printed to stderr and otherwise ignored.
 *
 * @param row the row key to write
 * @throws IOException declared for callers; see the note above on how
 *         coprocessor failures are handled
 */
static public void testSsccPutRow(final String row) throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccPutRow, row is " + row);
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccPutTransactionalResponse> putCall = new Batch.Call<SsccRegionService, SsccPutTransactionalResponse>() {
        ServerRpcController rpcController = new ServerRpcController();
        BlockingRpcCallback<SsccPutTransactionalResponse> callback = new BlockingRpcCallback<SsccPutTransactionalResponse>();

        @Override
        public SsccPutTransactionalResponse call(SsccRegionService instance) throws IOException {
            // Serialize the single-cell Put first, then assemble the request.
            Put rowPut = new Put(Bytes.toBytes(row)).add(FAMILY, QUAL_A, Bytes.toBytes(VALUE1));
            MutationProto putProto = ProtobufUtil.toMutation(MutationType.PUT, rowPut);
            SsccPutTransactionalRequest.Builder putRequest = SsccPutTransactionalRequest.newBuilder();
            putRequest.setTransactionId(id);
            putRequest.setStartId(start);
            putRequest.setRegionName(ByteString.copyFromUtf8(regionname));
            putRequest.setPut(putProto);
            instance.put(rpcController, putRequest.build(), callback);
            return callback.get();
        }
    };
    try {
        ht.coprocessorService(SsccRegionService.class, null, null, putCall);
    } catch (Throwable failure) {
        // Best-effort: report and carry on.
        failure.printStackTrace();
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccPutRow, row is " + new String(row));
}
/**
 * Sends a transactional put that targets column family FAMILYBAD (row ROW1,
 * qualifier QUAL_A, int value 1) through the SsccRegionService coprocessor
 * endpoint of table {@code ht}. No start id is set on the request. The
 * responses are discarded, and any Throwable raised by the coprocessor
 * invocation is printed to stderr and otherwise ignored.
 *
 * @throws IOException declared for callers; see the note above on how
 *         coprocessor failures are handled
 */
static public void testSsccPutException() throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccPutException");
    final long id = getTransId();
    Batch.Call<SsccRegionService, SsccPutTransactionalResponse> badPutCall = new Batch.Call<SsccRegionService, SsccPutTransactionalResponse>() {
        ServerRpcController rpcController = new ServerRpcController();
        BlockingRpcCallback<SsccPutTransactionalResponse> callback = new BlockingRpcCallback<SsccPutTransactionalResponse>();

        @Override
        public SsccPutTransactionalResponse call(SsccRegionService instance) throws IOException {
            // Serialize the Put against the bad family, then assemble the request.
            Put badPut = new Put(Bytes.toBytes(ROW1)).add(FAMILYBAD, QUAL_A, Bytes.toBytes(1));
            MutationProto putProto = ProtobufUtil.toMutation(MutationType.PUT, badPut);
            SsccPutTransactionalRequest.Builder putRequest = SsccPutTransactionalRequest.newBuilder();
            putRequest.setTransactionId(id);
            putRequest.setRegionName(ByteString.copyFromUtf8(regionname));
            putRequest.setPut(putProto);
            instance.put(rpcController, putRequest.build(), callback);
            return callback.get();
        }
    };
    try {
        ht.coprocessorService(SsccRegionService.class, null, null, badPutCall);
    } catch (Throwable failure) {
        // Best-effort: report and carry on.
        failure.printStackTrace();
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccPutException");
}
/**
 * Sends one putMultiple request containing a single put (row ROW1,
 * FAMILY:QUAL_A, int value 1) through the SsccRegionService coprocessor
 * endpoint of table {@code ht}. For each response without an exception, the
 * status is stored in the shared {@code returnStatus} field and logged by
 * its constant name.
 *
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccPutMultiple() throws IOException {
    log.info("Starting testSsccPutMultiple");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccPutMultipleTransactionalResponse> callable = new Batch.Call<SsccRegionService, SsccPutMultipleTransactionalResponse>() {
        @Override
        public SsccPutMultipleTransactionalResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccPutMultipleTransactionalResponse> rpcCallback = new BlockingRpcCallback<SsccPutMultipleTransactionalResponse>();
            SsccPutMultipleTransactionalRequest.Builder builder = SsccPutMultipleTransactionalRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            Put p = new Put(Bytes.toBytes(ROW1)).add(FAMILY, QUAL_A, Bytes.toBytes(1));
            MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
            builder.addPut(m1);
            instance.putMultiple(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccPutMultipleTransactionalResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccPutMultiple coprocessor call failed", e);
    }
    for (SsccPutMultipleTransactionalResponse pmresponse : result.values()) {
        if (pmresponse.getHasException()) {
            String exception = pmresponse.getException();
            log.info(" testSsccPutMultiple exception " + exception);
        } else {
            returnStatus = pmresponse.getStatus();
            // Translate the numeric status into its constant name for the log.
            String returnString;
            switch (returnStatus) {
            case STATEFUL_UPDATE_OK:
                returnString = "STATEFUL_UPDATE_OK";
                break;
            case STATEFUL_UPDATE_CONFLICT:
                returnString = "STATEFUL_UPDATE_CONFLICT";
                break;
            case STATELESS_UPDATE_OK:
                returnString = "STATELESS_UPDATE_OK";
                break;
            case STATELESS_UPDATE_CONFLICT:
                returnString = "STATELESS_UPDATE_CONFLICT";
                break;
            default:
                returnString = "Unknown return value: " + returnStatus;
                break;
            }
            log.info(" testSsccPutMultiple returnStatus is " + returnString);
        }
    }
    log.info("Finished testSsccPutMultiple");
}
/**
 * Sends a performScan request (up to 9 rows, scanner left open, call
 * sequence 0) for the current {@code scannerId} through the
 * SsccRegionService coprocessor endpoint of table {@code ht} and flattens
 * the returned rows into one entry per cell.
 *
 * @return one {@code Object[]} per returned cell, holding
 *         {row, family, qualifier, value, timestamp}; the first four are
 *         byte arrays and the timestamp is a boxed long
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public List<Object[]> testSsccPerformScan() throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccPerformScan");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccPerformScanResponse> callable = new Batch.Call<SsccRegionService, SsccPerformScanResponse>() {
        @Override
        public SsccPerformScanResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccPerformScanResponse> rpcCallback = new BlockingRpcCallback<SsccPerformScanResponse>();
            SsccPerformScanRequest.Builder builder = SsccPerformScanRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            builder.setScannerId(scannerId);
            builder.setNumberOfRows(9);
            builder.setCloseScanner(false);
            builder.setNextCallSeq(0);
            instance.performScan(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccPerformScanResponse> presult;
    try {
        presult = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        log.info(" " + Thread.currentThread().getName()
                + " testSsccPerformScanResponse exception getting results " + e);
        // Previously execution continued and dereferenced the null map
        // below; surface the real cause instead.
        throw new IOException("testSsccPerformScan coprocessor call failed", e);
    }
    List<Object[]> results = new ArrayList<Object[]>();
    for (SsccPerformScanResponse presponse : presult.values()) {
        if (presponse.getHasException()) {
            String exception = presponse.getException();
            log.info(" " + Thread.currentThread().getName() + " testSsccPerformScanResponse exception "
                    + exception);
            continue;
        }
        int count = presponse.getResultCount();
        boolean hasMore = presponse.getHasMore();
        log.info(" " + Thread.currentThread().getName() + " testSsccPerformScan response count " + count
                + " rows , hasMore is " + hasMore);
        for (int i = 0; i < count; i++) {
            Result resultFromScan = ProtobufUtil.toResult(presponse.getResult(i));
            // Result.listCells() returns null (not an empty list) when the
            // Result holds no cells, so guard before iterating.
            List<Cell> cells = resultFromScan.listCells();
            if (cells != null) {
                for (Cell c : cells) {
                    Object[] objArrs = { CellUtil.cloneRow(c), CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c),
                            CellUtil.cloneValue(c), c.getTimestamp() };
                    results.add(objArrs);
                }
            }
            log.info(" " + Thread.currentThread().getName() + " , result " + resultFromScan);
        }
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccPerformScan");
    return results;
}
/**
 * Sends an openScanner request through the SsccRegionService coprocessor
 * endpoint of table {@code ht}. The scan covers FAMILY:QUAL_A with a
 * SingleColumnValueFilter keeping values greater than or equal to VALUE1.
 * The scanner id from each response is stored in the shared
 * {@code scannerId} field, whether or not that response reports an
 * exception.
 *
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccOpenScanner() throws IOException {
    log.info(" " + Thread.currentThread().getName() + " Starting testSsccOpenScanner");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccOpenScannerResponse> callable = new Batch.Call<SsccRegionService, SsccOpenScannerResponse>() {
        @Override
        public SsccOpenScannerResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccOpenScannerResponse> rpcCallback = new BlockingRpcCallback<SsccOpenScannerResponse>();
            SsccOpenScannerRequest.Builder builder = SsccOpenScannerRequest.newBuilder();
            builder.setTransactionId(id);
            builder.setStartId(start);
            builder.setRegionName(ByteString.copyFromUtf8(regionname));
            Scan scan = new Scan();
            Filter filter = new SingleColumnValueFilter(FAMILY, QUAL_A, CompareOp.GREATER_OR_EQUAL,
                    Bytes.toBytes(VALUE1));
            scan.setFilter(filter);
            scan.addFamily(FAMILY);
            scan.addColumn(FAMILY, QUAL_A);
            builder.setScan(ProtobufUtil.toScan(scan));
            instance.openScanner(controller, builder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccOpenScannerResponse> result;
    try {
        result = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccOpenScanner coprocessor call failed", e);
    }
    for (SsccOpenScannerResponse oresponse : result.values()) {
        // Recorded unconditionally, matching the existing behavior.
        scannerId = oresponse.getScannerId();
        if (oresponse.getHasException()) {
            log.info(" " + Thread.currentThread().getName() + " testSsccOpenScannerResponse exception "
                    + oresponse.getException());
        } else {
            log.info(" " + Thread.currentThread().getName() + " testSsccOpenScannerResponse scannerId is "
                    + scannerId);
        }
    }
    log.info(" " + Thread.currentThread().getName() + " Finished testSsccOpenScanner");
}
/**
 * Sends a recoveryRequest (tmId 7) for the current transaction through the
 * SsccRegionService coprocessor endpoint of table {@code ht} and logs, for
 * each response, the result count and the first result when one exists.
 *
 * @throws IOException if the coprocessor invocation itself fails; the
 *         underlying Throwable is attached as the cause
 */
static public void testSsccRecoveryRequest() throws IOException {
    log.info("Starting testSsccRecoveryRequest");
    final long id = getTransId();
    final long start = getStartId();
    Batch.Call<SsccRegionService, SsccRecoveryRequestResponse> callable = new Batch.Call<SsccRegionService, SsccRecoveryRequestResponse>() {
        @Override
        public SsccRecoveryRequestResponse call(SsccRegionService instance) throws IOException {
            // Created per invocation: call() runs once per region and a
            // BlockingRpcCallback only holds a single response.
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<SsccRecoveryRequestResponse> rpcCallback = new BlockingRpcCallback<SsccRecoveryRequestResponse>();
            SsccRecoveryRequestRequest.Builder rbuilder = SsccRecoveryRequestRequest.newBuilder();
            rbuilder.setTransactionId(id);
            rbuilder.setStartId(start);
            rbuilder.setRegionName(ByteString.copyFromUtf8(regionname));
            rbuilder.setTmId(7);
            instance.recoveryRequest(controller, rbuilder.build(), rpcCallback);
            return rpcCallback.get();
        }
    };
    Map<byte[], SsccRecoveryRequestResponse> rresult;
    try {
        rresult = ht.coprocessorService(SsccRegionService.class, null, null, callable);
    } catch (Throwable e) {
        // Previously the failure was only printed and the null map was
        // dereferenced below; surface the real cause instead.
        throw new IOException("testSsccRecoveryRequest coprocessor call failed", e);
    }
    for (SsccRecoveryRequestResponse rresponse : rresult.values()) {
        int count = rresponse.getResultCount();
        if (count > 0) {
            long l = rresponse.getResult(0);
            log.info(" testSsccRecoveryResponse count " + count + ", result " + l);
        } else {
            // getResult(0) on an empty repeated field would throw; report
            // the empty response instead.
            log.info(" testSsccRecoveryResponse count " + count + ", no results returned");
        }
    }
    log.info("Finished testSsccRecoveryRequest");
}
}