/**
* @@@ START COPYRIGHT @@@
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @@@ END COPYRIGHT @@@
**/
package org.apache.hadoop.hbase.coprocessor.transactional;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.DtmConst;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerTimeoutException;
import org.apache.hadoop.hbase.client.SsccConst;
import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
// Sscc imports
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccAbortTransactionRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccAbortTransactionResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccBeginTransactionRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccBeginTransactionResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCloseScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCloseScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitIfPossibleRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitIfPossibleResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccGetTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccGetTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccOpenScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccOpenScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPerformScanRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPerformScanResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccRecoveryRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccRecoveryRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccRegionService;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccTransactionalAggregateRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccTransactionalAggregateResponse;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.LeaseListener;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
#ifdef CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.regionserver.Region;
#endif
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.regionserver.transactional.IdTm;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState;
import org.apache.hadoop.hbase.regionserver.transactional.SsccTransactionState;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class SsccRegionEndpoint<T, S, P extends Message, Q extends Message, R extends Message> extends SsccRegionService implements
CoprocessorService, Coprocessor {
private static final Log LOG = LogFactory.getLog(SsccRegionEndpoint.class);
private RegionCoprocessorEnvironment env;
ConcurrentHashMap<String, Object> transactionsByIdTestz = null;
// Collection of active transactions (PENDING) keyed by id.
protected ConcurrentHashMap<String, SsccTransactionState> transactionsById = new ConcurrentHashMap<String, SsccTransactionState>();
// Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by
// their sequence number
private SortedMap<Long, SsccTransactionState> commitedTransactionsBySequenceNumber = Collections.synchronizedSortedMap(new TreeMap<Long, SsccTransactionState>());
// Collection of transactions that are COMMIT_PENDING
private Set<SsccTransactionState> commitPendingTransactions = Collections
.synchronizedSet(new HashSet<SsccTransactionState>());
// an in-doubt transaction list during recovery WALEdit replay
private Map<Long, WALEdit> indoubtTransactionsById = new TreeMap<Long, WALEdit>();
// an in-doubt transaction list count by TM id
private Map<Integer, Integer> indoubtTransactionsCountByTmid = new TreeMap<Integer,Integer>();
// Concurrent map for transactional region scanner holders
// Protected by synchronized methods
final ConcurrentHashMap<Long,
TransactionalRegionScannerHolder> scanners =
new ConcurrentHashMap<Long, TransactionalRegionScannerHolder>();
// Atomic values to manage region scanners
private AtomicLong performScannerId = new AtomicLong(0);
// private AtomicInteger nextSequenceId = new AtomicInteger(0);
private Object commitCheckLock = new Object();
private Object recoveryCheckLock = new Object();
private Object editReplay = new Object();
private Object stoppableLock = new Object();
private int reconstructIndoubts = 0;
//temporary THLog getSequenceNumber() replacement
private AtomicLong nextLogSequenceId = new AtomicLong(0);
private final int oldTransactionFlushTrigger = 0;
private final Boolean splitDelayEnabled = false;
private final Boolean doWALHlog = false;
static Leases transactionLeases = null;
// CleanOldTransactionsChore cleanOldTransactionsThread;
// CompleteTransactionsChore completeTransactionsThread;
static Stoppable stoppable = new StoppableImplementation();
private int cleanTimer = 5000; // clean timer interval, in milliseconds
private int regionState = 0;
private Path recoveryTrxPath = null;
private int cleanAT = 0;
private long[] commitCheckTimes = new long[50];
private long[] hasConflictTimes = new long[50];
private long[] putBySequenceTimes = new long[50];
private long[] writeToLogTimes = new long[50];
private AtomicInteger timeIndex = new AtomicInteger (0);
private AtomicInteger totalCommits = new AtomicInteger (0);
private AtomicInteger writeToLogOperations = new AtomicInteger (0);
private AtomicInteger putBySequenceOperations = new AtomicInteger (0);
private long totalCommitCheckTime = 0;
private long totalConflictTime = 0;
private long totalPutTime = 0;
private long totalWriteToLogTime = 0;
private long minCommitCheckTime = 1000000000;
private long maxCommitCheckTime = 0;
private double avgCommitCheckTime = 0;
private long minConflictTime = 1000000000;
private long maxConflictTime = 0;
private double avgConflictTime = 0;
private long minPutTime = 1000000000;
private long maxPutTime = 0;
private double avgPutTime = 0;
private long minWriteToLogTime = 1000000000;
private long maxWriteToLogTime = 0;
private double avgWriteToLogTime = 0;
private HRegionInfo regionInfo = null;
#ifdef CDH5.7 APACHE1.2
private Region m_Region = null;
#else
private HRegion m_Region = null;
#endif
#ifdef CDH5.7 APACHE1.2
private HRegion t_Region = null;
#else
private TransactionalRegion t_Region = null;
#endif
private FileSystem fs = null;
private RegionCoprocessorHost rch = null;
private WAL tHLog = null;
boolean closing = false;
private boolean fullEditInCommit = true;
private boolean configuredEarlyLogging = false;
private static Object zkRecoveryCheckLock = new Object();
private static ZooKeeperWatcher zkw1 = null;
String lv_hostName;
int lv_port;
private static String zNodePath = "/hbase/Trafodion/recovery/";
private static final int DEFAULT_LEASE_TIME = 7200 * 1000;
private static final int LEASE_CHECK_FREQUENCY = 1000;
private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
private static final int DEFAULT_SLEEP = 60 * 1000;
protected static int transactionLeaseTimeout = 0;
private static int scannerLeaseTimeoutPeriod = 0;
private static int scannerThreadWakeFrequency = 0;
private Configuration config;
// Transaction state defines
private static final int COMMIT_OK = 1;
private static final int COMMIT_OK_READ_ONLY = 2;
private static final int COMMIT_UNSUCCESSFUL = 3;
private static final int COMMIT_CONFLICT = 5;
// update return code defines
private static final int STATEFUL_UPDATE_OK = 1;
private static final int STATEFUL_UPDATE_CONFLICT = 2;
private static final int STATELESS_UPDATE_OK = 3;
private static final int STATELESS_UPDATE_CONFLICT = 5;
private static final int CLOSE_WAIT_ON_COMMIT_PENDING = 1000;
private static final int MAX_COMMIT_PENDING_WAITS = 10;
private Thread ChoreThread = null;
//private static Thread ScannerLeasesThread = null;
private static Thread TransactionalLeasesThread = null;
private static boolean editGenerated = false;
private static final int COMMIT_REQUEST = 1;
private static final int COMMIT = 2;
private static final int ABORT = 3;
private static final int CONTROL_POINT_COMMIT = 4;
private Map<Long, Long> updateTsToStartId = new TreeMap<Long, Long>();
private AtomicLong nextSsccSequenceId;
private IdTm idServer;
private static final int ID_TM_SERVER_TIMEOUT = 1000;
// SsccRegionService methods
@Override
public void beginTransaction(RpcController controller,
SsccBeginTransactionRequest request,
RpcCallback<SsccBeginTransactionResponse> done) {
SsccBeginTransactionResponse response = SsccBeginTransactionResponse.getDefaultInstance();
long transId = request.getTransactionId();
long startId = request.getStartId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: beginTransaction - id " + transId + ", regionName " + regionInfo.getRegionNameAsString());
Throwable t = null;
String name = request.getRegionName().toStringUtf8();
WrongRegionException wre = null;
try {
beginTransaction(transId, startId);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:beginTransaction threw exception after internal begin");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccBeginTransactionResponse.Builder SsccBeginTransactionResponseBuilder = SsccBeginTransactionResponse.newBuilder();
SsccBeginTransactionResponseBuilder.setHasException(false);
if (t != null)
{
SsccBeginTransactionResponseBuilder.setHasException(true);
SsccBeginTransactionResponseBuilder.setException(t.toString());
}
if (wre != null)
{
SsccBeginTransactionResponseBuilder.setHasException(true);
SsccBeginTransactionResponseBuilder.setException(wre.toString());
}
SsccBeginTransactionResponse bresponse = SsccBeginTransactionResponseBuilder.build();
done.run(bresponse);
}
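/*
* A minimal caller-side sketch of invoking this endpoint over the coprocessor
* RPC channel. Illustrative only: "table", "row", "txId", "startId" and
* "regionName" are hypothetical caller variables, and the blocking stub is
* the standard protobuf-generated one for SsccRegionService.
*
*   CoprocessorRpcChannel channel = table.coprocessorService(row);
*   SsccRegionService.BlockingInterface service =
*       SsccRegionService.newBlockingStub(channel);
*   SsccBeginTransactionRequest request = SsccBeginTransactionRequest.newBuilder()
*       .setTransactionId(txId)
*       .setStartId(startId)
*       .setRegionName(ByteString.copyFromUtf8(regionName))
*       .build();
*   SsccBeginTransactionResponse response = service.beginTransaction(null, request);
*   if (response.getHasException())
*       throw new IOException(response.getException());
*/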
/**
* Begin a transaction, treating it as a full (non-region) transaction.
* @param transactionId the transaction id
* @param startId the SSCC start timestamp
* @throws IOException
*/
public void beginTransaction(final long transactionId, final long startId) throws IOException {
beginTransaction(transactionId, startId, /* this is a region TX */ false);
}
/**
* Begin a transaction.
* @param transactionId the transaction id
* @param startId the SSCC start timestamp
* @param regionTx true if this is a region (single-region, local) transaction
* @throws IOException
*/
public void beginTransaction(final long transactionId, final long startId, final boolean regionTx) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction -- ENTRY txId: " + transactionId
+ " startId: " + startId + " regionTx " + regionTx);
checkClosing(transactionId);
// TBD until integration with recovery
if (reconstructIndoubts == 0) {
if (LOG.isTraceEnabled()) LOG.trace("RECOV beginTransaction -- ENTRY txId: " + transactionId);
// try {
constructIndoubtTransactions();
// }
// catch (IdTmException e){
// LOG.error("RECOV beginTransaction exception" + e);
// throw new IOException("RECOV beginTransaction " + e);
// }
// catch (Exception e2) {
// LOG.error("RECOV beginTransaction exception" + e2);
// throw new IOException("RECOV beginTransaction exception " + e2);
// }
}
if (regionState != 2) {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: RECOVERY WARN beginTransaction while the region is still in recovering state " + regionState);
}
String key = getTransactionalUniqueId(transactionId);
SsccTransactionState state;
synchronized (transactionsById) {
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction -- creating new SsccTransactionState without coprocessorHost txId: " + transactionId);
state = new SsccTransactionState(transactionId,
nextLogSequenceId.getAndIncrement(),
nextLogSequenceId,
m_Region.getRegionInfo(),
m_Region.getTableDesc(), tHLog, configuredEarlyLogging,
startId,
regionTx);
// nextSsccSequenceId.getAndIncrement());
state.setStartSequenceNumber(state.getStartId());
// List<TransactionState> commitPendingCopy =
// new ArrayList<SsccTransactionState>(commitPendingTransactions);
// for (SsccTransactionState commitPending : commitPendingCopy) {
// state.addTransactionToCheck(commitPending);
// }
transactionsById.put(key, state);
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - Adding transaction: [" + transactionId + "] in region ["
+ m_Region.getRegionInfo().getRegionNameAsString() + "]" +
" to list");
}
try {
transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(transactionId));
} catch (LeaseStillHeldException e) {
LOG.error("beginTransaction - Lease still held for [" + transactionId + "] in region ["
+ m_Region.getRegionInfo().getRegionNameAsString() + "]");
throw new RuntimeException(e);
}
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction -- EXIT txId: " + transactionId
+ " transactionsById size: " + transactionsById.size() + " region ID: " + this.regionInfo.getRegionId());
}
/** Begin the transaction if it does not already exist.
* @param transactionId the transaction id
* @param startId the SSCC start timestamp
* @return the transaction state, newly created if it was absent
* @throws IOException
*/
private SsccTransactionState beginTransIfNotExist(final long transactionId,
final long startId) throws IOException{
return beginTransIfNotExist(transactionId, startId,
/* this is a Region Tx */ false);
}
/** Begin the transaction if it does not already exist.
* @param transactionId the transaction id
* @param startId the SSCC start timestamp
* @param regionTx true if this is a region (single-region, local) transaction
* @return the transaction state, newly created if it was absent
* @throws IOException
*/
private SsccTransactionState beginTransIfNotExist(final long transactionId, final long startId,
final boolean regionTx) throws IOException{
if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransIfNotExist, txid: "
+ transactionId + " startId: " + startId + "regionTx " + regionTx +
" transactionsById size: " + transactionsById.size());
String key = getTransactionalUniqueId(transactionId);
synchronized (transactionsById) {
SsccTransactionState state = transactionsById.get(key);
if (state == null) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Begin transaction in beginTransIfNotExist beginning the transaction internally as state was null");
this.beginTransaction(transactionId, startId, regionTx);
state = transactionsById.get(key);
}
return state;
}
}
/**
* Gets the transaction state
* @param transactionId the transaction id
* @return SsccTransactionState
* @throws UnknownTransactionException
*/
protected SsccTransactionState getTransactionState(final long transactionId)
throws UnknownTransactionException {
SsccTransactionState state = null;
boolean throwUTE = false;
String key = getTransactionalUniqueId(transactionId);
state = transactionsById.get(key);
if (state == null)
{
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getTransactionState Unknown transaction: [" + transactionId + "], throwing UnknownTransactionException");
throwUTE = true;
}
else {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getTransactionState Found transaction: [" + transactionId + "]");
try {
transactionLeases.renewLease(key);
} catch (LeaseException e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getTransactionState renewLease failed will try to createLease for transaction: [" + transactionId + "]");
try {
transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(transactionId));
} catch (LeaseStillHeldException lshe) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getTransactionState renewLeasefollowed by createLease failed throwing original LeaseException for transaction: [" + transactionId + "]");
throw new RuntimeException(e);
}
}
}
if (throwUTE)
{
throw new UnknownTransactionException();
}
return state;
}
@Override
public void commit(RpcController controller,
SsccCommitRequest request,
RpcCallback<SsccCommitResponse> done) {
SsccCommitResponse response = SsccCommitResponse.getDefaultInstance();
long transId = request.getTransactionId();
long commitId = request.getCommitId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit - id " + transId + ", regionName " + regionInfo.getRegionNameAsString());
Throwable t = null;
WrongRegionException wre = null;
{
// Process local memory
try {
commit(transId, commitId, request.getIgnoreUnknownTransactionException());
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:commit threw exception after internal commit");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitResponse.Builder commitResponseBuilder = SsccCommitResponse.newBuilder();
commitResponseBuilder.setHasException(false);
if (t != null)
{
commitResponseBuilder.setHasException(true);
commitResponseBuilder.setException(t.toString());
}
if (wre != null)
{
commitResponseBuilder.setHasException(true);
commitResponseBuilder.setException(wre.toString());
}
SsccCommitResponse cresponse = commitResponseBuilder.build();
done.run(cresponse);
}
@Override
public void commitIfPossible(RpcController controller,
SsccCommitIfPossibleRequest request,
RpcCallback<SsccCommitIfPossibleResponse> done) {
SsccCommitIfPossibleResponse response = SsccCommitIfPossibleResponse.getDefaultInstance();
boolean reply = false;
Throwable t = null;
WrongRegionException wre = null;
long transId = request.getTransactionId();
{
// Process local memory
try {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitIfPossible - id " + transId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible");
reply = commitIfPossible(transId);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("commitIfPossible threw exception ", e);
t = e;
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitIfPossibleResponse.Builder commitIfPossibleResponseBuilder = SsccCommitIfPossibleResponse.newBuilder();
commitIfPossibleResponseBuilder.setHasException(false);
if (t != null)
{
commitIfPossibleResponseBuilder.setHasException(true);
commitIfPossibleResponseBuilder.setException(t.toString());
}
if (wre != null)
{
commitIfPossibleResponseBuilder.setHasException(true);
commitIfPossibleResponseBuilder.setException(wre.toString());
}
commitIfPossibleResponseBuilder.setWasSuccessful(reply);
SsccCommitIfPossibleResponse cresponse = commitIfPossibleResponseBuilder.build();
done.run(cresponse);
}
@Override
public void commitRequest(RpcController controller,
SsccCommitRequestRequest request,
RpcCallback<SsccCommitRequestResponse> done) {
SsccCommitRequestResponse response = SsccCommitRequestResponse.getDefaultInstance();
long transId = request.getTransactionId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitRequest - id " + transId +
", regionName " + regionInfo.getRegionNameAsString());
int status = 0;
IOException ioe = null;
UnknownTransactionException ute = null;
Throwable t = null;
WrongRegionException wre = null;
{
// Process local memory
try {
status = commitRequest(transId);
} catch (UnknownTransactionException u) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:commitRequest threw exception after internal commit" + u.toString());
ute = u;
} catch (IOException e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:commitRequest threw exception after internal commit" + e.toString());
ioe = e;
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCommitRequestResponse.Builder commitRequestResponseBuilder = SsccCommitRequestResponse.newBuilder();
commitRequestResponseBuilder.setHasException(false);
if (t != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(t.toString());
}
if (wre != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(wre.toString());
}
if (ioe != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(ioe.toString());
}
if (ute != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(ute.toString());
}
commitRequestResponseBuilder.setResult(status);
SsccCommitRequestResponse cresponse = commitRequestResponseBuilder.build();
done.run(cresponse);
}
/**
* Commits the transaction.
* @param state the transaction state
* @throws IOException
*/
private void commit(final SsccTransactionState state) throws IOException {
long txid = 0;
// if the state is already ABORTED or COMMITED, there is nothing to do
if (state.getStatus() == Status.ABORTED || state.getStatus() == Status.COMMITED) return;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Committing transaction: " + state.toString() + " to "
+ m_Region.getRegionInfo().getRegionNameAsString());
long inTransactionId = state.getTransactionId();
long startId = state.getStartId();
if (state.isReinstated()) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit Trafodion Recovery: commit reinstated indoubt transactions " + inTransactionId +
" in region " + m_Region.getRegionInfo().getRegionNameAsString());
if (false) // Something wrong
{
state.setStatus(Status.ABORTED);
retireTransaction(state);
}
} // reinstated transactions
else {
// get a commit ID
// This commitId must be comparable with the startId
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit : try to update the status and version ");
// long commitId = nextSsccSequenceId.getAndIncrement();
long commitId = state.getCommitId();
//update the putlist
List<byte[]> putToDoList = state.getPutRows();
List<Delete> delToDoList = state.getDelRows();
//List<Mutation> mutList= new ArrayList<Mutation>();
try {
for( byte[] rowkey : putToDoList )
{
// delete the status item from the status column for this transaction
Delete statusDelete = new Delete(rowkey, startId);
statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY , SsccConst.STATUS_COL );
//statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY , SsccConst.COLUMNS_COL );
m_Region.delete(statusDelete);
//mutList.add(statusDelete);
// insert a new item into version column
// Put verPut = new Put(rowkey, commitId.val);
Put verPut = new Put(rowkey, commitId);
verPut.add(DtmConst.TRANSACTION_META_FAMILY , SsccConst.VERSION_COL,SsccConst.generateVersionValue(startId,false));
m_Region.put(verPut);
//mutList.add(verPut);
}
ListIterator<Delete> deletesIter = null;
for (deletesIter = delToDoList.listIterator(); deletesIter.hasNext();) {
Delete d = deletesIter.next();
//mutList.add(d);
// delete the status item from the status column for this transaction,
// then insert a new item into the version column
byte[] dKey = d.getRow();
Delete statusDelete = new Delete(dKey, startId);
statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY , SsccConst.STATUS_COL );
//statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY , SsccConst.COLUMNS_COL );
m_Region.delete(statusDelete);
// Put verPut = new Put(dKey, commitId.val);
Put verPut = new Put(dKey, commitId);
verPut.add(DtmConst.TRANSACTION_META_FAMILY , SsccConst.VERSION_COL,SsccConst.generateVersionValue(startId,true));
m_Region.put(verPut);
// m_Region.delete(d);
}
//DO a batch mutation
//Mutation[] m = (Mutation[])mutList.toArray();
//m_Region.batchMutate(m);
//set the commitId of the transaction
// state.setCommitId(commitId.val);
// state.setCommitId(commitId);
}
catch (Exception e) //something wrong
{
LOG.error("SsccRegionEndpoint Commit get exception " + e.toString());
state.setStatus(Status.ABORTED);
retireTransaction(state);
throw new IOException(e.toString());
}
} // normal transactions
// Now that the transactional writes live in the core WAL, we can write a commit
// record to the log so we don't have to recover it from the transactional WAL.
if (state.hasWrite() || state.isReinstated()) {
// comment out for now
if (LOG.isTraceEnabled()) LOG.trace("write commit edit to HLOG");
if (LOG.isTraceEnabled()) LOG.trace("BBB write commit edit to HLOG after appendNoSync");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:commit -- EXIT txId: " + inTransactionId + " HLog seq " + txid);
if (!editGenerated) editGenerated = true;
}
state.setStatus(Status.COMMITED);
/*
if (state.hasWrite() || state.isReinstated()) {
synchronized (commitPendingTransactions) {
if (!commitPendingTransactions.remove(state)) {
LOG.fatal("SsccRegionEndpoint coprocessor: commit Commiting a non-query transaction that is not in commitPendingTransactions");
// synchronized statements are cleared for a throw
throw new IOException("commit failure");
}
}
}
*/
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit(tstate) -- EXIT SsccTransactionState: " +
state.toString());
if (state.isReinstated()) {
synchronized(indoubtTransactionsById) {
indoubtTransactionsById.remove(state.getTransactionId());
int tmid = (int) TransactionState.getNodeId(inTransactionId);
int count = 0;
if (indoubtTransactionsCountByTmid.containsKey(tmid)) {
count = (int) indoubtTransactionsCountByTmid.get(tmid) - 1;
if (count > 0) indoubtTransactionsCountByTmid.put(tmid, count);
}
if (count == 0) {
indoubtTransactionsCountByTmid.remove(tmid);
String lv_encoded = m_Region.getRegionInfo().getEncodedName();
try {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit Trafodion Recovery: delete in commit recovery zNode TM " + tmid + " region encoded name " + lv_encoded + " for 0 in-doubt transaction");
deleteRecoveryzNode(tmid, lv_encoded);
} catch (IOException e) {
LOG.error("Trafodion Recovery: delete recovery zNode failed");
}
}
if ((indoubtTransactionsById == null) || (indoubtTransactionsById.size() == 0)) {
if (indoubtTransactionsById == null)
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit Trafodion Recovery: start region in commit with indoubtTransactionsById null");
else
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit Trafodion Recovery: start region in commit with indoubtTransactionsById size " + indoubtTransactionsById.size());
startRegionAfterRecovery();
}
}
}
retireTransaction(state);
}
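/*
* Net effect of commit() on each row the transaction wrote, in SSCC cell
* terms (a sketch of the loops above; "rowkey" stands for one written row
* and "isDelete" for whether that row's operation was a delete):
*
*   // the transient status marker written at startId disappears ...
*   Delete statusDelete = new Delete(rowkey, startId);
*   statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY, SsccConst.STATUS_COL);
*   m_Region.delete(statusDelete);
*   // ... and a permanent version marker appears at commitId, recording
*   // the startId and the operation type
*   Put verPut = new Put(rowkey, commitId);
*   verPut.add(DtmConst.TRANSACTION_META_FAMILY, SsccConst.VERSION_COL,
*              SsccConst.generateVersionValue(startId, isDelete));
*   m_Region.put(verPut);
*/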
/**
* Commits the transaction.
* @param transactionId the transaction id
* @param commitId the SSCC commit timestamp
* @param ignoreUnknownTransactionException if true, silently ignore an unknown transaction
* @throws IOException
*/
public void commit(final long transactionId, final long commitId, final boolean ignoreUnknownTransactionException) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit -- ENTRY txId: " + transactionId +
" commitId: " + commitId + " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
SsccTransactionState state=null;
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
if (ignoreUnknownTransactionException) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: ignoring UnknownTransactionException in commit : " + transactionId
+ " in region "
+ m_Region.getRegionInfo().getRegionNameAsString());
return;
}
LOG.fatal("SsccRegionEndpoint coprocessor: Asked to commit unknown transaction: " + transactionId
+ " in region "
+ m_Region.getRegionInfo().getRegionNameAsString());
throw new IOException("UnknownTransactionException");
}
/* SSCC doesn't need a prepare phase 1
if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
LOG.fatal("SsccRegionEndpoint coprocessor: commit - Asked to commit a non pending transaction ");
throw new IOException("Asked to commit a non-pending transaction");
}
*/
state.setCommitId(commitId);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commit -- EXIT " + state);
commit(state);
}
/**
* Processes the phase-1 commit request.
* @param transactionId the transaction id
* @return a TransactionRegionInterface commit code: COMMIT_OK, COMMIT_OK_READ_ONLY, or COMMIT_UNSUCCESSFUL
* @throws IOException
*/
public int commitRequest(final long transactionId) throws IOException {
long txid = 0;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitRequest -- ENTRY txId: " + transactionId);
checkClosing(transactionId);
SsccTransactionState state=null;
try{
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
return COMMIT_UNSUCCESSFUL;
}
if (state.hasWrite()) {
return COMMIT_OK;
}
// Otherwise we were read-only and committable, so we can forget it.
state.setStatus(Status.COMMITED);
retireTransaction(state);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitRequest READ ONLY -- EXIT txId: " + transactionId);
return COMMIT_OK_READ_ONLY;
}
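/*
* Typical driver-side use of the codes above (sketch; direct calls shown,
* error handling elided; "commitId" is assumed to come from IdTm as in
* commitIfPossible() below):
*
*   int rc = commitRequest(txId);
*   if (rc == COMMIT_OK) {
*       commit(txId, commitId);          // phase 2: make the writes visible
*   } else if (rc == COMMIT_OK_READ_ONLY) {
*       // read-only: the transaction was already retired by commitRequest
*   } else {                             // COMMIT_UNSUCCESSFUL
*       abortTransaction(txId);
*   }
*/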
/**
* Commits the transaction.
* @param transactionId the transaction id
* @param commitId the SSCC commit timestamp
* @throws IOException
*/
public void commit(final long transactionId, final long commitId) throws IOException {
commit(transactionId, commitId, false /* IgnoreUnknownTransactionException */);
}
@Override
public void abortTransaction(RpcController controller,
SsccAbortTransactionRequest request,
RpcCallback<SsccAbortTransactionResponse> done) {
SsccAbortTransactionResponse response = SsccAbortTransactionResponse.getDefaultInstance();
long transId = request.getTransactionId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: abortTransaction - id " + transId + ", regionName " + regionInfo.getRegionNameAsString());
IOException ioe = null;
UnknownTransactionException ute = null;
WrongRegionException wre = null;
Throwable t = null;
{
// Process in local memory
try {
abortTransaction(transId);
} catch (UnknownTransactionException u) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:abort threw UnknownTransactionException after internal abort");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + u.getMessage() + "" + stackTraceToString(u));
ute = u;
} catch (IOException e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:abort threw UnknownTransactionException after internal abort");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
ioe = e;
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccAbortTransactionResponse.Builder abortTransactionResponseBuilder = SsccAbortTransactionResponse.newBuilder();
abortTransactionResponseBuilder.setHasException(false);
if (t != null)
{
abortTransactionResponseBuilder.setHasException(true);
abortTransactionResponseBuilder.setException(t.toString());
}
if (wre != null)
{
abortTransactionResponseBuilder.setHasException(true);
abortTransactionResponseBuilder.setException(wre.toString());
}
if (ioe != null)
{
abortTransactionResponseBuilder.setHasException(true);
abortTransactionResponseBuilder.setException(ioe.toString());
}
if (ute != null)
{
abortTransactionResponseBuilder.setHasException(true);
abortTransactionResponseBuilder.setException(ute.toString());
}
SsccAbortTransactionResponse aresponse = abortTransactionResponseBuilder.build();
done.run(aresponse);
}
/**
* Abort the transaction.
*
* @param transactionId
* @throws IOException
* @throws UnknownTransactionException
*/
public void abortTransaction(final long transactionId) throws IOException, UnknownTransactionException {
long txid = 0;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: abort transactionId: " + transactionId + " " + m_Region.getRegionInfo().getRegionNameAsString());
SsccTransactionState state;
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
IOException ioe = new IOException("UnknownTransactionException");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Unknown transaction [" + transactionId
+ "] in region ["
+ m_Region.getRegionInfo().getRegionNameAsString()
+ "], " + ioe.toString());
throw new IOException("UnknownTransactionException");
}
if(state.getStatus() == Status.ABORTED)
{
LOG.error("Transaction " + transactionId + " is already aborted, probably due to write conflict");
return;
}
if(state.getStatus() == Status.COMMITED) // likely a programming error in the client, e.g. commit() then abort()
{
LOG.error("Transaction " + transactionId + " is already committed, cannot perform abort anymore");
return;
}
if (state.hasWrite()) {
// get the put/del lists;
// do a delete to undo a put, do nothing to undo a delete
List<byte[]> putUndoList = ( (SsccTransactionState)state).getPutRows();
//List<Mutation> mutList = new ArrayList<Mutation>();
for ( byte[] rowkey : putUndoList )
{
long localTransId = state.getStartId();
Delete d = new Delete(rowkey, localTransId);
Get forColListGet = new Get(rowkey);
forColListGet.setTimeStamp(localTransId); //get only those cells affected by the given transaction
//perform a get first, parse the result and get all column affected by the put
Result r = m_Region.get(forColListGet);
List<Cell> listCells = r.listCells();
if(listCells != null)
{
for (Cell cell : listCells) {
d.deleteColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), localTransId); // this is the cell that needs to be deleted
}
}
m_Region.delete(d);
//clear the status item
Delete statusDelete = new Delete(rowkey, localTransId);
statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY , SsccConst.STATUS_COL );
//statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY , SsccConst.COLUMNS_COL );
m_Region.delete(statusDelete);
//mutList.add(d);
}
//clear status
List<Delete> deleteList = state.getDelRows();
ListIterator<Delete> deletesIter = null;
for (deletesIter = deleteList.listIterator(); deletesIter.hasNext();) {
Delete di = deletesIter.next();
long localTransId = state.getStartId();
Delete d = new Delete(di.getRow(), localTransId);
d.deleteColumn(DtmConst.TRANSACTION_META_FAMILY , SsccConst.STATUS_COL );
m_Region.delete(d);
}
/* batchMutate usage not yet worked out
try {
Mutation[] m = (Mutation[])mutList.toArray();
m_Region.batchMutate(m);
}
catch (Exception e) {
//TODO
throw new IOException(e.toString());
}
*/
}
synchronized (commitPendingTransactions) {
commitPendingTransactions.remove(state);
}
if (state.isReinstated()) {
synchronized(indoubtTransactionsById) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: abort reinstated indoubt transactions " + transactionId);
indoubtTransactionsById.remove(state.getTransactionId());
int tmid = (int) TransactionState.getNodeId(transactionId);
int count = 0;
// indoubtTransactionsCountByTmid protected by
// indoubtTransactionsById synchronization
if (indoubtTransactionsCountByTmid.containsKey(tmid)) {
count = (int) indoubtTransactionsCountByTmid.get(tmid) - 1;
if (count > 0) indoubtTransactionsCountByTmid.put(tmid, count);
}
// if all reinstated txns are resolved from a TM, remove it and delete associated zNode
if (count == 0) {
indoubtTransactionsCountByTmid.remove(tmid);
String lv_encoded = m_Region.getRegionInfo().getEncodedName();
try {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: delete in abort recovery zNode TM " + tmid + " region encoded name " + lv_encoded + " for 0 in-doubt transaction");
deleteRecoveryzNode(tmid, lv_encoded);
} catch (IOException e) {
LOG.error("SsccRegionEndpoint coprocessor: Trafodion Recovery: delete recovery zNode failed");
}
}
if ((indoubtTransactionsById == null) ||
(indoubtTransactionsById.size() == 0)) {
// change region state to STARTED, and archive the split-thlog
if (indoubtTransactionsById == null)
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: start region in abort with indoubtTransactionsById null");
else
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: start region in abort with indoubtTransactionsById size " + indoubtTransactionsById.size());
startRegionAfterRecovery();
}
}
}
state.setStatus(Status.ABORTED);
retireTransaction(state);
}
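/*
* Undo of one transactional put, as performed above (sketch): every cell the
* transaction wrote at startId is point-deleted at exactly that timestamp,
* then the row's status marker is cleared ("family"/"qualifier" stand for
* the cells found by the timestamp-restricted Get):
*
*   Delete d = new Delete(rowkey, startId);
*   d.deleteColumn(family, qualifier, startId);   // one version, at startId
*   m_Region.delete(d);
*   Delete statusDelete = new Delete(rowkey, startId);
*   statusDelete.deleteColumn(DtmConst.TRANSACTION_META_FAMILY, SsccConst.STATUS_COL);
*   m_Region.delete(statusDelete);
*/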
/**
* Determines if the transaction can be committed, and if possible commits the transaction.
* @param transactionId the transaction id
* @return true if the transaction was committed (including the read-only case)
* @throws IOException
*/
public boolean commitIfPossible(final long transactionId)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: "
+ transactionId);
int status = commitRequest(transactionId);
if (status == COMMIT_OK) {
IdTmId seqId;
try {
seqId = new IdTmId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitIfPossible:getting new IdTM sequence ");
idServer.id(ID_TM_SERVER_TIMEOUT, seqId);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitIfPossible: IdTM sequence is " + seqId.val);
} catch (IdTmException exc) {
LOG.error("commitIfPossible: IdTm threw exception 1 ", exc);
throw new IOException("SsccRegionEndpoint coprocessor: commitIfPossible: IdTm threw exception 1 " + exc);
}
catch (Exception e2) {
LOG.error("commitIfPossible: IdTm threw exception e2 ", e2);
throw new IOException("SsccRegionEndpoint coprocessor: commitIfPossible: IdTm threw exception e2 " + e2);
}
// Process local memory
try {
commit(transactionId, seqId.val);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " + transactionId + " COMMIT_OK");
return true;
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:coprocesor: commitIfPossible threw exception after internal commit");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
throw new IOException(e.toString());
}
} else if (status == COMMIT_OK_READ_ONLY) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: "
+ transactionId + " COMMIT_OK_READ_ONLY");
return true;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: "
+ transactionId + " Commit Unsuccessful");
return false;
}
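/*
* Obtaining a commit timestamp from IdTm, as commitIfPossible() does above
* (idServer is a field of this endpoint; the timeout is in milliseconds):
*
*   IdTmId seqId = new IdTmId();
*   idServer.id(ID_TM_SERVER_TIMEOUT, seqId);   // may throw IdTmException
*   long commitId = seqId.val;
*/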
/***********************************************************************************
***************** ALL code to support SCAN ************************************
***********************************************************************************/
/**
* Returns the scanner associated with the specified ID.
*
* @param scannerId the scanner id
* @param nextCallSeq the expected next call sequence number
* @return a RegionScanner or throws UnknownScannerException
* @throws NotServingRegionException
* @throws OutOfOrderScannerNextException
* @throws UnknownScannerException
*/
protected synchronized RegionScanner getScanner(long scannerId,
long nextCallSeq)
throws NotServingRegionException,
OutOfOrderScannerNextException,
UnknownScannerException {
RegionScanner scanner = null;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getScanner scanners map is " + scanners + ", count is " + scanners.size() + ", scanner id is " + scannerId);
TransactionalRegionScannerHolder rsh =
scanners.get(scannerId);
if (rsh != null)
{
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getScanner rsh is " + rsh + "rsh.s is " + rsh.s );
}
else
{
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getScanner rsh is null");
throw new UnknownScannerException(
"ScannerId: " + scannerId + ", already closed?");
}
scanner = rsh.s;
if (scanner != null) {
HRegionInfo hri = scanner.getRegionInfo();
if (this.m_Region != rsh.r) { // Yes, should be the same instance
throw new NotServingRegionException("Region was re-opened after the scannerId"
+ scannerId + " was created: " + hri.getRegionNameAsString());
}
}
if (nextCallSeq != rsh.nextCallSeq) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getScanner calling OutOfOrderScannerNextException, nextCallSeq is " + nextCallSeq + " rsh.nextCallSeq is " + rsh.nextCallSeq);
throw new OutOfOrderScannerNextException(
"Expected nextCallSeq: " + rsh.nextCallSeq +
" But the nextCallSeq got from client: " + nextCallSeq);
}
return scanner;
}
@Override
public void performScan(RpcController controller,
SsccPerformScanRequest request,
RpcCallback<SsccPerformScanResponse> done) {
boolean hasMore = true;
RegionScanner scanner = null;
Throwable t = null;
ScannerTimeoutException ste = null;
OutOfOrderScannerNextException ooo = null;
WrongRegionException wre = null;
Exception ne = null;
Scan scan = null;
List<Cell> cellResults = new ArrayList<Cell>();
List<Result> results = new ArrayList<Result>();
List<Cell> validResults = new ArrayList<Cell>();
org.apache.hadoop.hbase.client.Result result = null;
long transId = request.getTransactionId();
long startId = request.getStartId();
long scannerId = request.getScannerId();
int numberOfRows = request.getNumberOfRows();
boolean closeScanner = request.getCloseScanner();
long nextCallSeq = request.getNextCallSeq();
long count = 0L;
boolean shouldContinue = true;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - scannerId " + scannerId + ", numberOfRows " + numberOfRows + ", nextCallSeq " + nextCallSeq);
//This may be wrong, check later
Map<String , Cell> tempBuf = new TreeMap<String, Cell>();
try {
scanner = getScanner(scannerId, nextCallSeq);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - scanner is: " + scanner == null ? "NULL" : "NOT NULL" );
SsccTransactionState state = this.beginTransIfNotExist(transId, startId);
Set<byte[]>visitCols = new HashSet<byte[]>();
if (scanner != null)
{
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - id " + scannerId + ", scanner is not null");
boolean firstCell=true;
while (shouldContinue) {
hasMore = scanner.next(cellResults);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan hasMore is: " + hasMore + " cellResults: " + cellResults);
firstCell=true;
Result verResult = null;
Result statusResult = null;
Result colResult = null;
tempBuf.clear();
ListIterator<Cell> cellIter = null;
for (cellIter = cellResults.listIterator(); cellIter.hasNext();) {
Cell c = cellIter.next();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan Cell is: " + c);
if (firstCell) {
if (!Bytes.equals(CellUtil.cloneFamily(c), DtmConst.TRANSACTION_META_FAMILY)) { // compare byte[] contents; != compares references
//get the statusList
Get statusGet = new Get(CellUtil.cloneRow(c));
//statusGet.setTimeStamp(startId);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
statusGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
statusResult = m_Region.get(statusGet);
//get the colList
Get colGet = new Get(CellUtil.cloneRow(c));
//colGet.setTimeStamp(startId);
colGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.COLUMNS_COL);
colGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
colResult = m_Region.get(colGet);
//get the versionList
Get verGet = new Get(CellUtil.cloneRow(c));
//verGet.setTimeStamp(startId);
verGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.VERSION_COL);
verGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
verResult = m_Region.get(verGet);
firstCell = false;
}
}
//long kvStartId = getStartIdFromTs(thisTs);
if (!firstCell) {
//long thisTs = c.getTimestamp();
if (state.handleResult(c, statusResult.listCells(), verResult.listCells(), colResult.listCells(), transId))
{
String keys = Bytes.toString(CellUtil.cloneQualifier(c));
if (!tempBuf.containsKey(keys)) // only keep the first one; assume the first is the latest
tempBuf.put(keys, c);
}
}
}
for(String j: tempBuf.keySet())
{
Cell kv = tempBuf.get(j);
validResults.add(kv);
}
result = Result.create(validResults);
cellResults.clear();
validResults.clear();
if (!result.isEmpty()) {
results.add(result);
count++;
}
if (count == numberOfRows || !hasMore)
shouldContinue = false;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - id " + scannerId + ", count is " + count + ", hasMore is " + hasMore +
", result.isEmpty: " + result.isEmpty() + ", row " + result.getRow() + ", shouldContinue: " + shouldContinue);
}
}
else
{
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - id " + scannerId+ ", scanner is null");
}
} catch(OutOfOrderScannerNextException ooone) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - id " + scannerId + " Caught OutOfOrderScannerNextException " + ooone.getMessage() + " " + stackTraceToString(ooone));
ooo = ooone;
} catch(ScannerTimeoutException cste) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - id " + scannerId + " Caught ScannerTimeoutException " + cste.getMessage() + " " + stackTraceToString(cste));
ste = cste;
} catch(Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - id " + scannerId + " Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
finally {
if (scanner != null) {
try {
if (closeScanner) {
scanner.close();
/*
try {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
// ignore
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan failed to get a lease " + scannerId);
}
*/
}
} catch(Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan caught exception " + e.getMessage() + "" + stackTraceToString(e));
ne = e;
}
}
}
TransactionalRegionScannerHolder rsh = scanners.get(scannerId);
nextCallSeq++;
rsh.nextCallSeq = nextCallSeq;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: performScan - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() +
", scannerId " + scannerId + ", nextCallSeq " + nextCallSeq + ", rsh.nextCallSeq " + rsh.nextCallSeq);
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPerformScanResponse.Builder performResponseBuilder = SsccPerformScanResponse.newBuilder();
performResponseBuilder.setHasMore(hasMore);
performResponseBuilder.setNextCallSeq(nextCallSeq);
performResponseBuilder.setCount(count);
performResponseBuilder.setHasException(false);
if (results != null)
{
if (!results.isEmpty()) {
ListIterator<Result> resultIter = null;
for (resultIter = results.listIterator(); resultIter.hasNext();) {
Result r = resultIter.next();
performResponseBuilder.addResult(ProtobufUtil.toResult(r));
// LOG.info("UNIQUE: performScan return row " + Arrays.toString(r.getRow()) );
// for( Cell c : r.listCells() ) {
// LOG.info("UNIQUE: performScan return value col : " + Arrays.toString(CellUtil.cloneQualifier(c) )+ " value " + Arrays.toString(CellUtil.cloneValue(c) ) );
// }
// LOG.info("UNIQUE : -- ");
}
}
}
if (t != null)
{
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(t.toString());
}
if (ste != null)
{
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(ste.toString());
}
if (wre != null)
{
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(wre.toString());
}
if (ne != null)
{
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(ne.toString());
}
if (ooo != null)
{
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(ooo.toString());
}
SsccPerformScanResponse presponse = performResponseBuilder.build();
done.run(presponse);
}
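/*
* Client-side scan loop (sketch): the caller must echo the server's
* nextCallSeq on every call, since performScan increments it after each
* batch. "service" is the blocking stub sketched at beginTransaction,
* "scannerId" comes from openScanner, and the regionName field is assumed:
*
*   long nextCallSeq = 0;
*   SsccPerformScanResponse resp;
*   do {
*       SsccPerformScanRequest req = SsccPerformScanRequest.newBuilder()
*           .setTransactionId(txId).setStartId(startId)
*           .setRegionName(regionName)
*           .setScannerId(scannerId).setNumberOfRows(100)
*           .setCloseScanner(false).setNextCallSeq(nextCallSeq)
*           .build();
*       resp = service.performScan(null, req);
*       nextCallSeq = resp.getNextCallSeq();
*       // consume resp.getResultList() ...
*   } while (resp.getHasMore());
*/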
@Override
public void openScanner(RpcController controller,
SsccOpenScannerRequest request,
RpcCallback<SsccOpenScannerResponse> done) {
boolean hasMore = true;
RegionScanner scanner = null;
RegionScanner scannert = null;
Throwable t = null;
WrongRegionException wre = null;
boolean exceptionThrown = false;
NullPointerException npe = null;
Exception ge = null;
IOException ioe = null;
LeaseStillHeldException lse = null;
Scan scan = null;
long scannerId = 0L;
boolean isLoadingCfsOnDemandSet = false;
long transId = request.getTransactionId();
long startId = request.getStartId();
{
try {
scan = ProtobufUtil.toScan(request.getScan());
if (scan == null)
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: openScanner scan was null");
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: openScanner Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
exceptionThrown = true;
}
}
if (!exceptionThrown) {
if (scan == null) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:openScanner scan is null");
npe = new NullPointerException("scan is null");
ioe = new IOException("Invalid arguments to openScanner", npe);
exceptionThrown = true;
}
else
{
try {
scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
//checkRow(scan.getStartRow(), "Scan");
prepareScanner(scan);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:openScanner scan threw exception");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
exceptionThrown = true;
}
}
}
List<Cell> results = new ArrayList<Cell>();
if (!exceptionThrown) {
try {
scan.setMaxVersions();
scanner = getScanner(transId, startId, scan);
if (scanner != null) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: openScanner called getScanner, scanner is " + scanner + ", transid " + transId);
// Add the scanner to the map
scannerId = addScanner(transId,scanner, this.m_Region);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: openScanner called addScanner, scannerId is " + scannerId + ", transid " + transId);
}
else {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getScanner returned null, scannerId is " + scannerId + ", transid " + transId);
}
} catch (LeaseStillHeldException llse) {
LOG.error("SsccRegionEndpoint coprocessor: getScanner Error opening scanner, " + llse.toString());
exceptionThrown = true;
lse = llse;
} catch (IOException e) {
LOG.error("SsccRegionEndpoint coprocessor: getScanner Error opening scanner, " + e.toString());
exceptionThrown = true;
ioe = e;
}
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: openScanner - transid " + transId + ", regionName " + regionInfo.getRegionNameAsString());
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccOpenScannerResponse.Builder openResponseBuilder = SsccOpenScannerResponse.newBuilder();
openResponseBuilder.setScannerId(scannerId);
openResponseBuilder.setHasException(false);
if (t != null)
{
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(t.toString());
}
if (wre != null)
{
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(wre.toString());
}
if (ioe != null)
{
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(ioe.toString());
}
if (lse != null)
{
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(lse.toString());
}
SsccOpenScannerResponse oresponse = openResponseBuilder.build();
done.run(oresponse);
}
/**
* Obtains a RegionScanner for a transactional scan
* @param transactionId the transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param scan the Scan to execute
* @return RegionScanner
* @throws IOException
*/
public RegionScanner getScanner(final long transactionId, final long startId, final Scan scan)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: RegionScanner getScanner -- ENTRY txId: " + transactionId );
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
//state.addScan(scan);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
//Scan deleteWrapScan = wrapWithDeleteFilter(scan, state);
//In SSCC, we cannot find a way to do this with Filter...
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: RegionScanner getScanner -- Calling t_Region.getScanner txId: " + transactionId );
RegionScanner gotScanner = this.t_Region.getScanner(scan);
// Braces are required here: without them the else binds to the inner
// "if (LOG.isTraceEnabled())", so the "scanner was null" message could never be logged.
if (gotScanner != null) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: RegionScanner getScanner -- obtained scanner was not null, txId: " + transactionId );
}
else {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: RegionScanner getScanner -- obtained scanner was null, txId: " + transactionId );
}
return gotScanner;
}
@Override
public void closeScanner(RpcController controller,
SsccCloseScannerRequest request,
RpcCallback<SsccCloseScannerResponse> done) {
RegionScanner scanner = null;
Throwable t = null;
WrongRegionException wre = null;
Exception ce = null;
long transId = request.getTransactionId();
long scannerId = request.getScannerId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: closeScanner - transId " + transId+ ", regionName "
+ regionInfo.getRegionNameAsString() + ", scannerId " + scannerId);
try {
scanner = removeScanner(scannerId);
if (scanner != null) {
scanner.close();
}
else
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: closeScanner scanner was null for scannerId " + scannerId);
/*
try {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
// ignore
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: closeScanner failed to get a lease " + scannerId);
}
*/
} catch(Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: closeScanner caught exception " + e.getMessage() + "" + stackTraceToString(e));
ce = e;
} catch(Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: closeScanner - id Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCloseScannerResponse.Builder closeResponseBuilder = SsccCloseScannerResponse.newBuilder();
closeResponseBuilder.setHasException(false);
if (t != null)
{
closeResponseBuilder.setHasException(true);
closeResponseBuilder.setException(t.toString());
}
if (wre != null)
{
closeResponseBuilder.setHasException(true);
closeResponseBuilder.setException(wre.toString());
}
if (ce != null)
{
closeResponseBuilder.setHasException(true);
closeResponseBuilder.setException(ce.toString());
}
SsccCloseScannerResponse cresponse = closeResponseBuilder.build();
done.run(cresponse);
}
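/*
* Illustrative client-side sketch of how the three scanner RPCs above fit
* together (the service stub name is an assumption for illustration; the
* message and field names are the ones defined in this file):
*
*   SsccOpenScannerResponse o = stub.openScanner(...);   // returns scannerId
*   long scannerId = o.getScannerId();
*   SsccPerformScanResponse p;
*   do {
*     p = stub.performScan(...);  // pass scannerId plus the nextCallSeq
*                                 // returned by the previous response
*   } while (p.getHasMore());
*   stub.closeScanner(...);       // always close to release the RegionScanner
*/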
/***********************************************************************************
***************** ALL code to support GET ************************************
***********************************************************************************/
@Override
public void get(RpcController controller,
SsccGetTransactionalRequest request,
RpcCallback<SsccGetTransactionalResponse> done) {
SsccGetTransactionalResponse response = SsccGetTransactionalResponse.getDefaultInstance();
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get proto = request.getGet();
Get get = null;
RegionScanner scanner = null;
Throwable t = null;
Exception ge = null;
WrongRegionException wre = null;
org.apache.hadoop.hbase.client.Result result2 = null;
try {
get = ProtobufUtil.toGet(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:get threw exception");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
byte[] row = proto.getRow().toByteArray();
// guard against a null Get if ProtobufUtil.toGet threw above
byte[] getrow = (get != null) ? get.getRow() : null;
String rowKey = Bytes.toString(row);
String getRowKey = (getrow != null) ? Bytes.toString(getrow) : null;
long transId = request.getTransactionId();
long startId = request.getStartId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", row = " + rowKey + ", getRowKey = " + getRowKey);
try {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get - id Calling getScanner id/scan " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", row = " + rowKey + ", getRowKey = " + getRowKey);
result2 = get(transId, startId, get);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get - id No exception, result2 isEmpty is "
+ result2.isEmpty()
+ ", row "
+ result2.getRow()
+ " result length: "
+ result2.size());
} catch(Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get - id Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
finally {
if (scanner != null) {
try {
scanner.close();
} catch(Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
ge = e;
}
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccGetTransactionalResponse.Builder getResponseBuilder = SsccGetTransactionalResponse.newBuilder();
try {
getResponseBuilder.setResult(ProtobufUtil.toResult(result2));
}
catch (Exception e) {
LOG.error("SsccRegionEndpoint coprocessor: Caught exception getting result2 " + result2 + " " + stackTraceToString(e));
}
/*
if(result2 !=null){
LOG.info("UNIQUE: get result for row " + Arrays.toString(result2.getRow() ));
for( Cell c : result2.listCells() )
{
LOG.info("UNIQUE: get return value col : " + Arrays.toString(CellUtil.cloneQualifier(c) )+ " value " + Arrays.toString(CellUtil.cloneValue(c) ) );
}
}
*/
getResponseBuilder.setHasException(false);
if (t != null)
{
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(t.toString());
}
if (wre != null)
{
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(wre.toString());
}
if (ge != null)
{
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(ge.toString());
}
SsccGetTransactionalResponse gresponse = getResponseBuilder.build();
done.run(gresponse);
}
/**
* Obtains a transactional Result for a Get
* @param transactionId the transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param get the Get to execute
* @return Result
* @throws IOException
*/
public Result get(final long transactionId, final long startId, final Get get)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get -- ENTRY txId: " + transactionId + " startId: " + startId );
// get the transaction state object
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
Scan scan = new Scan(get);
List<Cell> results = new ArrayList<Cell>();
RegionScanner scanner = null;
long max = scan.getTimeRange().getMax();
long min = scan.getTimeRange().getMin();
scan.setTimeRange(0, startId + 1); //only get data updated before me
scan.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
Map<String , Cell> tempBuf = new TreeMap<String, Cell>();
try {
scanner = m_Region.getScanner(scan);
//get the statusList
Get statusGet = new Get(get.getRow());//TODO: deprecated API
//statusGet.setTimeStamp(startId);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
statusGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
Result statusResult = m_Region.get(statusGet);
//get the versionList
Get verGet = new Get(get.getRow());//TODO: deprecated API
//verGet.setTimeStamp(startId);
verGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.VERSION_COL);
verGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
Result verResult = m_Region.get(verGet);
//get the colList
Get colGet = new Get(get.getRow()); //TODO: deprecated API
//colGet.setTimeStamp(startId);
colGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.COLUMNS_COL);
colGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
Result colResult = m_Region.get(colGet);
/* go through all the versions and find out correct one */
boolean hasMore = false;
List<Cell> eachVersion = new ArrayList<Cell>();
Set<byte[]>visitCols = new HashSet<byte[]>();
List<Cell> sl = statusResult.listCells();
List<Cell> vl = verResult.listCells();
List<Cell> cl = colResult.listCells();
do {
eachVersion.clear();
hasMore = scanner.next(eachVersion);
tempBuf.clear();
for( Cell kv : eachVersion) {
long thisTs = kv.getTimestamp();
//find out the startId for thisTs
//long kvStartId = getStartIdFromTs(thisTs);
if (state.handleResult(kv,sl,vl,cl,transactionId))
{
byte[] keyt = CellUtil.cloneQualifier(kv);
String keys = new String(keyt);
if (!tempBuf.containsKey(keys))
tempBuf.put(keys,kv);
}
}
}
for(String j: tempBuf.keySet())
{
Cell kv = tempBuf.get(j);
results.add(kv);
}
} while (hasMore);
} catch(Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get Caught exception " + e.getMessage() + "" + stackTraceToString(e));
LOG.error("SsccRegionEndpoint coprocessor: get Caught exception " + e.getMessage() + "" + stackTraceToString(e));
}
finally {
if (scanner != null) {
scanner.close();
}
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get -- EXIT txId: " + transactionId);
return Result.create(results);
}
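/*
* Worked sketch of the metadata layout the transactional get() consults
* (inferred from the reads above; exact byte encodings live in SsccConst
* and DtmConst, so treat the values as illustrative):
*
*   row "r1", TRANSACTION_META_FAMILY:
*     STATUS_COL  @ts=startId -> generateStatusValue(S_STATEFUL_BYTE, txId)
*     VERSION_COL @ts=...     -> committed version list
*     COLUMNS_COL @ts=startId -> "|cq1||cq2|..." merged column list
*
* For each data version returned by the scanner, state.handleResult() uses
* these three lists to decide whether that version is visible to this
* transaction's snapshot; at most one visible cell per qualifier is kept
* per scanner batch (the tempBuf map above).
*/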
/***********************************************************************************
***************** ALL code to support stateless/stateful PUT ********************
***********************************************************************************/
@Override
public void put(RpcController controller,
SsccPutTransactionalRequest request,
RpcCallback<SsccPutTransactionalResponse> done) {
SsccPutTransactionalResponse response = SsccPutTransactionalResponse.getDefaultInstance();
boolean stateless = false;
if (request.hasIsStateless()) {
stateless = request.getIsStateless();
}
byte [] row = null;
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
Throwable t = null;
WrongRegionException wre = null;
int status = 0;
long transId = request.getTransactionId();
long startId = request.getStartId();
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:put threw exception");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
LOG.error("UNIQUE coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
if (type == MutationType.PUT && proto.hasRow()) {
row = proto.getRow().toByteArray();
// Process in local memory
try {
status = put(transId, startId, put, stateless);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:put returned status: " + status);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:put threw exception after internal put");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: put - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toString(row));
}
else {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: put - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + "- no valid PUT type or does not contain a row");
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutTransactionalResponse.Builder putTransactionalResponseBuilder = SsccPutTransactionalResponse.newBuilder();
putTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
putTransactionalResponseBuilder.setHasException(true);
putTransactionalResponseBuilder.setException(t.toString());
}
if (wre != null)
{
putTransactionalResponseBuilder.setHasException(true);
putTransactionalResponseBuilder.setException(wre.toString());
}
putTransactionalResponseBuilder.setStatus(status);
SsccPutTransactionalResponse presponse = putTransactionalResponseBuilder.build();
done.run(presponse);
}
/**
* Adds a write to the transaction
* @param transactionId the transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param put the Put to add
* @param stateless true if this is a stateless put
* @return int update status code
* @throws IOException
*/
public int put(final long transactionId, final long startId, final Put put, boolean stateless)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter SsccRegionEndpoint coprocessor: put, txid " + transactionId +
", startId " + startId +", stateless: " + stateless);
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
// check whether this transaction has a pending delete for this row before the put
state.removeDelBeforePut(put, stateless);
/* The timestamp of an existing Put cannot be changed through the HBase API,
so the workaround is to create a new Put object stamped with the startId.
An earlier approach mapped the Put's current timestamp to the startId:
*/
//mapStartIdFromTs(put.getTimeStamp(),startId);
// Use getFamilyCellMap to copy all data out of the original Put into the new one
byte[] rowkey = put.getRow();
Put newPut = new Put(rowkey, startId);
byte[] mergedCols = null;
byte[] mergedColsV = null;
byte[] cv = null;
NavigableMap<byte[], List<Cell>> familyCellMap = put.getFamilyCellMap();
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
mergedCols = null;
mergedCols = byteMerger("|".getBytes(),qualifier);
mergedCols = byteMerger(mergedCols,"|".getBytes());
byte[] value = CellUtil.cloneValue(cell);
newPut.add(family,qualifier,startId,value);
byte[] currentCollist = state.getColList(rowkey);
if( indexOf(currentCollist,mergedCols) != -1) //already in this list
{
mergedColsV = byteMerger(currentCollist,null);
continue;
}
mergedColsV = byteMerger(mergedCols,currentCollist);
state.addToColList(rowkey,mergedColsV);
}
}
//get the statusList
Get statusGet = new Get(rowkey);
//statusGet.setTimeStamp(startId);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
//statusGet.setTimeRange(0, startId + 1); //only get data updated before me
//statusGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
statusGet.setMaxVersions();
Result statusResult = m_Region.get(statusGet);
List<Cell> sl = null;
List<Cell> vl = null;
//get the versionList
// If this is a stateless put we don't need the version list
if (!stateless) {
Get verGet = new Get(rowkey);
//verGet.setTimeStamp(startId);
verGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.VERSION_COL);
verGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
Result verResult = m_Region.get(verGet);
if(verResult != null ) vl = verResult.listCells();
}
if(statusResult != null ) sl = statusResult.listCells();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: put stateless: " + stateless );
if (!state.hasConflict(sl,vl,stateless,startId,transactionId)) {
state.addToPutList(rowkey);
//update status metadata
byte[] statusValue;
if (stateless){
statusValue = SsccConst.generateStatusValue(SsccConst.S_STATELESS_BYTE,transactionId); //stateless update
}
else {
statusValue = SsccConst.generateStatusValue(SsccConst.S_STATEFUL_BYTE,transactionId); //stateful update
}
newPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.STATUS_COL,startId , statusValue);
newPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.COLUMNS_COL,startId ,mergedColsV);
//perform the put operation, persistently save the data now.
// LOG.info("UNIQUE: put ok " );
m_Region.put(newPut);
return stateless ? STATELESS_UPDATE_OK : STATEFUL_UPDATE_OK;
}
else { //conflict
// Return conflict, but don't trigger an abort. That needs to be triggered from the client, if desired.
if (LOG.isTraceEnabled()) LOG.trace("UNIQUE: put STATEFUL_UPDATE_CONFLICT " );
return stateless ? STATELESS_UPDATE_CONFLICT : STATEFUL_UPDATE_CONFLICT;
}
}
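/*
* Illustrative sketch of the column-list bookkeeping performed in put()
* above (all names are from this file). Each written qualifier is recorded
* in the per-row column list as the delimited token "|qualifier|", built
* with byteMerger and checked with indexOf so it is only recorded once:
*
*   byte[] token = byteMerger("|".getBytes(), qualifier);
*   token = byteMerger(token, "|".getBytes());       // "|cq|"
*   byte[] list = state.getColList(rowkey);
*   if (indexOf(list, token) == -1)                  // not recorded yet
*       state.addToColList(rowkey, byteMerger(token, list));
*
* The merged list is persisted under COLUMNS_COL in the same Put as the
* STATUS_COL entry, presumably so commit/abort processing can tell which
* columns this transaction touched.
*/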
public void putRegionTx(RpcController controller,
SsccPutRegionTxRequest request,
RpcCallback<SsccPutRegionTxResponse> done) {
SsccPutRegionTxResponse response = SsccPutRegionTxResponse.getDefaultInstance();
boolean stateless = false;
if (request.hasIsStateless()) {
stateless = request.getIsStateless();
}
byte [] row = null;
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
Throwable t = null;
WrongRegionException wre = null;
int status = 0;
long tid = request.getTid();
long startId = request.getStartId();
boolean autoCommit = request.getAutoCommit();
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
LOG.error("putRegionTx threw exception ", e);
t = e;
}
if (type == MutationType.PUT && proto.hasRow()) {
row = proto.getRow().toByteArray();
// Process in local memory
try {
status = putRegionTx(tid, startId, put, stateless, autoCommit);
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx returned status: " + status);
} catch (Throwable e) {
LOG.error("putRegionTx threw exception after internal putRegionTx ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx - id " + tid + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toString(row));
}
else {
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx - id " + tid + ", regionName " + regionInfo.getRegionNameAsString() + "- no valid PUT type or does not contain a row");
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutRegionTxResponse.Builder putRegionTxResponseBuilder = SsccPutRegionTxResponse.newBuilder();
putRegionTxResponseBuilder.setHasException(false);
if (t != null)
{
putRegionTxResponseBuilder.setHasException(true);
putRegionTxResponseBuilder.setException(t.toString());
}
if (wre != null)
{
putRegionTxResponseBuilder.setHasException(true);
putRegionTxResponseBuilder.setException(wre.toString());
}
putRegionTxResponseBuilder.setStatus(status);
SsccPutRegionTxResponse papresponse = putRegionTxResponseBuilder.build();
done.run(papresponse);
}
/**
* Adds a write using a region transaction
* @param tid the region transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param put the Put to add
* @param stateless true if this is a stateless put
* @param autoCommit true if the region transaction should commit immediately
* @return int update status code
* @throws IOException
*/
public int putRegionTx(final long tid, final long startId, final Put put,
final boolean stateless, final boolean autoCommit)throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx, tid " + tid +
", startId " + startId +", stateless: " + stateless + " autoCommit " + autoCommit);
SsccTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a region TX
// check whether this transaction has a pending delete for this row before the put
state.removeDelBeforePut(put, stateless);
/* The timestamp of an existing Put cannot be changed through the HBase API,
so the workaround is to create a new Put object stamped with the startId.
An earlier approach mapped the Put's current timestamp to the startId:
*/
//mapStartIdFromTs(put.getTimeStamp(),startId);
// Use getFamilyCellMap to copy all data out of the original Put into the new one
byte[] rowkey = put.getRow();
Put newPut = new Put(rowkey, startId);
byte[] mergedCols = null;
byte[] mergedColsV = null;
byte[] cv = null;
NavigableMap<byte[], List<Cell>> familyCellMap = put.getFamilyCellMap();
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
mergedCols = null;
mergedCols = byteMerger("|".getBytes(),qualifier);
mergedCols = byteMerger(mergedCols,"|".getBytes());
byte[] value = CellUtil.cloneValue(cell);
newPut.add(family,qualifier,startId,value);
byte[] currentCollist = state.getColList(rowkey);
if( indexOf(currentCollist,mergedCols) != -1) //already in this list
{
mergedColsV = byteMerger(currentCollist,null);
continue;
}
mergedColsV = byteMerger(mergedCols,currentCollist);
state.addToColList(rowkey,mergedColsV);
}
}
//get the statusList
Get statusGet = new Get(rowkey);
//statusGet.setTimeStamp(startId);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
//statusGet.setTimeRange(0, startId + 1); //only get data updated before me
//statusGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
statusGet.setMaxVersions();
Result statusResult = m_Region.get(statusGet);
List<Cell> sl = null;
List<Cell> vl = null;
//get the versionList
// If this is a stateless put we don't need the version list
if (!stateless) {
Get verGet = new Get(rowkey);
//verGet.setTimeStamp(startId);
verGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.VERSION_COL);
verGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
Result verResult = m_Region.get(verGet);
if(verResult != null ) vl = verResult.listCells();
}
if(statusResult != null ) sl = statusResult.listCells();
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx stateless: " + stateless);
if (!state.hasConflict(sl,vl,stateless,startId,tid)) {
state.addToPutList(rowkey);
//update status metadata
byte[] statusValue;
if (stateless){
statusValue = SsccConst.generateStatusValue(SsccConst.S_STATELESS_BYTE,tid); //stateless update
}
else {
statusValue = SsccConst.generateStatusValue(SsccConst.S_STATEFUL_BYTE,tid); //stateful update
}
newPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.STATUS_COL,startId , statusValue);
newPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.COLUMNS_COL,startId ,mergedColsV);
//perform the put operation, persistently save the data now.
// LOG.info("UNIQUE: put ok " );
m_Region.put(newPut);
}
else { //conflict
// Return conflict, but don't trigger an abort. That needs to be triggered from the client, if desired.
if (LOG.isTraceEnabled()) LOG.trace("UNIQUE: putRegionTx STATEFUL_UPDATE_CONFLICT " );
return stateless ? STATELESS_UPDATE_CONFLICT : STATEFUL_UPDATE_CONFLICT;
}
// Now we must perform conflict checking.
boolean success = commitIfPossible(tid);
if (! success) {
LOG.error("putRegionTx - tid " + tid + " was not successful. Throwing IOException ");
throw new IOException("putRegionTx - tid " + tid + " was not successful. Throwing IOException ");
}
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx -- EXIT tid: " + tid);
return stateless ? STATELESS_UPDATE_OK : STATEFUL_UPDATE_OK;
}
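/*
* Illustrative flow sketch for a region transaction put (all names from
* this file; assumptions about beginTransIfNotExist's third argument are
* based on the comment at its call site above):
*
*   beginTransIfNotExist(tid, startId, true)  // region TX
*   ... column-list and STATUS_COL bookkeeping ...
*   m_Region.put(newPut)
*   commitIfPossible(tid)                     // conflict check + local commit
*
* Unlike the distributed put() above, putRegionTx() drives the commit
* itself; a failure in commitIfPossible() surfaces to the caller as an
* IOException.
*/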
/***********************************************************************************
***************** ALL code to support DELETE ************************************
***********************************************************************************/
@Override
public void deleteRegionTx(RpcController controller,
SsccDeleteRegionTxRequest request,
RpcCallback<SsccDeleteRegionTxResponse> done) {
/*
SsccDeleteRegionTxResponse response = SsccDeleteRegionTxResponse.getDefaultInstance();
byte [] row = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
boolean autoCommit = request.getAutoCommit();
Delete delete = null;
Throwable t = null;
WrongRegionException wre = null;
long tid = request.getTid();
long startId = request.getStartId();
if (wre == null && type == MutationType.DELETE && proto.hasRow())
row = proto.getRow().toByteArray();
try {
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx threw exception ", e);
t = e;
}
// Process in local memory
int status = 0;
try {
status = deleteRegionTx(tid, startId, delete, autoCommit);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx threw exception after internal deleteRegionTx ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx - tid " + tid
+ ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type
+ ", row " + Bytes.toString(row));
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteRegionTxResponse.Builder deleteRegionTxResponseBuilder = SsccDeleteRegionTxResponse.newBuilder();
deleteRegionTxResponseBuilder.setHasException(false);
if (t != null)
{
deleteRegionTxResponseBuilder.setHasException(true);
deleteRegionTxResponseBuilder.setException(t.toString());
}
if (wre != null)
{
deleteRegionTxResponseBuilder.setHasException(true);
deleteRegionTxResponseBuilder.setException(wre.toString());
}
deleteRegionTxResponseBuilder.setStatus(status);
SsccDeleteRegionTxResponse dresponse = deleteRegionTxResponseBuilder.build();
done.run(dresponse);
*/
}
/**
* Processes a delete using a region transaction
* @param tid the region transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param delete the Delete to process
* @param autoCommit true if the region transaction should commit immediately
* @return int update status code
* @throws IOException
*/
public int deleteRegionTx(final long tid, final long startId, final Delete delete, final Boolean autoCommit)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx -- ENTRY tid: " + tid
+ ", startId:" + startId);
checkClosing(tid);
SsccTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a Region TX
// Clone the delete just to change the timestamp; the overhead should be acceptable here
byte[] rowkey = delete.getRow();
Delete newDelete = new Delete( rowkey,startId );
NavigableMap<byte[], List<Cell>> familyCellMap = delete.getFamilyCellMap();
byte[] mergedColsV = null;
byte[] cv = null;
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
cv = null;
cv = byteMerger("|".getBytes(), null);
cv = byteMerger(cv,qualifier);
cv = byteMerger(cv,"|".getBytes());
byte[] currentCollist = state.getColList(rowkey);
newDelete.deleteColumns(family,qualifier,startId); //NOTE: this API changes in HBase 1.0
// deleteColumns with a timestamp removes every historical version of this column,
// but the real delete is deferred until doCommit(); only at commit time is it safe
// to drop all versions. Until then SSCC relies on HBase MVCC to retain the history
// versions, which other transactions may still need to read an earlier snapshot.
// An alternative is deleteColumn (singular), which removes only the version at
// startId; the two approaches should be equivalent here, but that needs more testing.
if( indexOf(currentCollist,cv) != -1) //already in this list
{
mergedColsV = byteMerger(currentCollist,null);
continue;
}
mergedColsV = byteMerger(currentCollist,cv);
state.addToColList(rowkey,mergedColsV);
}
}
Get statusGet = new Get(rowkey);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
statusGet.setMaxVersions();
Result statusResult = m_Region.get(statusGet);
List<Cell> sl = null;
if(statusResult != null ) sl = statusResult.listCells();
// All deletes are treated as stateless, so no need to retrieve the versions
if (!state.hasConflict(sl,null,true,startId,tid)) {
state.addToDelList(newDelete);
/*update the status metadata*/
Put statusPut = new Put(rowkey,startId );
byte[] statusValue;
statusValue = SsccConst.generateStatusValue(SsccConst.S_DELETE_BYTE,tid); //stateless delete
statusPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.STATUS_COL, startId , statusValue);
statusPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.COLUMNS_COL,startId , mergedColsV);
m_Region.put(statusPut);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: deleteRegionTx: STATELESS_UPDATE_OK");
}
else
{
// Return conflict, but don't trigger an abort. That needs to be triggered from the client, if desired.
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: delete: STATELESS_UPDATE_CONFLICT");
return STATELESS_UPDATE_CONFLICT;
}
// Now we must perform conflict checking.
boolean success = commitIfPossible(tid);
if (! success) {
LOG.error("deleteRegionTx - tid " + tid + " was not successful. Throwing IOException ");
throw new IOException("deleteRegionTx - tid " + tid + " was not successful. Throwing IOException ");
}
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx -- EXIT returning STATELESS_UPDATE_OK for tid: " + tid);
return STATELESS_UPDATE_OK;
}
@Override
public void delete(RpcController controller,
SsccDeleteTransactionalRequest request,
RpcCallback<SsccDeleteTransactionalResponse> done) {
SsccDeleteTransactionalResponse response = SsccDeleteTransactionalResponse.getDefaultInstance();
byte [] row = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
Delete delete = null;
Throwable t = null;
WrongRegionException wre = null;
long transId = request.getTransactionId();
long startId = request.getStartId();
if (wre == null && type == MutationType.DELETE && proto.hasRow())
row = proto.getRow().toByteArray();
try {
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:delete threw exception");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
// Process in local memory
int status = 0;
try {
status = delete(transId, startId, delete);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:delete threw exception after internal delete");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: delete - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toString(row));
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteTransactionalResponse.Builder deleteTransactionalResponseBuilder = SsccDeleteTransactionalResponse.newBuilder();
deleteTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
deleteTransactionalResponseBuilder.setHasException(true);
deleteTransactionalResponseBuilder.setException(t.toString());
}
if (wre != null)
{
deleteTransactionalResponseBuilder.setHasException(true);
deleteTransactionalResponseBuilder.setException(wre.toString());
}
deleteTransactionalResponseBuilder.setStatus(status);
SsccDeleteTransactionalResponse dresponse = deleteTransactionalResponseBuilder.build();
done.run(dresponse);
}
/**
* Processes a transactional delete
* @param transactionId the transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param delete the Delete to process
* @return int update status code
* @throws IOException
*/
public int delete(final long transactionId, final long startId, final Delete delete)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: delete -- ENTRY txId: " + transactionId + ", startId:" + startId);
checkClosing(transactionId);
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
// Clone the delete just to change the timestamp; the overhead should be acceptable here
byte[] rowkey = delete.getRow();
Delete newDelete = new Delete( rowkey,startId );
NavigableMap<byte[], List<Cell>> familyCellMap = delete.getFamilyCellMap();
byte[] mergedColsV = null;
byte[] cv = null;
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
cv = null;
cv = byteMerger("|".getBytes(), null);
cv = byteMerger(cv,qualifier);
cv = byteMerger(cv,"|".getBytes());
byte[] currentCollist = state.getColList(rowkey);
newDelete.deleteColumns(family,qualifier,startId); //NOTE: this API changes in HBase 1.0
// deleteColumns with a timestamp removes every historical version of this column,
// but the real delete is deferred until doCommit(); only at commit time is it safe
// to drop all versions. Until then SSCC relies on HBase MVCC to retain the history
// versions, which other transactions may still need to read an earlier snapshot.
// An alternative is deleteColumn (singular), which removes only the version at
// startId; the two approaches should be equivalent here, but that needs more testing.
if( indexOf(currentCollist,cv) != -1) //already in this list
{
mergedColsV = byteMerger(currentCollist,null);
continue;
}
mergedColsV = byteMerger(currentCollist,cv);
state.addToColList(rowkey,mergedColsV);
}
}
Get statusGet = new Get(rowkey);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
statusGet.setMaxVersions();
Result statusResult = m_Region.get(statusGet);
List<Cell> sl = null;
if(statusResult != null ) sl = statusResult.listCells();
// All deletes are treated as stateless, so no need to retrieve the versions
if (!state.hasConflict(sl,null,true,startId,transactionId)) {
state.addToDelList(newDelete);
/*update the status metadata*/
Put statusPut = new Put(rowkey,startId );
byte[] statusValue;
statusValue = SsccConst.generateStatusValue(SsccConst.S_DELETE_BYTE,transactionId); //stateless delete
statusPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.STATUS_COL, startId , statusValue);
statusPut.add(DtmConst.TRANSACTION_META_FAMILY ,SsccConst.COLUMNS_COL,startId , mergedColsV);
m_Region.put(statusPut);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: delete: STATELESS_UPDATE_OK");
return STATELESS_UPDATE_OK;
}
else
{
// Return conflict, but don't trigger an abort. That needs to be triggered from the client, if desired.
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: delete: STATELESS_UPDATE_CONFLICT");
return STATELESS_UPDATE_CONFLICT;
}
}
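/*
* Illustrative sketch of the delete bookkeeping above: the Delete itself is
* only queued (state.addToDelList) while a status marker is written right
* away, so concurrent transactions can detect the conflict:
*
*   Put statusPut = new Put(rowkey, startId);
*   statusPut.add(DtmConst.TRANSACTION_META_FAMILY, SsccConst.STATUS_COL, startId,
*                 SsccConst.generateStatusValue(SsccConst.S_DELETE_BYTE, txId));
*   statusPut.add(DtmConst.TRANSACTION_META_FAMILY, SsccConst.COLUMNS_COL, startId,
*                 mergedColsV);
*   m_Region.put(statusPut);  // the data cells are removed later, at doCommit()
*/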
/***********************************************************************************
************************* Helper functions ************************************
***********************************************************************************/
public static int indexOf(byte[] source, byte[] find) {
if (source == null) {
return -1;
}
if (find == null || find.length == 0) { // empty pattern would index find[0] below
return -1;
}
if (find.length > source.length) {
return -1;
}
final int maxIndex = source.length - find.length;
final int maxLength = find.length;
final byte firstByte = find[0];
int index = 0;
Loop:
do {
if (source[index] == firstByte ) {
for (int i = 1; i < maxLength; i++) {
if (source[index + i] != find[i]) {
continue Loop;
}
}
return index;
}
} while (++index <= maxIndex);
return -1;
}
public static byte[] byteMerger(byte[] byte_1, byte[] byte_2){
if (byte_1 == null)
{
if (byte_2 == null)
return null;
else
return byte_2;
}
else
{
if (byte_2 == null)
return byte_1;
}
byte[] byte_3 = new byte[byte_1.length+byte_2.length];
System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
return byte_3;
}
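/*
* Usage sketch for the two helpers above (values are illustrative only):
*
*   byte[] a = "|cq1|".getBytes();
*   byte[] b = "|cq2|".getBytes();
*   byte[] merged = byteMerger(a, b);      // bytes of "|cq1||cq2|"
*   indexOf(merged, "|cq2|".getBytes());   // 5
*   indexOf(merged, "|cq3|".getBytes());   // -1, not present
*   byteMerger(a, null);                   // returns a unchanged
*
* This is exactly the pattern put() and delete() use to maintain the
* "|qualifier|"-delimited per-row column list.
*/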
/** Methods to handle the timestamp-to-startId lookup table */
/* generate a local transId and update the lookup table */
/* public void generateLocalId(SsccTransactionState state)
{
long localId = nextLocalTransId.getAndIncrement();
state.setStartId(localId);
}
*/
public long getStartIdFromTs(long ts)
{
long id = 0;
if(updateTsToStartId.containsKey(ts))
id= updateTsToStartId.get(ts);
return id;
}
public void mapStartIdFromTs(long ts, long id)
{
updateTsToStartId.put(ts,id);
}
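/*
* Illustrative sketch: updateTsToStartId maps a cell timestamp back to the
* startId of the transaction that wrote it; getStartIdFromTs returns 0 when
* no mapping exists:
*
*   mapStartIdFromTs(1000L, 42L);
*   getStartIdFromTs(1000L);  // 42
*   getStartIdFromTs(2000L);  // 0, unmapped timestamp
*/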
/**
* Retires the transaction
* @param state the transaction state to retire
*/
private void retireTransaction(final SsccTransactionState state) {
//long key = state.getTransactionId();
String key = getTransactionalUniqueId(state.getTransactionId());
state.clearStateResource();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: retireTransaction: ["
+ state.getTransactionId() + "]");
try {
transactionLeases.cancelLease(key);
} catch (LeaseException le) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: retireTransaction: ["
+ state.getTransactionId() + "] LeaseException");
// Ignore
} catch (Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: retireTransaction: ["
+ state.getTransactionId() + "] General Lease exception");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Caught exception " + e.getMessage() + "" + stackTraceToString(e));
// Ignore
}
// Clearing transaction conflict check list in case it is holding
// a reference to a transaction state
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: retireTransaction clearTransactionsById: " + key + " from list");
//state.clearTransactionsToCheck();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: retireTransaction calling Removing transaction: " + key + " from list");
transactionsById.remove(key);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: retireTransaction Removed transaction: " + key + " from list");
}
public void checkAndDeleteRegionTx(RpcController controller,
SsccCheckAndDeleteRegionTxRequest request,
RpcCallback<SsccCheckAndDeleteRegionTxResponse> done) {
/*
SsccCheckAndDeleteRegionTxResponse response = SsccCheckAndDeleteRegionTxResponse.getDefaultInstance();
byte [] rowArray = null;
com.google.protobuf.ByteString row = null;
com.google.protobuf.ByteString family = null;
com.google.protobuf.ByteString qualifier = null;
com.google.protobuf.ByteString value = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
Delete delete = null;
Throwable t = null;
WrongRegionException wre = null;
int status = 0;
boolean result = false;
long tid = request.getTid();
long startId = request.getStartId();
boolean autoCommit = request.getAutoCommit();
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteRegionTxResponse.Builder checkAndDeleteRegionTxResponseBuilder = SsccCheckAndDeleteRegionTxResponse.newBuilder();
if (wre == null && type == MutationType.DELETE && proto.hasRow())
{
rowArray = proto.getRow().toByteArray();
try {
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx caught exception ", e);
t = e;
}
// Process in local memory
if (delete != null && t == null)
{
if (!Bytes.equals(rowArray, request.getRow().toByteArray()))
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
"Delete row must match the passed row");
}
if (t == null) {
try {
result = checkAndDeleteRegionTx(tid, startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
delete,
autoCommit);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx caught exception ", e);
t = e;
}
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx setting result: " + result);
checkAndDeleteRegionTxResponseBuilder.setResult(result);
}
}
else
{
result = false;
checkAndDeleteRegionTxResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx result is " + result);
checkAndDeleteRegionTxResponseBuilder.setHasException(false);
if (t != null)
{
checkAndDeleteRegionTxResponseBuilder.setHasException(true);
checkAndDeleteRegionTxResponseBuilder.setException(t.toString());
}
if (wre != null)
{
checkAndDeleteRegionTxResponseBuilder.setHasException(true);
checkAndDeleteRegionTxResponseBuilder.setException(wre.toString());
}
SsccCheckAndDeleteRegionTxResponse checkAndDeleteRegionTxResponse = checkAndDeleteRegionTxResponseBuilder.build();
done.run(checkAndDeleteRegionTxResponse);
*/
}
@Override
public void checkAndDelete(RpcController controller,
SsccCheckAndDeleteRequest request,
RpcCallback<SsccCheckAndDeleteResponse> done) {
SsccCheckAndDeleteResponse response = SsccCheckAndDeleteResponse.getDefaultInstance();
byte [] rowArray = null;
com.google.protobuf.ByteString row = null;
com.google.protobuf.ByteString family = null;
com.google.protobuf.ByteString qualifier = null;
com.google.protobuf.ByteString value = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
Delete delete = null;
Throwable t = null;
WrongRegionException wre = null;
int status = 0;
boolean result = false;
long transId = request.getTransactionId();
long startId = request.getStartId();
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndDeleteResponse.Builder checkAndDeleteResponseBuilder = SsccCheckAndDeleteResponse.newBuilder();
if (wre == null && type == MutationType.DELETE && proto.hasRow())
{
rowArray = proto.getRow().toByteArray();
try {
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
// Process in local memory
if (delete != null && t == null)
{
if (request.hasRow()) {
row = request.getRow();
if (!Bytes.equals(rowArray, request.getRow().toByteArray()))
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
"Delete row must match the passed row");
}
if (t == null) {
if (request.hasRow())
row = request.getRow();
if (request.hasFamily())
family = request.getFamily();
if (request.hasQualifier())
qualifier = request.getQualifier();
if (request.hasValue())
value = request.getValue();
try {
result = checkAndDelete(transId, startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
delete);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete setting result: " + result);
checkAndDeleteResponseBuilder.setResult(result);
}
}
else
{
result = false;
checkAndDeleteResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete result is " + result);
checkAndDeleteResponseBuilder.setHasException(false);
if (t != null)
{
checkAndDeleteResponseBuilder.setHasException(true);
checkAndDeleteResponseBuilder.setException(t.toString());
}
if (wre != null)
{
checkAndDeleteResponseBuilder.setHasException(true);
checkAndDeleteResponseBuilder.setException(wre.toString());
}
SsccCheckAndDeleteResponse checkAndDeleteResponse = checkAndDeleteResponseBuilder.build();
done.run(checkAndDeleteResponse);
}
@Override
public void checkAndPut(RpcController controller,
SsccCheckAndPutRequest request,
RpcCallback<SsccCheckAndPutResponse> done) {
SsccCheckAndPutResponse response = SsccCheckAndPutResponse.getDefaultInstance();
byte [] rowArray = null;
com.google.protobuf.ByteString row = null;
com.google.protobuf.ByteString family = null;
com.google.protobuf.ByteString qualifier = null;
com.google.protobuf.ByteString value = null;
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
WrongRegionException wre = null;
Throwable t = null;
boolean result = false;
long transId = request.getTransactionId();
long startId = request.getStartId();
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutResponse.Builder checkAndPutResponseBuilder = SsccCheckAndPutResponse.newBuilder();
if (wre == null && type == MutationType.PUT && proto.hasRow())
{
rowArray = proto.getRow().toByteArray();
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
LOG.error("SsccRegionEndpoint coprocessor: checkAndPut caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
// Process in local memory
if (put != null)
{
if (request.hasRow()) {
row = request.getRow();
if (!Bytes.equals(rowArray, row.toByteArray())) {
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's Put row must match the passed row");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut creating DoNotRetryIOException: setting result: Put row must match the passed row\n" +
"rowArray: " + Bytes.toStringBinary(rowArray) + ", rowArray in hex: " + Hex.encodeHexString(rowArray) + ", row: " +
Bytes.toStringBinary(request.getRow().toByteArray()) + ", row in hex: " + Hex.encodeHexString(request.getRow().toByteArray()));
}
}
if (t == null) {
if (request.hasRow())
row = request.getRow();
if (request.hasFamily())
family = request.getFamily();
if (request.hasQualifier())
qualifier = request.getQualifier();
if (request.hasValue())
value = request.getValue();
try {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: attempting checkAndPut for trans: " + transId +
" startid: " + startId);
result = checkAndPut(transId, startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
put);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut returned " + result);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut threw exception after internal checkAndPut");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAnd Put caught exception " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut setting result: " + result);
checkAndPutResponseBuilder.setResult(result);
}
}
else
{
result = false;
checkAndPutResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut result is " + result);
checkAndPutResponseBuilder.setHasException(false);
if (wre != null)
{
checkAndPutResponseBuilder.setHasException(true);
checkAndPutResponseBuilder.setException(wre.toString());
}
if (t != null)
{
checkAndPutResponseBuilder.setHasException(true);
checkAndPutResponseBuilder.setException(t.toString());
}
SsccCheckAndPutResponse checkAndPutResponse = checkAndPutResponseBuilder.build();
done.run(checkAndPutResponse);
}
@Override
public void checkAndPutRegionTx(RpcController controller,
SsccCheckAndPutRegionTxRequest request,
RpcCallback<SsccCheckAndPutRegionTxResponse> done) {
SsccCheckAndPutRegionTxResponse response = SsccCheckAndPutRegionTxResponse.getDefaultInstance();
byte [] rowArray = null;
com.google.protobuf.ByteString row = null;
com.google.protobuf.ByteString family = null;
com.google.protobuf.ByteString qualifier = null;
com.google.protobuf.ByteString value = null;
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
WrongRegionException wre = null;
Throwable t = null;
boolean result = false;
long tid = request.getTid();
long startId = request.getStartId();
boolean autoCommit = request.getAutoCommit();
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccCheckAndPutRegionTxResponse.Builder checkAndPutRegionTxResponseBuilder = SsccCheckAndPutRegionTxResponse.newBuilder();
if (wre == null && type == MutationType.PUT && proto.hasRow())
{
rowArray = proto.getRow().toByteArray();
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
LOG.error("checkAndPutRegionTx caught exception ", e);
t = e;
}
// Process in local memory
if (put != null)
{
if (request.hasRow()) {
row = request.getRow();
if (!Bytes.equals(rowArray, row.toByteArray())) {
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's Put row must match the passed row");
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx creating DoNotRetryIOException: setting result: Put row must match the passed row\n" +
"rowArray: " + Bytes.toStringBinary(rowArray) + ", rowArray in hex: " + Hex.encodeHexString(rowArray) + ", row: " +
Bytes.toStringBinary(request.getRow().toByteArray()) + ", row in hex: " + Hex.encodeHexString(request.getRow().toByteArray()));
}
}
if (t == null) {
if (request.hasRow())
row = request.getRow();
if (request.hasFamily())
family = request.getFamily();
if (request.hasQualifier())
qualifier = request.getQualifier();
if (request.hasValue())
value = request.getValue();
try {
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx for tid: " + tid +
" startid: " + startId);
result = checkAndPutRegionTx(tid, startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
put,
autoCommit);
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx returned " + result);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx threw exception after internal checkAndPutRegionTx ", e);
t = e;
}
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx setting result: " + result);
checkAndPutRegionTxResponseBuilder.setResult(result);
}
}
else
{
result = false;
checkAndPutRegionTxResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx result is " + result);
checkAndPutRegionTxResponseBuilder.setHasException(false);
if (wre != null)
{
checkAndPutRegionTxResponseBuilder.setHasException(true);
checkAndPutRegionTxResponseBuilder.setException(wre.toString());
}
if (t != null)
{
checkAndPutRegionTxResponseBuilder.setHasException(true);
checkAndPutRegionTxResponseBuilder.setException(t.toString());
}
SsccCheckAndPutRegionTxResponse checkAndPutRegionTxResponse = checkAndPutRegionTxResponseBuilder.build();
done.run(checkAndPutRegionTxResponse);
}
/**
* Processes a checkAndDelete operation using a region transaction
* @param tid the region transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value
* @param delete the Delete to perform if the check passes
* @param autoCommit true if the region transaction should commit immediately
* @return boolean
* @throws IOException
*/
public boolean checkAndDeleteRegionTx(long tid, long startId,
byte[] row, byte[] family,
byte[] qualifier, byte[] value,
Delete delete, boolean autoCommit) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter checkAndDeleteRegionTx, tid: " + tid);
SsccTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a regionTX
boolean result = false;
byte[] rsValue = null;
Get get = new Get(row);
get.addColumn(family, qualifier);
Result rs = this.get(tid, startId, get);
boolean valueIsNull = value == null ||
value.length == 0;
int lv_return = 0;
if (rs.isEmpty() && valueIsNull) {
lv_return = this.deleteRegionTx(tid, startId, delete, autoCommit);
result = (lv_return == STATELESS_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx, tid: "
+ tid + " rs is empty and value is null, lv_return: " + lv_return
+ ", result is " + result);
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
lv_return = this.deleteRegionTx(tid, startId, delete, autoCommit);
result = (lv_return == STATELESS_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx, tid: "
+ tid + " rsValue.length == 0, lv_return: " + lv_return
+ ", result is: " + result);
}
else {
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx, tid: "
+ tid + " rsValue.length != 0, result is false");
result = false;
}
} else if ((!rs.isEmpty())
&& !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
lv_return = this.deleteRegionTx(tid, startId, delete, autoCommit);
result = (lv_return == STATELESS_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx, tid: "
+ tid + " rsValue matches the row, lv_return is: " + lv_return
+ ", result is: " + result);
} else {
result = false;
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx setting result to " + result + ", row: " + row);
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx EXIT, result is " + result + " row: " + row);
return result;
}
/**
* Processes a transactional checkAndDelete
* @param transactionId the transaction id
* @param startId the start id (snapshot timestamp) of the transaction
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value
* @param delete the Delete to perform if the check passes
* @return boolean
* @throws IOException
*/
public boolean checkAndDelete(long transactionId, long startId,
byte[] row, byte[] family,
byte[] qualifier, byte[] value, Delete delete)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter SsccRegionEndpoint coprocessor: checkAndDelete, txid: " + transactionId);
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
boolean result = false;
byte[] rsValue = null;
Get get = new Get(row);
get.addColumn(family, qualifier);
Result rs = this.get(transactionId, startId, get);
boolean valueIsNull = value == null ||
value.length == 0;
int lv_return = 0;
if (rs.isEmpty() && valueIsNull) {
lv_return = this.delete(transactionId, startId, delete);
result = (lv_return == STATELESS_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete, txid: "
+ transactionId + " rs is empty and value is null, lv_return: " + lv_return + ", result is " + result);
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
lv_return = this.delete(transactionId, startId, delete);
result = (lv_return == STATELESS_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete, txid: "
+ transactionId + " rsValue.length == 0, lv_return: " + lv_return + ", result is: " + result);
}
else {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete, txid: "
+ transactionId + " rsValue.length != 0, result is false");
result = false;
}
} else if ((!rs.isEmpty())
&& !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
lv_return = this.delete(transactionId, startId, delete);
result = (lv_return == STATELESS_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete, txid: "
+ transactionId + " rsValue matches the row, lv_return is: " + lv_return + ", result is: " + result);
} else {
result = false;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete setting result is " + result + " row: " + row);
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndDelete EXIT result is " + result + " row: " + row);
return result;
}
/**
* Processes a checkAndPut operation using a region transaction
* @param tid the region transaction id
* @param startId the start id (timestamp) of the transaction
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value; null or empty checks for an absent/empty value
* @param put the Put to apply if the check succeeds
* @param autoCommit whether to commit the region transaction automatically
* @return true if the put was applied
* @throws IOException
*/
public boolean checkAndPutRegionTx(long tid, long startId, byte[] row, byte[] family,
byte[] qualifier, byte[] value, Put put,
final boolean autoCommit)throws IOException {
SsccTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a Region TX
boolean result = false;
byte[] rsValue = null;
Get get = new Get(row);
get.addColumn(family, qualifier);
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx, tid: " + tid +
", startId: " + startId + ", row " + row + ", autoCommit " + autoCommit);
Result rs = this.get(tid, startId, get);
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx, tid: "
+ tid + ", result is empty " + rs.isEmpty() +
", value is " + Bytes.toString(value));
boolean valueIsNull = value == null || value.length == 0;
int lv_return = 0;
if (rs.isEmpty() && valueIsNull) {
lv_return = this.put(tid, startId, put, false /* stateful */);
result = (lv_return == STATEFUL_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx, tid: "
+ tid + " valueIsNull, lv_return is: " + lv_return + ", result is: " + result);
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
lv_return = this.put(tid, startId, put, false /* stateful */);
result = (lv_return == STATEFUL_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx, tid: "
+ tid + " rsValue.length == 0, lv_return: " + lv_return
+ ", result is: " + result);
}
else {
result = false;
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx, tid: "
+ tid + " rsValue.length != 0, result is false");
}
} else if ((!rs.isEmpty()) && !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
lv_return = this.put(tid, startId, put, false /* stateful */);
result = (lv_return == STATEFUL_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx, tid: "
+ tid + " rsValue matches the row, lv_return is: " + lv_return + ", result is: " + result);
} else {
result = false;
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx, tid: "
+ tid + " last case, result is false");
}
// Now we must perform conflict checking.
boolean success = commitIfPossible(tid);
if (! success) {
LOG.error("checkAndPutRegionTx - tid " + tid + " was not successful. Throwing IOException ");
throw new IOException("checkAndPutRegionTx - tid " + tid + " was not successful. Throwing IOException ");
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx tid " + tid
+ " returns " + result + " for row: " + row);
return result;
}
/**
* Processes a transactional checkAndPut
* @param transactionId the transaction id
* @param startId the start id (timestamp) of the transaction
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value; null or empty checks for an absent/empty value
* @param put the Put to apply if the check succeeds
* @return true if the put was applied
* @throws IOException
*/
public boolean checkAndPut(long transactionId, long startId, byte[] row, byte[] family,
byte[] qualifier, byte[] value, Put put)
throws IOException {
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
boolean result = false;
byte[] rsValue = null;
Get get = new Get(row);
get.addColumn(family, qualifier);
if (LOG.isTraceEnabled()) LOG.trace("Enter SsccRegionEndpoint coprocessor: checkAndPut, txid: " + transactionId +
", startId: " + startId + ", row " + row);
Result rs = this.get(transactionId, startId, get);
if (LOG.isTraceEnabled()) LOG.trace("Enter SsccRegionEndpoint coprocessor: checkAndPut, txid: "
+ transactionId + ", result is empty " + rs.isEmpty() +
", value is " + Bytes.toString(value));
boolean valueIsNull = value == null || value.length == 0;
int lv_return = 0;
if (rs.isEmpty() && valueIsNull) {
lv_return = this.put(transactionId, startId, put, false /* stateful */);
result = (lv_return == STATEFUL_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut, txid: "
+ transactionId + " valueIsNull, lv_return is: " + lv_return + ", result is: " + result);
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
lv_return = this.put(transactionId, startId, put, false /* stateful */);
result = (lv_return == STATEFUL_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut, txid: "
+ transactionId + " rsValue.length == 0, lv_return: " + lv_return + ", result is: " + result);
}
else {
result = false;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut, txid: "
+ transactionId + " rsValue.length != 0, result is false");
}
} else if ((!rs.isEmpty()) && !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
lv_return = this.put(transactionId, startId, put, false /* stateful */);
result = (lv_return == STATEFUL_UPDATE_OK);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut, txid: "
+ transactionId + " rsValue matches the row, lv_return is: " + lv_return + ", result is: " + result);
} else {
result = false;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut, txid: "
+ transactionId + " last case, result is false");
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: checkAndPut returns " + result + " for row: " + row);
return result;
}
@Override
public void deleteMultiple(RpcController controller,
SsccDeleteMultipleTransactionalRequest request,
RpcCallback<SsccDeleteMultipleTransactionalResponse> done) {
SsccDeleteMultipleTransactionalResponse response = SsccDeleteMultipleTransactionalResponse.getDefaultInstance();
java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto> results;
results = request.getDeleteList();
int resultCount = request.getDeleteCount();
byte [] row = null;
Delete delete = null;
MutationType type;
Throwable t = null;
WrongRegionException wre = null;
int status = 0;
long transId = request.getTransactionId();
long startId = request.getStartId();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor deleteMultiple: Entry");
if (wre == null) {
for (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto proto : results)
{
delete = null;
if (proto != null)
{
type = proto.getMutateType();
if (type == MutationType.DELETE && proto.hasRow())
{
row = proto.getRow().toByteArray();
try {
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor deleteMultiple:delete Caught exception " +
"after protobuf conversion " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
// Process in local memory
if (delete != null)
{
try {
status = delete(transId, startId, delete);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor deleteMultiple:delete Caught exception " +
"after internal delete " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: deleteMultiple - id " + transId +
", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toString(row));
if (status != STATELESS_UPDATE_OK) {
String returnString;
switch (status){
case STATEFUL_UPDATE_OK:
returnString = "STATEFUL_UPDATE_OK";
break;
case STATEFUL_UPDATE_CONFLICT:
returnString = "STATEFUL_UPDATE_CONFLICT";
break;
case STATELESS_UPDATE_OK:
returnString = "STATELESS_UPDATE_OK";
break;
case STATELESS_UPDATE_CONFLICT:
returnString = "STATELESS_UPDATE_CONFLICT";
break;
default:
returnString = "Unknown return value: " + Integer.toString(status);
break;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:deleteMultiple returned status: " + returnString);
break;
}
}
}
}
else
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: deleteMultiple - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", delete proto was null");
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccDeleteMultipleTransactionalResponse.Builder deleteMultipleTransactionalResponseBuilder = SsccDeleteMultipleTransactionalResponse.newBuilder();
deleteMultipleTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
deleteMultipleTransactionalResponseBuilder.setHasException(true);
deleteMultipleTransactionalResponseBuilder.setException(t.toString());
}
if (wre != null)
{
deleteMultipleTransactionalResponseBuilder.setHasException(true);
deleteMultipleTransactionalResponseBuilder.setException(wre.toString());
}
deleteMultipleTransactionalResponseBuilder.setStatus(status);
SsccDeleteMultipleTransactionalResponse dresponse = deleteMultipleTransactionalResponseBuilder.build();
done.run(dresponse);
}
@Override
public void putMultiple(RpcController controller,
SsccPutMultipleTransactionalRequest request,
RpcCallback<SsccPutMultipleTransactionalResponse> done) {
SsccPutMultipleTransactionalResponse response = SsccPutMultipleTransactionalResponse.getDefaultInstance();
java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto> results;
boolean stateless = false;
if (request.hasIsStateless()) {
stateless = request.getIsStateless();
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor putMultiple: Entry, stateless: " + stateless);
results = request.getPutList();
int resultCount = request.getPutCount();
byte [] row = null;
Put put = null;
MutationType type;
Throwable t = null;
WrongRegionException wre = null;
int status = 0;
long transId = request.getTransactionId();
long startId = request.getStartId();
if (wre == null){
for (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto proto : results)
{
put = null;
if (proto != null)
{
type = proto.getMutateType();
if (type == MutationType.PUT && proto.hasRow())
{
row = proto.getRow().toByteArray();
try {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor putMultiple: converting put");
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor putMultiple:put Caught exception " +
"after protobuf conversion " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
// Process in local memory
if (put != null)
{
try {
status = put(transId, startId, put, stateless);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:putMultiple returned status: " + status);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor putMultiple:put Caught exception " +
"after internal put " + e.getMessage() + "" + stackTraceToString(e));
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: putMultiple - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toString(row));
}
else {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: putMultiple - put is null ");
}
if (status != STATEFUL_UPDATE_OK) {
String returnString;
switch (status){
case STATEFUL_UPDATE_OK:
returnString = "STATEFUL_UPDATE_OK";
break;
case STATEFUL_UPDATE_CONFLICT:
returnString = "STATEFUL_UPDATE_CONFLICT";
break;
case STATELESS_UPDATE_OK:
returnString = "STATELESS_UPDATE_OK";
break;
case STATELESS_UPDATE_CONFLICT:
returnString = "STATELESS_UPDATE_CONFLICT";
break;
default:
returnString = "Unknown return value: " + Integer.toString(status);
break;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:putMultiple returned status: " + returnString);
break;
}
}
}
else
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: putMultiple - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", proto was null");
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutMultipleTransactionalResponse.Builder putMultipleTransactionalResponseBuilder = SsccPutMultipleTransactionalResponse.newBuilder();
putMultipleTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
putMultipleTransactionalResponseBuilder.setHasException(true);
putMultipleTransactionalResponseBuilder.setException(t.toString());
}
if (wre != null)
{
putMultipleTransactionalResponseBuilder.setHasException(true);
putMultipleTransactionalResponseBuilder.setException(wre.toString());
}
putMultipleTransactionalResponseBuilder.setStatus(status);
SsccPutMultipleTransactionalResponse pmresponse = putMultipleTransactionalResponseBuilder.build();
done.run(pmresponse);
}
//****************************************************************
//
// ANYTHING ABOVE THIS LINE HAS BEEN CONVERTED FOR SSCC
//
// ANYTHING BELOW THIS COMMENT HAS NOT BEEN CONVERTED FOR SSCC
// AND IS LEFT HERE FOR REFERENCE ONLY. WILL BE REMOVED AFTER POC
//
//****************************************************************
// TrxRegionService methods
@Override
public void recoveryRequest(RpcController controller,
SsccRecoveryRequestRequest request,
RpcCallback<SsccRecoveryRequestResponse> done) {
int tmId = request.getTmId();
Throwable t = null;
WrongRegionException wre = null;
long transId = request.getTransactionId();
long startId = request.getStartId();
if (reconstructIndoubts == 0) {
if(LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: RECOV recovery Request");
// try {
constructIndoubtTransactions();
// }
// catch (IdTmException e){
// LOG.error("SsccRegionEndpoint coprocessor: RECOV recovery Request exception " + e);
// t = new IOException("SsccRegionEndpoint coprocessor: RECOV recovery Request exception " + e);
// }
// catch (Exception e2){
// LOG.error("SsccRegionEndpoint coprocessor: RECOV recovery Request exception " + e2);
// t = new IOException("SsccRegionEndpoint coprocessor: RECOV recovery Request exception " + e2);
// }
}
// Placeholder for real work when recovery is added
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: recoveryResponse - id " + transId + ", regionName " + regionInfo.getRegionNameAsString() + ", tmId" + tmId);
org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccRecoveryRequestResponse.Builder recoveryResponseBuilder = SsccRecoveryRequestResponse.newBuilder();
List<Long> indoubtTransactions = new ArrayList<Long>();
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: region " + regionInfo.getEncodedName() + " receives recovery request from TM " + tmId + " with region state " + regionState);
switch(regionState) {
case 1: // INIT, assume open the TRegion if necessary
regionState = 1; // NOTE: should we call openHRegion directly here?
break;
case 0: // RECOVERING: a list of in-doubt txns has already been created, but they are still being resolved;
// retrieve all in-doubt txns belonging to this TM id and return them as a list of longs
for (Entry<Long, WALEdit> entry : indoubtTransactionsById.entrySet()) {
long tid = entry.getKey();
if ((int) TransactionState.getNodeId(tid) == tmId) {
indoubtTransactions.add(tid);
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: region " + regionInfo.getEncodedName() + " in-doubt transaction " + tid + " has been added into the recovery repply to TM " + tmId);
}
}
break;
case 2: // START
break;
}
// Add the in-doubt transactions collected above to the response
for (Long transactionId:indoubtTransactions) {
recoveryResponseBuilder.addResult(transactionId);
}
// Placeholder response forced to zero for now
recoveryResponseBuilder.setHasException(false);
if (t != null)
{
recoveryResponseBuilder.setHasException(true);
recoveryResponseBuilder.setException(t.toString());
}
if (wre != null)
{
recoveryResponseBuilder.setHasException(true);
recoveryResponseBuilder.setException(wre.toString());
}
SsccRecoveryRequestResponse rresponse = recoveryResponseBuilder.build();
done.run(rresponse);
}
/**
* Gives the maximum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, maximum value for the
* entire column family will be returned.
*/
@Override
public void getMax(RpcController controller, SsccTransactionalAggregateRequest request,
RpcCallback<SsccTransactionalAggregateResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getMax entry");
RegionScanner scanner = null;
SsccTransactionalAggregateResponse response = null;
T max = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
T temp;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = getScanner(transactionId, startId, scan);
List<Cell> results = new ArrayList<Cell>();
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
// qualifier can be null.
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
}
results.clear();
} while (hasMoreRows);
if (max != null) {
SsccTransactionalAggregateResponse.Builder builder = SsccTransactionalAggregateResponse.newBuilder();
builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
response = builder.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getMax - Maximum from this region is "
+ env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max);
done.run(response);
}
/**
* Gives the minimum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, minimum value for the
* entire column family will be returned.
*/
@Override
public void getMin(RpcController controller, SsccTransactionalAggregateRequest request,
RpcCallback<SsccTransactionalAggregateResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getMin entry");
SsccTransactionalAggregateResponse response = null;
RegionScanner scanner = null;
T min = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
List<Cell> results = new ArrayList<Cell>();
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
}
results.clear();
} while (hasMoreRows);
if (min != null) {
response = SsccTransactionalAggregateResponse.newBuilder().addFirstPart(
ci.getProtoForCellType(min).toByteString()).build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isTraceEnabled()) LOG.trace("Minimum from this region is "
+ env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min);
done.run(response);
}
/**
* Gives the sum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, sum for the entire column
* family will be returned.
*/
@Override
public void getSum(RpcController controller, SsccTransactionalAggregateRequest request,
RpcCallback<SsccTransactionalAggregateResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getSum entry");
SsccTransactionalAggregateResponse response = null;
RegionScanner scanner = null;
S sumVal = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
boolean firstCell;
do {
hasMoreRows = scanner.next(results);
firstCell=true;
Result verResult = null;
Result statusResult = null;
Result colResult = null;
for (Cell c : results) {
if (firstCell) {
// note: Bytes.equals is required here; "!=" on byte[] only compares references
if (!Bytes.equals(CellUtil.cloneFamily(c), DtmConst.TRANSACTION_META_FAMILY)) {
//get the statusList
Get statusGet = new Get(c.getRow()); //TODO: deprecated API
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getSum first row: " + c.getRow());
//statusGet.setTimeStamp(startId);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
statusGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
statusResult = m_Region.get(statusGet);
//get the colList
Get colGet = new Get(c.getRow()); //TODO: deprecated API
//colGet.setTimeStamp(startId);
colGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.COLUMNS_COL);
colGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
colResult = m_Region.get(colGet);
//get the versionList
Get verGet = new Get(c.getRow());//TODO: deprecated API
//verGet.setTimeStamp(startId);
verGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.VERSION_COL);
verGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
verResult = m_Region.get(verGet);
firstCell = false;
}
if (!firstCell) {
temp = ci.getValue(colFamily, qualifier, c);
if (temp != null) {
if (state.handleResult(c, statusResult.listCells(), verResult.listCells(), colResult.listCells(), transactionId)) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getSum adding cell: " + Bytes.toString(c.getRow()));
sumVal = ci.add(sumVal, ci.castToReturnType(temp));
break;
}
}
}
}
}
results.clear();
} while (hasMoreRows);
if (sumVal != null) {
response = SsccTransactionalAggregateResponse.newBuilder().addFirstPart(
ci.getProtoForPromotedType(sumVal).toByteString()).build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isTraceEnabled()) LOG.trace("Sum from this region is "
+ env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum);
done.run(response);
}
/**
* Gives the row count for the given column family and column qualifier, in
* the given row range as defined in the Scan object.
* @throws IOException
*/
@Override
public void getRowNum(RpcController controller, SsccTransactionalAggregateRequest request,
RpcCallback<SsccTransactionalAggregateResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getRowNum entry");
SsccTransactionalAggregateResponse response = null;
long counter = 0L;
List<Cell> results = new ArrayList<Cell>();
RegionScanner scanner = null;
long transactionId = 0L;
try {
Scan scan = ProtobufUtil.toScan(request.getScan());
byte[][] colFamilies = scan.getFamilies();
byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
NavigableSet<byte[]> qualifiers = colFamilies != null ?
scan.getFamilyMap().get(colFamily) : null;
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
if (scan.getFilter() == null && qualifier == null)
scan.setFilter(new FirstKeyOnlyFilter());
transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
boolean hasMoreRows = false;
boolean firstCell;
do {
hasMoreRows = scanner.next(results);
firstCell=true;
Result verResult = null;
Result statusResult = null;
Result colResult = null;
for (Cell c : results) {
if (firstCell) {
// note: Bytes.equals is required here; "!=" on byte[] only compares references
if (!Bytes.equals(CellUtil.cloneFamily(c), DtmConst.TRANSACTION_META_FAMILY)) {
//get the statusList
Get statusGet = new Get(c.getRow()); //TODO: deprecated API
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getRowNum first row: " + c.getRow());
//statusGet.setTimeStamp(startId);
statusGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.STATUS_COL);
statusGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
statusResult = m_Region.get(statusGet);
//get the colList
Get colGet = new Get(c.getRow()); //TODO: deprecated API
//colGet.setTimeStamp(startId);
colGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.COLUMNS_COL);
colGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
colResult = m_Region.get(colGet);
//get the versionList
Get verGet = new Get(c.getRow());//TODO: deprecated API
//verGet.setTimeStamp(startId);
verGet.addColumn(DtmConst.TRANSACTION_META_FAMILY,SsccConst.VERSION_COL);
verGet.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
verResult = m_Region.get(verGet);
firstCell = false;
}
if (!firstCell) {
if (state.handleResult(c, statusResult.listCells(), verResult.listCells(), colResult.listCells(), transactionId)) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getRowNum adding cell: " + Bytes.toString(c.getRow()));
counter++;
break;
}
}
}
}
results.clear();
} while (hasMoreRows);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
bb.rewind();
response = SsccTransactionalAggregateResponse.newBuilder().addFirstPart(
ByteString.copyFrom(bb)).build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isTraceEnabled()) LOG.trace("Row counter for transactionId " + transactionId + " from this region: "
+ env.getRegion().getRegionInfo().getRegionNameAsString() + " is " + counter);
done.run(response);
}
/**
* Gives a Pair with first object as Sum and second object as row count,
* computed for a given combination of column qualifier and column family in
* the given row range as defined in the Scan object. In its current
* implementation, it takes one column family and one column qualifier (if
* provided). In case of null column qualifier, an aggregate sum over the
* entire column family will be returned.
* <p>
* The average is computed in
* AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
* processing results from all regions, so it is okay to pass a sum and a Long
* row count.
*/
@Override
public void getAvg(RpcController controller, SsccTransactionalAggregateRequest request,
RpcCallback<SsccTransactionalAggregateResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getAvg entry");
SsccTransactionalAggregateResponse response = null;
RegionScanner scanner = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null;
Long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
results.clear();
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
qualifier, kv)));
}
rowCountVal++;
} while (hasMoreRows);
if (sumVal != null) {
ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
SsccTransactionalAggregateResponse.Builder pair = SsccTransactionalAggregateResponse.newBuilder();
pair.addFirstPart(first);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
response = pair.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
/**
* Gives a Pair with first object a List containing Sum and sum of squares,
* and the second object as row count. It is computed for a given combination of
* column qualifier and column family in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* one column qualifier (if provided). The idea is to compute the variance first
* (the average of the squares less the square of the average); the standard
* deviation is then the square root of the variance.
*/
@Override
public void getStd(RpcController controller, SsccTransactionalAggregateRequest request,
RpcCallback<SsccTransactionalAggregateResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getStd entry");
RegionScanner scanner = null;
SsccTransactionalAggregateResponse response = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
tempVal = null;
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
qualifier, kv)));
}
results.clear();
sumVal = ci.add(sumVal, tempVal);
sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
rowCountVal++;
} while (hasMoreRows);
if (sumVal != null) {
ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
SsccTransactionalAggregateResponse.Builder pair = SsccTransactionalAggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
pair.addFirstPart(first_sumSqVal);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
response = pair.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
/**
* Gives a List containing sum of values and sum of weights.
* It is computed for the combination of column
* family and column qualifier(s) in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* two column qualifiers. The first qualifier is for values column and
* the second qualifier (optional) is for weight column.
*/
@Override
public void getMedian(RpcController controller, SsccTransactionalAggregateRequest request,
RpcCallback<SsccTransactionalAggregateResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getMedian entry");
SsccTransactionalAggregateResponse response = null;
RegionScanner scanner = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] valQualifier = null, weightQualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
valQualifier = qualifiers.pollFirst();
// if weighted median is requested, get qualifier for the weight column
weightQualifier = qualifiers.pollLast();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
tempVal = null;
tempWeight = null;
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
valQualifier, kv)));
if (weightQualifier != null) {
tempWeight = ci.add(tempWeight,
ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
}
}
results.clear();
sumVal = ci.add(sumVal, tempVal);
sumWeights = ci.add(sumWeights, tempWeight);
} while (hasMoreRows);
ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
SsccTransactionalAggregateResponse.Builder pair = SsccTransactionalAggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
pair.addFirstPart(first_sumWeights);
response = pair.build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
@SuppressWarnings("unchecked")
ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
SsccTransactionalAggregateRequest request) throws IOException {
String className = request.getInterpreterClassName();
Class<?> cls;
try {
cls = Class.forName(className);
ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
if (request.hasInterpreterSpecificBytes()) {
ByteString b = request.getInterpreterSpecificBytes();
P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
ci.initialize(initMsg);
}
return ci;
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
@Override
public Service getService() {
return this;
}
/**
* Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
* from the region where this coprocessor is loaded.
* Since this is a coprocessor endpoint, it always expects to be loaded
* on a table region, so always expects this to be an instance of
* {@link RegionCoprocessorEnvironment}.
* @param env the environment provided by the coprocessor host
* @throws IOException if the provided environment is not an instance of
* {@code RegionCoprocessorEnvironment}
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("SsccRegionEndpoint coprocessor: start - Must be loaded on a table region!");
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: start");
RegionCoprocessorEnvironment tmp_env =
(RegionCoprocessorEnvironment)env;
#ifdef CDH5.7 APACHE1.2
this.m_Region =
tmp_env.getRegion();
#else
this.m_Region = (HRegion)
tmp_env.getRegion();
#endif
this.regionInfo = this.m_Region.getRegionInfo();
#ifdef CDH5.7 APACHE1.2
this.t_Region = (HRegion)tmp_env.getRegion();
#else
this.t_Region = (TransactionalRegion) tmp_env.getRegion();
#endif
this.config = tmp_env.getConfiguration();
this.fs = FileSystem.get(config);
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
synchronized (stoppableLock) {
try {
this.transactionLeaseTimeout = HBaseConfiguration.getInt(conf,
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
DEFAULT_LEASE_TIME);
this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(conf,
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
scannerThreadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.cleanTimer = conf.getInt(SLEEP_CONF, DEFAULT_SLEEP);
if (this.transactionLeases == null)
this.transactionLeases = new Leases(LEASE_CHECK_FREQUENCY);
if (LOG.isTraceEnabled()) LOG.trace("Transaction lease time: " + transactionLeaseTimeout);
if (LOG.isTraceEnabled()) LOG.trace("Scanner lease time: " + scannerThreadWakeFrequency);
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e)
{
LOG.fatal("CleanOldTransactionChore uncaughtException: " + t.getName(), e);
}
};
String n = Thread.currentThread().getName();
if (TransactionalLeasesThread == null) {
TransactionalLeasesThread = new Thread(this.transactionLeases);
Threads.setDaemonThreadRunning(TransactionalLeasesThread, "Transactional leases");
}
} catch (Exception e) {
throw new CoprocessorException("SsccRegionEndpoint coprocessor: start threw exception " + e);
}
}
RegionServerServices rss = tmp_env.getRegionServerServices();
tHLog = rss.getWAL(regionInfo);
ServerName sn = rss.getServerName();
lv_hostName = sn.getHostname();
lv_port = sn.getPort();
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Hostname " + lv_hostName + " port " + lv_port);
zkw1 = rss.getZooKeeper();
this.configuredEarlyLogging = conf.getBoolean("hbase.regionserver.region.transactional.earlylogging", false);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: get the reference from Region CoprocessorEnvrionment ");
if (LOG.isTraceEnabled()) {
if (tmp_env.getSharedData().isEmpty())
LOG.trace("SsccRegionEndpoint coprocessor: shared map is empty");
else
LOG.trace("SsccRegionEndpoint coprocessor: shared map is NOT empty");
}
transactionsByIdTestz = TrxRegionObserver.getRefMap();
if (LOG.isTraceEnabled()) {
if (transactionsByIdTestz.isEmpty())
LOG.trace("SsccRegionEndpoint coprocessor: reference map is empty");
else
LOG.trace("SsccRegionEndpoint coprocessor: reference map is NOT empty");
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: UUU Region " + this.m_Region.getRegionInfo().getRegionNameAsString() + " check indoubt list from reference map ");
indoubtTransactionsById = (TreeMap<Long, WALEdit>)transactionsByIdTestz.get(
this.m_Region.getRegionInfo().getRegionNameAsString()+TrxRegionObserver.trxkeypendingTransactionsById);
indoubtTransactionsCountByTmid = (TreeMap<Integer,Integer>)transactionsByIdTestz.get(
this.m_Region.getRegionInfo().getRegionNameAsString()+TrxRegionObserver.trxkeyindoubtTransactionsCountByTmid);
if (indoubtTransactionsCountByTmid != null) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor:OOO successfully get the reference from Region CoprocessorEnvrionment ");
}
try {
idServer = new IdTm(false);
}
catch (Exception e){
LOG.error("SsccRegionEndpoint coprocessor: unble to new IdTm " + e);
}
long logSeqId = nextLogSequenceId.get();
long currentTime = System.currentTimeMillis();
long ssccSeqId = currentTime > logSeqId ? currentTime : logSeqId;
nextSsccSequenceId = new AtomicLong(ssccSeqId);
LOG.info("Generate SequenceID start from " + nextSsccSequenceId);
LOG.info("SsccRegionEndpoint coprocessor: start");
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: stop ");
stoppable.stop("stop() SsccRegionEndpoint");
}
// Internal support methods
/**
* Checks if the region is closing
* @param transactionId the transaction id making the request
* @throws IOException if the region is closing
*/
private void checkClosing(final long transactionId) throws IOException {
if (closing) {
LOG.error("SsccRegionEndpoint coprocessor: Trafodion Recovery: checkClosing(" + transactionId + ") - raising exception. no more transaction allowed.");
throw new IOException("closing region, no more transaction allowed");
}
}
public void deleteRecoveryzNode(int node, String encodedName) throws IOException {
synchronized(zkRecoveryCheckLock) {
// default zNodePath
String zNodeKey = lv_hostName + "," + lv_port + "," + encodedName;
StringBuilder sb = new StringBuilder();
sb.append("TM");
sb.append(node);
String str = sb.toString();
String zNodePathTM = zNodePath + str;
String zNodePathTMKey = zNodePathTM + "/" + zNodeKey;
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW Delete region recovery znode" + node + " zNode Path " + zNodePathTMKey);
// delete zookeeper recovery zNode, call ZK ...
try {
ZKUtil.deleteNodeFailSilent(zkw1, zNodePathTMKey);
} catch (KeeperException e) {
throw new IOException("Trafodion Recovery Region Observer CP: ZKW Unable to delete recovery zNode to TM " + node, e);
}
}
} // end of deleteRecoveryzNode
/**
* Starts the region after a recovery
*/
public void startRegionAfterRecovery() throws IOException {
boolean isFlush = false;
try {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: Flushing cache in startRegionAfterRecovery " + m_Region.getRegionInfo().getRegionNameAsString());
#ifdef HDP2.3 HDP2.4 APACHE1.1 CDH5.7 APACHE1.2
m_Region.flush(true);
#else
m_Region.flushcache();
#endif
//if (!m_Region.flushcache().isFlushSucceeded()) {
// LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: Flushcache returns false !!! " + m_Region.getRegionInfo().getRegionNameAsString());
//}
} catch (IOException e) {
LOG.error("SsccRegionEndpoint coprocessor: Trafodion Recovery: Flush failed after replay edits" + m_Region.getRegionInfo().getRegionNameAsString());
return;
}
//FileSystem fileSystem = m_Region.getFilesystem();
//Path archiveTHLog = new Path (recoveryTrxPath.getParent(),"archivethlogfile.log");
//if (fileSystem.exists(archiveTHLog)) fileSystem.delete(archiveTHLog, true);
//if (fileSystem.exists(recoveryTrxPath))fileSystem.rename(recoveryTrxPath,archiveTHLog);
if (LOG.isTraceEnabled()) {
if (indoubtTransactionsById != null)
LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: region " + recoveryTrxPath + " has " + indoubtTransactionsById.size() + " in-doubt transactions and edits are archived.");
else
LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: region " + recoveryTrxPath + " has 0 in-doubt transactions and edits are archived.");
}
regionState = 2;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: Trafodion Recovery: region " + m_Region.getRegionInfo().getEncodedName() + " is STARTED.");
}
/**
* Resolves the transaction from the log
* @param transactionState the transaction state to resolve
* @throws IOException
*/
private void resolveTransactionFromLog(final SsccTransactionState transactionState) throws IOException {
LOG.error("SsccRegionEndpoint coprocessor: Global transaction log is not Implemented. (Optimisticly) assuming transaction commit!");
commit(transactionState);
}
/**
* TransactionLeaseListener
*/
private class TransactionLeaseListener implements LeaseListener {
//private final long transactionName;
private final String transactionName;
TransactionLeaseListener(final long n) {
this.transactionName = getTransactionalUniqueId(n);
}
public void leaseExpired() {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: leaseExpired Transaction [" + this.transactionName
+ "] expired in region ["
+ m_Region.getRegionInfo().getRegionNameAsString() + "]");
SsccTransactionState s = null;
synchronized (transactionsById) {
s = transactionsById.remove(transactionName);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: leaseExpired Removing transaction: " + this.transactionName + " from list");
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: leaseExpired Removing transaction: " + this.transactionName + " from list");
}
if (s == null) {
LOG.warn("leaseExpired Unknown transaction expired " + this.transactionName);
return;
}
switch (s.getStatus()) {
case PENDING:
s.setStatus(Status.ABORTED);
break;
case COMMIT_PENDING:
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: leaseExpired Transaction " + s.getTransactionId()
+ " expired in COMMIT_PENDING state");
String key = getTransactionalUniqueId(s.getTransactionId());
try {
if (s.getCommitPendingWaits() > MAX_COMMIT_PENDING_WAITS) {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: leaseExpired Checking transaction status in transaction log");
resolveTransactionFromLog(s);
break;
}
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: leaseExpired renewing lease and hoping for commit");
s.incrementCommitPendingWaits();
synchronized (transactionsById) {
transactionsById.put(key, s);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: leaseExpired Adding transaction: " + s.getTransactionId() + " to list");
}
try {
transactionLeases.createLease(key, transactionLeaseTimeout, this);
} catch (LeaseStillHeldException e) {
transactionLeases.renewLease(key);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
break;
default:
LOG.warn("SsccRegionEndpoint coprocessor: leaseExpired Unexpected status on expired lease");
}
}
}
/**
* Processes multiple transactional deletes
* @param transactionId the transaction id
* @param startId the start id of the transaction
* @param deletes the Delete operations to process
* @throws IOException
*/
public synchronized void delete(long transactionId, long startId, Delete[] deletes)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter SsccRegionEndpoint coprocessor: deletes[], txid: " + transactionId);
checkClosing(transactionId);
SsccTransactionState state = this.beginTransIfNotExist(transactionId, startId);
//for (Delete del : deletes) {
// state.addDelete(del);
//}
}
public void constructIndoubtTransactions() /*throws IdTmException*/ {
synchronized (recoveryCheckLock) {
if ((indoubtTransactionsById == null) || (indoubtTransactionsById.size() == 0)) {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Endpoint Coprocessor: Region " + regionInfo.getRegionNameAsString() + " has no in-doubt transaction, set region START ");
regionState = 2; // region is started for transactional access
reconstructIndoubts = 1;
try {
startRegionAfterRecovery();
} catch (IOException exp1) {
LOG.debug("Trafodion Recovery: flush error during region start");
}
return;
}
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Endpoint Coprocessor: Trafodion Recovery RegionObserver to Endpoint coprocessor data exchange test");
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Endpoint Coprocessor: try to access indoubt transaction list with size " + indoubtTransactionsById.size());
if (reconstructIndoubts == 0) {
//Retrieve (tid,Edits) from indoubt Transaction and construct/add into desired transaction data list
for (Entry<Long, WALEdit> entry : indoubtTransactionsById.entrySet()) {
long transactionId = entry.getKey();
String key = String.valueOf(transactionId);
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Endpoint Coprocessor:E11 Region " + regionInfo.getRegionNameAsString() + " process in-doubt transaction " + transactionId);
IdTmId seqId;
// try {
// seqId = new IdTmId();
// if (LOG.isDebugEnabled()) LOG.debug("Trafodion Recovery Endpoint Coprocessor:getting new IdTM sequence ");
// idServer.id(ID_TM_SERVER_TIMEOUT, seqId);
// if (LOG.isDebugEnabled()) LOG.debug("Trafodion Recovery Endpoint Coprocessor: IdTM sequence is " + seqId.val);
// } catch (IdTmException exc) {
// LOG.error("Trafodion Recovery Endpoint Coprocessor: IdTm threw exception exc " + exc);
// throw new IdTmException("Trafodion Recovery Endpoint Coprocessor: IdTm threw exception exc " + exc);
// } catch (Exception exc2) {
// LOG.error("Trafodion Recovery Endpoint Coprocessor: IdTm threw exception exc2 " + exc2);
// throw new IdTmException("Trafodion Recovery Endpoint Coprocessor: IdTm threw exception exc2 " + exc2);
// }
//TBD Need to get HLOG ???
if (LOG.isTraceEnabled()) LOG.trace("constructIndoubtTransactions for transId " + transactionId);
SsccTransactionState state = new SsccTransactionState(transactionId, /* 1L my_Region.getLog().getSequenceNumber()*/
nextLogSequenceId.getAndIncrement(), nextLogSequenceId,
regionInfo, m_Region.getTableDesc(), tHLog, false,
// seqId.val);
nextSsccSequenceId.getAndIncrement());
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Endpoint Coprocessor:E22 Region " + regionInfo.getRegionNameAsString() + " create transaction state for " + transactionId);
state.setStartSequenceNumber(state.getStartId());
transactionsById.put(getTransactionalUniqueId(transactionId), state);
state.setReinstated();
state.setStatus(Status.COMMIT_PENDING);
commitPendingTransactions.add(state);
// try {
// idServer.id(ID_TM_SERVER_TIMEOUT, seqId);
// } catch (IdTmException exc) {
// LOG.error("Trafodion Recovery Endpoint Coprocessor: IdTm threw exception 2 " + exc);
// }
// state.setSequenceNumber(seqId.val);
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Endpoint setting sequenceNumber to commitId: " + state.getCommitId());
state.setSequenceNumber(state.getCommitId());
// state.setSequenceNumber(nextSsccSequenceId.getAndIncrement());
commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), state);
int tmid = (int) TransactionState.getNodeId(transactionId);
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Endpoint Coprocessor:E33 Region " + regionInfo.getRegionNameAsString() + " add prepared " + transactionId + " to TM " + tmid);
} // for all txns in the in-doubt transaction list
} // not reconstruct indoubtes yet
reconstructIndoubts = 1;
} // synchronized
/* //TBD cleanup lists (this is for testing purpose only)
LOG.debug("Trafodion Recovery Endpoint Coprocessor:EEE Clean up recovery test for indoubt transactions");
transactionsById.clear();
commitPendingTransactions.clear();
commitedTransactionsBySequenceNumber.clear();
LOG.debug("Trafodion Recovery Endpoint Coprocessor:EEE Clean up indoubt transactions in data lists");
*/
}
/**
* Obtains a scanner lease id
* @param scannerId the scanner id
* @return the lease id string for the scanner
*/
private String getScannerLeaseId(final long scannerId) {
String lstring = m_Region.getRegionInfo().getRegionNameAsString() + scannerId;
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: getScannerLeaseId -- EXIT txId: "
+ scannerId + " lease string " + lstring);
return lstring;
}
/**
* Obtains a transactional lease id
* @param transactionId the transaction id
* @return the unique id string for the transaction
*/
private String getTransactionalUniqueId(final long transactionId) {
if (LOG.isTraceEnabled()) {
String lstring = m_Region.getRegionInfo().getRegionNameAsString() + transactionId;
LOG.trace("SsccRegionEndpoint coprocessor: getTransactionalUniqueId -- EXIT txId: "
+ transactionId + " transactionsById size: "
+ transactionsById.size() + " name " + lstring);
}
return m_Region.getRegionInfo().getRegionNameAsString() + transactionId;
}
/**
* Logs a cleanup message for a Throwable and returns the Throwable
* @param t the Throwable being cleaned up
* @param msg the cleanup message to log
* @return the Throwable that was passed in
*/
private Throwable cleanup(final Throwable t, final String msg) {
if (t instanceof NotServingRegionException) {
if (LOG.isTraceEnabled()) LOG.trace("NotServingRegionException; " + t.getMessage());
return t;
}
if (msg == null) {
LOG.error("cleanup message was null");
} else {
LOG.error("cleanup message was " + msg);
}
return t;
}
private IOException convertThrowableToIOE(final Throwable t) {
return convertThrowableToIOE(t, null);
}
/*
* @param t
*
* @param msg Message to put in new IOE if passed <code>t</code>
* is not an IOE
*
* @return Make <code>t</code> an IOE if it isn't already.
*/
private IOException convertThrowableToIOE(final Throwable t, final String msg) {
return (t instanceof IOException ? (IOException) t : msg == null
|| msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
}
/**
* Checks if the file system is available
* @return boolean
*/
public boolean checkFileSystem() {
if (this.fs != null) {
try {
FSUtils.checkFileSystemAvailable(this.fs);
} catch (IOException e) {
if (LOG.isTraceEnabled()) LOG.trace("File System not available threw IOException " + e.getMessage());
return false;
}
}
return true;
}
/**
* Adds all of the table's column families to the scan
* if the scan has no families defined
* @param scan the Scan to prepare
* @throws IOException
*/
public void prepareScanner(Scan scan) throws IOException {
if(!scan.hasFamilies()) {
for(byte[] family: this.m_Region.getTableDesc().getFamiliesKeys()){
scan.addFamily(family);
}
}
}
/**
* Checks if the row is within this region's row range
* @param row the row to check
* @param op the operation name, used in the exception message
* @throws IOException if the row is out of range for this region
*/
public void checkRow(final byte [] row, String op) throws IOException {
if(!HRegion.rowIsInRange(this.regionInfo, row)) {
throw new WrongRegionException("Requested row out of range for " +
op + " on HRegion " + this + ", startKey='" +
Bytes.toStringBinary(this.regionInfo.getStartKey()) + "', getEndKey()='" +
Bytes.toStringBinary(this.regionInfo.getEndKey()) + "', row='" +
Bytes.toStringBinary(row) + "'");
}
}
/**
* Removes the scanner associated with the specified ID from the internal
* id->scanner TransactionalRegionScannerHolder map
*
* @param scannerId the id of the scanner to remove
* @return the RegionScanner that was removed
* @throws UnknownScannerException if no scanner is associated with the id
*/
protected synchronized RegionScanner removeScanner(long scannerId)
throws UnknownScannerException {
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: removeScanner scanners map is " + scanners + ", count is " + scanners.size());
TransactionalRegionScannerHolder rsh =
scanners.remove(scannerId);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: removeScanner scanners map is " + scanners + ", count is " + scanners.size());
if (rsh != null)
{
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: removeScanner rsh is " + rsh + "rsh.s is" + rsh.s );
return rsh.s;
}
else
{
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: removeScanner rsh is null");
throw new UnknownScannerException(
"ScannerId: " + scannerId + ", already closed?");
}
}
/**
* Adds a region scanner to the TransactionalRegionScannerHolder map
* @param transId the transaction id
* @param s the RegionScanner to add
* @param r the region the scanner operates on
* @return the scanner id assigned to the new scanner
* @throws LeaseStillHeldException
*/
#ifdef APACHE1.2 CDH5.7
protected synchronized long addScanner(long transId, RegionScanner s, Region r)
#else
protected synchronized long addScanner(long transId, RegionScanner s, HRegion r)
#endif
throws LeaseStillHeldException {
long scannerId = performScannerId.getAndIncrement();
TransactionalRegionScannerHolder rsh =
new TransactionalRegionScannerHolder(transId, scannerId, s,r);
// rsh cannot be null here: the constructor above always returns an instance
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: scannerId is " + scannerId + ", addScanner rsh is " + rsh);
TransactionalRegionScannerHolder existing =
scanners.putIfAbsent(scannerId, rsh);
if (LOG.isTraceEnabled()) LOG.trace("SsccRegionEndpoint coprocessor: addScanner scanners map is " + scanners + ", count is " + scanners.size());
/*
scannerLeases.createLease(getScannerLeaseId(scannerId),
this.scannerLeaseTimeoutPeriod,
new TransactionalScannerListener(scannerId));
*/
return scannerId;
}
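// Lifecycle sketch (hedged; variable names are illustrative): the returned id
// keys the holder map from open to close.
//
//   long id = addScanner(transId, regionScanner, region);  // register
//   // ... perform scans via the holder looked up by id ...
//   RegionScanner s = removeScanner(id);  // deregister; a second call with
//                                         // the same id throws
//                                         // UnknownScannerException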
/**
* Instantiated as a scanner lease. If the lease times out, the scanner is
* closed
*/
/*
private class TransactionalScannerListener implements LeaseListener {
private final long scannerId;
TransactionalScannerListener(final long id) {
this.scannerId = id;
}
@Override
public void leaseExpired() {
TransactionalRegionScannerHolder rsh = scanners.remove(this.scannerId);
if (rsh != null) {
RegionScanner s = rsh.s;
if (LOG.isTraceEnabled()) LOG.trace("Scanner " + this.scannerId + " lease expired on region "
+ s.getRegionInfo().getRegionNameAsString());
try {
HRegion region = rsh.r;
s.close();
} catch (IOException e) {
LOG.error("Closing scanner for "
+ s.getRegionInfo().getRegionNameAsString(), e);
}
} else {
if (LOG.isTraceEnabled()) LOG.trace("Scanner " + this.scannerId + " lease expired");
}
}
}
*/
/**
* Formats the throwable's stack trace as a string
* @param e the Throwable whose stack trace is formatted
* @return the stack trace, one element per line
*/
public String stackTraceToString(Throwable e) {
StringBuilder sb = new StringBuilder();
for (StackTraceElement element : e.getStackTrace()) {
sb.append(element.toString());
sb.append("\n");
}
return sb.toString();
}
// Returns the Scanner Leases for this coprocessor (disabled along with the
// scanner lease mechanism above)
// @return Leases
//synchronized protected Leases getScannerLeases() {
// return this.scannerLeases;
//}
/**
* Returns the Leases for this coprocessor
* @return Leases
*/
protected synchronized Leases getTransactionalLeases() {
return this.transactionLeases;
}
/**
* Removes unneeded committed transactions
*/
public synchronized void removeUnNeededCommitedTransactions() {
Long minStartSeqNumber = getMinStartSequenceNumber();
if (minStartSeqNumber == null) {
minStartSeqNumber = Long.MAX_VALUE;
}
int numRemoved = 0;
synchronized (commitedTransactionsBySequenceNumber) {
for (Entry<Long, SsccTransactionState> entry : new LinkedList<Entry<Long, SsccTransactionState>>(
commitedTransactionsBySequenceNumber.entrySet())) {
if (entry.getKey() >= minStartSeqNumber) {
break;
}
// Count each entry only once, and only if it was actually removed.
numRemoved = numRemoved
+ (commitedTransactionsBySequenceNumber.remove(entry
.getKey()) == null ? 0 : 1);
}
}
/*
StringBuilder traceMessage = new StringBuilder();
if (numRemoved > 0) {
traceMessage.append("Removed [").append(numRemoved)
.append("] commited transactions");
if (minStartSeqNumber == Integer.MAX_VALUE) {
traceMessage.append(" with any sequence number.");
} else {
traceMessage.append(" with sequence lower than [")
.append(minStartSeqNumber).append("].");
}
if (!commitedTransactionsBySequenceNumber.isEmpty()) {
traceMessage.append(" Still have [")
.append(commitedTransactionsBySequenceNumber.size())
.append("] left.");
} else {
traceMessage.append(" None left.");
}
if (LOG.isTraceEnabled()) LOG.trace(traceMessage.toString());
} else if (commitedTransactionsBySequenceNumber.size() > 0) {
traceMessage.append("Could not remove any transactions, and still have ")
.append(commitedTransactionsBySequenceNumber.size())
.append(" left");
if (LOG.isTraceEnabled()) LOG.trace(traceMessage.toString());
}
*/
}
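// Worked example (illustrative numbers): if the live transactions started at
// sequence numbers {17, 23}, getMinStartSequenceNumber() returns 17 and every
// committed entry keyed below 17 is pruned, since no active transaction can
// still read it. With no live transactions, minStartSeqNumber becomes
// Long.MAX_VALUE and all committed entries are eligible for removal.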
/**
* Returns the minimum start sequence number across live transactions
* @return the minimum start sequence number, or null if no transactions are live
*/
private Long getMinStartSequenceNumber() {
List<SsccTransactionState> transactionStates;
synchronized (transactionsById) {
transactionStates = new ArrayList<SsccTransactionState>(
transactionsById.values());
}
Long min = null;
for (SsccTransactionState transactionState : transactionStates) {
if (min == null || transactionState.getStartSequenceNumber() < min) {
min = transactionState.getStartSequenceNumber();
}
}
return min;
}
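// Example (illustrative): with live transactions whose start sequence numbers
// are {42, 7, 19} this returns 7; with no live transactions it returns null,
// which the caller above treats as Long.MAX_VALUE.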
/**
* Returns the region name as a string
* @return String
*/
public String getRegionNameAsString() {
return this.m_Region.getRegionInfo().getRegionNameAsString();
}
/**
* Simple helper class that just keeps track of whether or not it has been stopped.
*/
private static class StoppableImplementation implements Stoppable {
private volatile boolean stop = false;
@Override
public void stop(String why) {
this.stop = true;
}
@Override
public boolean isStopped() {
return this.stop;
}
}
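// Usage sketch (hypothetical): gives a background task a handle it can poll
// for shutdown.
//
//   StoppableImplementation stoppable = new StoppableImplementation();
//   // ... hand stoppable to a chore or cleaner thread ...
//   stoppable.stop("region closing");  // the task now sees isStopped() == true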
}