blob: 33f54546730374aabaf986a816f83978fdcdddcc [file] [log] [blame]
/**
* @@@ START COPYRIGHT @@@
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @@@ END COPYRIGHT @@@
**/
package org.apache.hadoop.hbase.coprocessor.transactional;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder;
import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState;
import org.apache.hadoop.hbase.wal.WAL;
//import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.client.DtmConst;
import org.apache.hadoop.hbase.client.SsccConst;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Mutation;
import java.util.ListIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.fs.FileSystem;
#ifdef HDP2.3 APACHE1.1 CDH5.5 CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.regionserver.ScannerContext;
#endif
#ifdef HDP2.3 APACHE1.1 CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.regionserver.Region;
#endif
public class TrxRegionObserver extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(TrxRegionObserver.class);
public static final String trxkey = "TRAFODION";
public static final String trxkeytransactionsById = "transactionsById";
public static final String trxkeycommitedTransactionsBySequenceNumber = "commitedTransactionsBySequenceNumber";
public static final String trxkeycommitPendingTransactions = "commitPendingTransactions";
public static final String trxkeypendingTransactionsById = "pendingTransactionsById";
public static final String trxkeyindoubtTransactionsCountByTmid = "indoubtTransactionsCountByTmid";
public static final String trxkeyCheckBlockAllVar = "checkBlockAllVar";
public static final String trxkeyCheckBlockNonPhase2Var = "checkBlockNonPhase2Var";
public static final String trxkeyCheckBlockNewTransVar = "checkBlockNewTransVar";
public static final String trxkeyClosingVar = "checkClosingVariable";
public static final String trxkeyScanners = "trxScanners";
public static final String SPLIT_DELAY_NOFLUSH = "hbase.transaction.split.delay.noflush";
public static final String SPLIT_DELAY_LIMIT = "hbase.transaction.split.delay.limit";
public static final String EARLY_DRAIN = "hbase.transaction.split.drain.early";
public static final String ACTIVE_DELAY_LEN = "hbase.transaction.split.active.delay";
public static final String PENDING_DELAY_LEN = "hbase.transaction.split.pending.delay";
public static final int TS_ACTIVE = 0;
public static final int TS_COMMIT_REQUEST = 1;
public static final int TS_COMMIT = 2;
public static final int TS_ABORT = 3;
public static final int TS_CONTROL_POINT_COMMIT = 4;
public static final int TS_REGION_TX_ACTIVE = 5;
public static final int TS_REGION_TX_COMMIT_REQUEST = 6;
public static final int TS_REGION_TX_COMMIT = 7;
public static final byte TS_TRAFODION_TXN_TAG_TYPE = 41;
public static final int ACTIVETXN_DELAY_DEFAULT = 15 * 1000;
public static final int PENDINGTXN_DELAY_DEFAULT = 500;
public static final int SPLIT_DELAY_DEFAULT = 360;
// In-doubt transaction list during recovered WALEdit replay
private SortedMap<Long, List<WALEdit>> pendingTransactionsById = new TreeMap<Long, List<WALEdit>>();//WALEdit>();
// Array to hold the number of indoubt transactions for each TM
private Map<Integer, Integer> indoubtTransactionsCountByTmid = new TreeMap<Integer,Integer>();
// Map for Transactional Region to exchange data structures between Region Observer coprocessor and Endpoint Coprocessor
static ConcurrentHashMap<String, Object> transactionsRefMap = new ConcurrentHashMap<String, Object>();
private ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners =
new ConcurrentHashMap<Long, TransactionalRegionScannerHolder>();
static ConcurrentHashMap<String, Object> trxRegionMap;
private ConcurrentHashMap<Long, TrxTransactionState> transactionsById = new ConcurrentHashMap<Long, TrxTransactionState>();
private SortedMap<Long, TrxTransactionState> commitedTransactionsBySequenceNumber = Collections.synchronizedSortedMap(new TreeMap<Long, TrxTransactionState>());
private Set<TrxTransactionState> commitPendingTransactions = Collections.synchronizedSet(new HashSet<TrxTransactionState>());
private AtomicBoolean blockAll = new AtomicBoolean(false);
private AtomicBoolean blockNonPhase2 = new AtomicBoolean(false);
private AtomicBoolean blockNewTrans = new AtomicBoolean(false);
private AtomicBoolean closing = new AtomicBoolean(false);
private boolean hasClosed = false;
private boolean hasFlushed = false;
private boolean m_isTrafodionMetadata = false;
Configuration my_config;
#ifdef APACHE1.2 CDH5.7
Region my_Region;
#else
HRegion my_Region;
#endif
HRegionInfo regionInfo;
WAL tHLog;
ServerName sn;
String hostName;
int port;
int splitDelayLimit;
boolean earlyDrain;
boolean splitDelayNoFlush;
int activeDelayLen;
int pendingDelayLen;
long activeCount = 0;
long abortCount = 0;
long commitCount = 0;
long commitRequestCount = 0;
long regionTxCommitCount = 0;
long regionTxCommitRequestCount = 0;
long totalEdits = 0;
long skippedEdits = 0;
int cleanAT = 0;
long minSeqID = 0;
int flushCount = 0;
int regionState = 0;
FileSystem fs = null;
private Object recoveryCheckLock = new Object();
private Object editReplay = new Object();
public static String zTrafPath = "/hbase/Trafodion/";
private static String zNodePath = zTrafPath + "recovery/";
private static ZooKeeperWatcher zkw1 = null;
private static Object zkRecoveryCheckLock = new Object();
String m_regionDetails = null;
SplitBalanceHelper sbHelper;
RegionServerServices rss;
// Region Observer Coprocessor START
@Override
public void start(CoprocessorEnvironment e) throws IOException {
trxRegionMap = TrxRegionEndpoint.getRegionMap();
RegionCoprocessorEnvironment regionCoprEnv = (RegionCoprocessorEnvironment)e;
RegionCoprocessorEnvironment re = (RegionCoprocessorEnvironment) e;
#ifdef APACHE1.2 CDH5.7
my_Region = re.getRegion();
#else
my_Region = (HRegion) re.getRegion();
#endif
regionInfo = my_Region.getRegionInfo();
String lv_regionName = this.regionInfo.getRegionNameAsString();
String skey = (Bytes.equals(this.regionInfo.getStartKey(), HConstants.EMPTY_START_ROW)) ? "skey=null" : ("skey=" + Hex.encodeHexString(regionInfo.getStartKey()));
String ekey = (Bytes.equals(this.regionInfo.getEndKey(), HConstants.EMPTY_END_ROW)) ? "ekey=null" : ("ekey=" + Hex.encodeHexString(regionInfo.getEndKey()));
m_regionDetails = new String(lv_regionName + "," + skey + "," + ekey);
m_isTrafodionMetadata = lv_regionName.contains("_MD_");
LOG.info ("TRX RegionObserver coprocessor, "
+ " region: " + m_regionDetails
+ " isTrafodionMD: " + m_isTrafodionMetadata
);
this.my_config = regionCoprEnv.getConfiguration();
this.fs = FileSystem.get(my_config);
org.apache.hadoop.conf.Configuration conf = regionCoprEnv.getConfiguration();
this.splitDelayLimit = conf.getInt(SPLIT_DELAY_LIMIT, SPLIT_DELAY_DEFAULT);
this.activeDelayLen = conf.getInt(ACTIVE_DELAY_LEN, ACTIVETXN_DELAY_DEFAULT);
this.pendingDelayLen = conf.getInt(PENDING_DELAY_LEN, PENDINGTXN_DELAY_DEFAULT);
this.earlyDrain = conf.getBoolean(EARLY_DRAIN, false);
this.splitDelayNoFlush = conf.getBoolean(SPLIT_DELAY_NOFLUSH, false);
if (LOG.isTraceEnabled()) {
LOG.trace("Properties for -- " + lv_regionName);
LOG.trace("Property: splitDelayLimit = " + this.splitDelayLimit);
LOG.trace("Property: activeDelayLen = " + this.activeDelayLen);
LOG.trace("Property: pendingDelayLen = " + this.pendingDelayLen);
LOG.trace("Property: earlyDrain = " + this.earlyDrain);
LOG.trace("Property: splitDelayNoFlush = " + this.splitDelayNoFlush);
}
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: trxRegionObserver load start ");
if (!re.getSharedData().containsKey(trxkey)) {
// there is a short race here, in the worst case we create a watcher that will be notified once
re.getSharedData().putIfAbsent(trxkey, transactionsRefMap);
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: trxRegionObserver put data structure into CoprocessorEnvironment ");
}
rss = re.getRegionServerServices();
tHLog = rss.getWAL(regionInfo);
sn = rss.getServerName();
hostName = sn.getHostname();
port = sn.getPort();
if (LOG.isTraceEnabled()) LOG.trace("Hostname " + hostName + " port " + port);
zkw1 = rss.getZooKeeper();
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) {
LOG.info("Trafodion Recovery Region Observer CP: HRegion " + lv_regionName + " starts to put transactional data lists into reference map ... ");
if(transactionsRefMap.isEmpty()) {
LOG.info("Empty Shared Map, will put objects -- TrxRegionObserver.");
}
}
@SuppressWarnings("unchecked")
SortedMap<Long, List<WALEdit>> pendingTransactionsByIdCheck = (SortedMap<Long, List<WALEdit>>)transactionsRefMap
.get(lv_regionName+trxkeypendingTransactionsById);
if(pendingTransactionsByIdCheck != null) {
this.pendingTransactionsById = pendingTransactionsByIdCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeypendingTransactionsById, this.pendingTransactionsById);
}
@SuppressWarnings("unchecked")
Map<Integer, Integer> indoubtTransactionsCountByTmidCheck = (Map<Integer, Integer>)transactionsRefMap
.get(lv_regionName+trxkeyindoubtTransactionsCountByTmid);
if(indoubtTransactionsCountByTmidCheck != null) {
this.indoubtTransactionsCountByTmid = indoubtTransactionsCountByTmidCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeyindoubtTransactionsCountByTmid, this.indoubtTransactionsCountByTmid);
}
@SuppressWarnings("unchecked")
ConcurrentHashMap<Long, TrxTransactionState> transactionsByIdCheck = (ConcurrentHashMap<Long, TrxTransactionState>)
transactionsRefMap
.get(lv_regionName+trxkeytransactionsById);
if(transactionsByIdCheck != null) {
this.transactionsById = transactionsByIdCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeytransactionsById, this.transactionsById);
}
@SuppressWarnings("unchecked")
SortedMap<Long, TrxTransactionState> commitedTransactionsBySequenceNumberCheck = (SortedMap<Long, TrxTransactionState>)
transactionsRefMap
.get(lv_regionName+trxkeycommitedTransactionsBySequenceNumber);
if(commitedTransactionsBySequenceNumberCheck != null) {
this.commitedTransactionsBySequenceNumber = commitedTransactionsBySequenceNumberCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeycommitedTransactionsBySequenceNumber, this.commitedTransactionsBySequenceNumber);
}
@SuppressWarnings("unchecked")
Set<TrxTransactionState> commitPendingTransactionsCheck = (Set<TrxTransactionState>)transactionsRefMap
.get(lv_regionName+trxkeycommitPendingTransactions);
if(commitPendingTransactionsCheck != null) {
this.commitPendingTransactions = commitPendingTransactionsCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeycommitPendingTransactions,
this.commitPendingTransactions);
}
AtomicBoolean blockAllCheck = (AtomicBoolean)transactionsRefMap
.get(lv_regionName+trxkeyCheckBlockAllVar);
if(blockAllCheck != null) {
this.blockAll = blockAllCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeyCheckBlockAllVar, this.blockAll);
}
AtomicBoolean closingCheck = (AtomicBoolean)transactionsRefMap
.get(lv_regionName+trxkeyClosingVar);
if(closingCheck != null) {
this.closing = closingCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeyClosingVar, this.closing);
}
AtomicBoolean blockNonPhase2Check = (AtomicBoolean)transactionsRefMap
.get(lv_regionName+trxkeyCheckBlockNonPhase2Var);
if(blockNonPhase2Check != null) {
this.blockNonPhase2 = blockNonPhase2Check;
}
else {
transactionsRefMap.put(lv_regionName+trxkeyCheckBlockNonPhase2Var, this.blockNonPhase2);
}
AtomicBoolean blockNewTransCheck = (AtomicBoolean)transactionsRefMap
.get(lv_regionName+trxkeyCheckBlockNewTransVar);
if(blockNewTransCheck != null) {
this.blockNewTrans = blockNewTransCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeyCheckBlockNewTransVar,this.blockNewTrans);
}
@SuppressWarnings("unchecked")
ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck =
(ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsRefMap
.get(lv_regionName+trxkeyScanners);
if(scannersCheck != null) {
this.scanners = scannersCheck;
}
else {
transactionsRefMap.put(lv_regionName+trxkeyScanners, this.scanners);
}
sbHelper = new SplitBalanceHelper((HRegion)my_Region, zkw1, conf);
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: trxRegionObserver load start complete");
} // end of start
static ConcurrentHashMap<String, Object> getRefMap() {
return transactionsRefMap;
} // end of getRefmap
// Region Observer Coprocessor preWALRestore, called in HRegion ReplayRecoveredEdits
@Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
HLogKey logKey, WALEdit logEdit) throws IOException {
String lv_regionName = logKey.getTablename().getNameAsString();
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: preWALRestore coprocessor is invoked ... in table "+ logKey.getTablename().getNameAsString());
ArrayList<Cell> kvs = logEdit.getCells();
if (kvs.size() <= 0) {
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP:PWR00 No KV inside Edits, skip ... "+ lv_regionName);
return;
}
// Retrieve KV to see if it has the Trafodion Transaction context tag
Cell kv = kvs.get(0); // get the first KV to check the associated transactional tag (all the KV pairs contain the same tag)
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("KV hex dump " + Hex.encodeHexString(kv.getValueArray() /*kv.getBuffer()*/));
byte[] tagArray = Bytes.copy(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
byte tagType = TS_TRAFODION_TXN_TAG_TYPE;
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: "+ lv_regionName +" tag array hex dump " + Hex.encodeHexString(tagArray));
Tag tag = Tag.getTag(tagArray, 0, kv.getTagsLength(), tagType); //TagType.TS_TRAFODION_TXN_TAG_TYPE
if (tag == null) {
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: there is no desired transactional tag in KV, skip ... "+ lv_regionName);
return;
}
byte[] b = tag.getBuffer();
int offset = Tag.TYPE_LENGTH_SIZE + Tag.TAG_LENGTH_SIZE;
int version = Bytes.toInt(b,offset);
int op = Bytes.toInt(b,Bytes.SIZEOF_INT+offset);
long tid = Bytes.toLong(b,Bytes.SIZEOF_INT+Bytes.SIZEOF_INT+offset);
long logSeqId = Bytes.toLong(b,Bytes.SIZEOF_INT+Bytes.SIZEOF_INT+Bytes.SIZEOF_LONG+offset);
ArrayList <WALEdit> editList;
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP:PWR01 Find transactional tag within Edits for " + lv_regionName + " tid " + tid + " op " + op + " log seq " + logSeqId + " version " + version);
//Trafodion Recovery : process each edit according to its nature (prepare, commit, abort)
switch (op) {
case TS_ACTIVE:
if (pendingTransactionsById.containsKey(tid)) {
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: " + regionInfo.getRegionNameAsString() + " processing active edit for transaction: " + tid
+ ", already find a previous edit with that id");
//get the editList from pendingTransactionsById, and add the logEdit into the editList
editList = (ArrayList<WALEdit>) (pendingTransactionsById.get(tid));
editList.add(logEdit);
activeCount++;
// no need to do the "put" back to the map since it gets the reference already
}
else {
// create the editList
editList = new ArrayList<WALEdit>();
editList.add(logEdit);
pendingTransactionsById.put(tid, editList);
activeCount++;
if (LOG.isDebugEnabled()) LOG.debug("Trafodion Recovery Region Observer CP: " + regionInfo.getRegionNameAsString() + " find in-doubt transaction " + tid + " in active state");
}
break;
case TS_COMMIT_REQUEST: // Replace it to ACTIVE
if (pendingTransactionsById.containsKey(tid)) {
LOG.info("TRAF RCOV Region Observer CP: " + regionInfo.getRegionNameAsString() + " processing commit request for transaction: " + tid
+ ", already find a previous edit with that id");
//throw new IOException("Corrupted transaction log");
// get the editList from pendingTransactionsById, and add the logEdit into the editList
editList = (ArrayList<WALEdit>) (pendingTransactionsById.get(tid));
editList.add(logEdit);
// no need to do the "put" back to the map since it gets the reference already
}
else {
// create the editList
editList = new ArrayList<WALEdit>();
editList.add(logEdit);
pendingTransactionsById.put(tid, editList);
//pendingTransactionsById.put(tid, logEdit);
commitRequestCount++;
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: " + regionInfo.getRegionNameAsString() + " find in-doubt transaction " + tid + " in prepared state");
}
break;
case TS_ABORT:
if (!pendingTransactionsById.containsKey(tid)) {
LOG.info("TRAF RCOV Region Observer CP: " + regionInfo.getRegionNameAsString()
+ " processing abort for transaction: " + tid
+ ", but don't find a previous edit with that id");
abortCount++;
break;
//throw new IOException("Corrupted transaction log");
}
pendingTransactionsById.remove(tid);
abortCount++;
break;
case TS_COMMIT:
if (!pendingTransactionsById.containsKey(tid)) {
LOG.warn("TRAF RCOV Region Observer CP: " + regionInfo.getRegionNameAsString()
+ " processing commit for transaction: " + tid
+ ", but don't find a previous edit with that id");
commitCount++;
break;
//throw new IOException("Corrupted transaction log");
}
editList = (ArrayList<WALEdit>) (pendingTransactionsById.get(tid));
if ((m_isTrafodionMetadata) || (LOG.isTraceEnabled())) LOG.info("TRAF RCOV Region Observer CP: "
+ regionInfo.getRegionNameAsString() + " find in-doubt transaction " + tid + " to commit");
replayCommittedTransaction(tid, editList);
pendingTransactionsById.remove(tid);
commitCount++;
break;
case TS_CONTROL_POINT_COMMIT:
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("AAA3 get CP edit : " + tid
+ " region " + regionInfo.getRegionNameAsString());
ArrayList<WALEdit> tempEditList = new ArrayList<WALEdit>();
tempEditList.add(logEdit);
replayCommittedTransaction(tid, tempEditList);
if (pendingTransactionsById.containsKey(tid)) pendingTransactionsById.remove(tid);
commitCount++;
break;
case TS_REGION_TX_COMMIT:
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("TRAF RCOV Region Observer CP: Region Transaction commit " + tid
+ " region " + regionInfo.getRegionNameAsString());
regionTxCommitCount++;
break;
case TS_REGION_TX_COMMIT_REQUEST: // Treat as commit
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("TRAF RCOV Region Observer CP: Region Transaction commitRequest " + tid
+ " region " + regionInfo.getRegionNameAsString());
regionTxCommitRequestCount++;
// create the editList
editList = new ArrayList<WALEdit>();
editList.add(logEdit);
replayCommittedTransaction(tid, editList);
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: " + regionInfo.getRegionNameAsString() + " find in-doubt REGION transaction " + tid + " in prepared state");
break;
default:
throw new IllegalStateException("Trafodion Recovery Region Observer CP: Unexpected log entry type detected in audit replay");
} // switch op
totalEdits++;
if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV PREWAL Region Observer CP: " + regionInfo.getRegionNameAsString()
+ " Edits replay read " + totalEdits + " transactional operations (skipped " + skippedEdits
+ " because sequence id <= " + minSeqID + "): " + commitRequestCount + " commitRequests, "
+ regionTxCommitRequestCount + " regionTxCommitRequests, " + abortCount + " aborts, " + commitCount
+ " commits, " + regionTxCommitCount + " regionTxCommits, and " + flushCount + " flushes.");
//recovery patch
env.complete(); // do not need to invoke further down coprocessor
env.bypass();
} // end of preWALRestore
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
if(!this.splitDelayNoFlush) {
@SuppressWarnings("rawtypes")
TrxRegionEndpoint tre = (TrxRegionEndpoint)trxRegionMap.get(regionInfo.getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
if(tre == null) {
LOG.error("Unable to obtain TrxRegionEndpoint object from shared map for " + regionInfo.getRegionNameAsString());
}
else {
Path readPath = null;
StringBuilder sbPath = new StringBuilder();
if(sbHelper.getSplit(sbPath)) {
sbHelper.clearSplit();
}
else if(sbHelper.getBalance(sbPath)) {
readPath = new Path(sbPath.toString());
try {
tre.readTxnInfo(readPath);
} catch(IOException ioe) {
if (LOG.isErrorEnabled()) LOG.error("Unable to read Transactional Info for balance coordination: " + ioe);
}
sbHelper.clearBalance();
}
}
}
// Trafodion Recovery : after Open, we should have already constructed all the indoubt transactions in
// pendingTransactionsById now process it and construct transaction list by TM id. These two data
// structures are put in the reference map which is shared with TrxRegionEndpoint coprocessor class per region
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: postOpen coprocessor is invoked ... in region "+ regionInfo.getRegionNameAsString());
// discard any in-doubt transaction if ENV (likely the property XML) indicates (should be set in start)
// just ignore it for now
if (cleanAT == 1) {
if ((pendingTransactionsById != null) && (pendingTransactionsById.size() > 0)) {
LOG.info("Trafodion Recovery Region Observer CP: TM clean AT mode " + cleanAT + " discards " + pendingTransactionsById.size() + " in-doubt transaction ");
pendingTransactionsById.clear();
}
}
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Region " + regionInfo.getRegionNameAsString() + " is in state RECOVERING ");
// remove split-log under region dir if TRegion has been recovered competely
if ((pendingTransactionsById == null) || (pendingTransactionsById.size() == 0)) {
// call method startRegionAfterRecovery to 1) archive the split-thlog, and 2) set region state = STARTED
LOG.info("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " has no in-doubt transaction, set region START ");
try {
startRegionAfterRecovery();
} catch (IOException exp) {
LOG.error("Trafodion Recovery Region Observer CP:Flush failed after postOpen flush" + regionInfo.getRegionNameAsString());
}
return;
}
LOG.info("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " find " + pendingTransactionsById.size() +
" in-doubt transaction during edit replay, now reconstruct transaction state ");
//for each indoubt transaction from pendingTransactionsById, build related transaction state object and add it into required lists for endPoint
//build a list of TMs for in-doubt transactions, -2 is used for all peer's transactions
for (Entry<Long, List<WALEdit>> entry : pendingTransactionsById.entrySet()) {
synchronized (recoveryCheckLock) {
long transactionId = entry.getKey();
String key = String.valueOf(transactionId);
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " process in-doubt transaction " + transactionId);
int tmid = (int) TransactionState.getNodeId(transactionId);
int count = 1;
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " add prepared " + transactionId + " to TM " + tmid);
if (indoubtTransactionsCountByTmid.containsKey(tmid))
count = (int) indoubtTransactionsCountByTmid.get(tmid) + 1;
indoubtTransactionsCountByTmid.put(tmid, count);
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " has " + count +
" in-doubt-transaction from TM " + tmid);
//TBD may need to write the LOG again for reinstated txn (redo does not generate edits)
//since after open, HBase may toss out split-log while there are indoubt list in memory
//if the system crash again, those indoubt could be lost (so write them out), a recovery comletion should
//lead HBase to flush the memstore and reset the logSequenceId --> need to verify about the failure during recovery case
//it should be idempotent
} // synchronized
} // for all txns in indoubt transcation list
// Now we need to inform TM through ZK (TBD here may need to check if 0.98 requires new APIs to construct region info
byte [] lv_byte_region_info = regionInfo.toByteArray();
String lv_encoded = regionInfo.getEncodedName();
// loop for every tm, call createzNode (tmid, region encoded name, zNodedata)
for (int node : indoubtTransactionsCountByTmid.keySet()) {
try {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW Create Recovery zNode TM " + node + " region encoded name " + lv_encoded + " region info bytes " + new String(lv_byte_region_info));
createRecoveryzNode(node, lv_encoded, lv_byte_region_info);
} catch (IOException exp) {
LOG.error("Trafodion Recovery Region Observer CP postOpen: ZKW Create recovery zNode failed ", exp);
}
}
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW Complete post of recovery zNode for region info " + new String(lv_byte_region_info));
// Flush the cache (since we don't do it during replay committed transactions) and may need to re-write all the edits for in-doubt txn
// in case the failure occurred again before the resolution
/*
try {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Flushing cache in postOpen " + regionInfo.getRegionNameAsString());
HRegion.FlushResult fr = my_Region.flushcache();
if (!fr.isFlushSucceeded()) {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Flushcache returns false !!! " + regionInfo.getRegionNameAsString());
}
} catch (IOException exp) {
LOG.error("Trafodion Recovery: Flush failed after replay edits" + regionInfo.getRegionNameAsString());
return;
}
*/
} // end of postOpen
public void replayCommittedTransaction(long transactionId, ArrayList<WALEdit> editList) throws IOException {
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: " + regionInfo.getRegionNameAsString() + " replay commit for transaction: " + transactionId);
int num = editList.size();
if (LOG.isInfoEnabled()) LOG.info("TRAF RCOV Region Observer CP: " + regionInfo.getRegionNameAsString() + " replay commit for transaction: "
+ transactionId + " with editList size is " + num);
for ( int i = 0; i < num; i++){
WALEdit val = editList.get(i);
for (Cell kv : val.getCells()) {
synchronized (editReplay) {
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: " + regionInfo.getRegionNameAsString() + " replay commit for transaction: "
+ transactionId + " edit num " + i + " with Op " + kv.getTypeByte());
if (kv.getTypeByte() == KeyValue.Type.Put.getCode()) {
Put put = new Put(CellUtil.cloneRow(kv), kv.getTimestamp()); // kv.getRow()
put.add(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp(), CellUtil.cloneValue(kv));
//put.add(kv);
my_Region.put(put); // let it generate new edits at this moment
} else if (CellUtil.isDelete(kv)) {
Delete del = new Delete(CellUtil.cloneRow(kv), kv.getTimestamp());
if (CellUtil.isDeleteFamily(kv)) {
del.addFamily(CellUtil.cloneFamily(kv));
} else if (CellUtil.isDeleteType(kv)) {
del.addColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
}
my_Region.delete(del);
}
} // synchronized of editReplay
} // for-loop (KeyValues)
} // for-loop (edits)
} // end of replayCommittedTransaction
public void startRegionAfterRecovery() throws IOException {
boolean isFlush = false;
//TBD
// if we have indoubt transaction, do we need to rewrite back to HLOG, otherwise, if the system crash in the middle of recovery
// we could lose the memory and HLOG, but the split-log may already be removed after region open
// if flush succeeds, then it is not necessary
// regionState = 2; // region started, Endpoint coprocessor can set region state STARTED if it detects there is no indoubt txn
if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: Region " + my_Region.getRegionInfo().getEncodedName() + " is STARTED.");
} // end of startRegionAfterRecovery
public void createRecoveryzNode(int node, String encodedName, byte [] data) throws IOException {
synchronized(zkRecoveryCheckLock) {
// default zNodePath for recovery
String zNodeKey = hostName + "," + port + "," + encodedName;
StringBuilder sb = new StringBuilder();
sb.append("TM");
sb.append(node);
String str = sb.toString();
String zNodePathTM = zNodePath + str;
String zNodePathTMKey = zNodePathTM + "/" + zNodeKey;
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW Post region recovery znode" + node + " zNode Path " + zNodePathTMKey);
// create zookeeper recovery zNode, call ZK ...
try {
if (ZKUtil.checkExists(zkw1, zNodePathTM) == -1) {
// create parent nodename
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW create parent zNodes " + zNodePathTM);
ZKUtil.createWithParents(zkw1, zNodePathTM);
}
ZKUtil.createAndFailSilent(zkw1, zNodePathTMKey, data);
} catch (KeeperException e) {
if (LOG.isErrorEnabled()) LOG.error("Trafodion Recovery Region Observer CP: ZKW Unable to create recovery zNode to TM " + node + " ", e);
throw new IOException("Trafodion Recovery Region Observer CP: ZKW Unable to create recovery zNode to TM, throw IOException " + node, e);
}
}
} // end of createRecoveryzNode
@Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException {
if(LOG.isTraceEnabled()) LOG.trace("preSplit -- ENTRY region: " + regionInfo.getRegionNameAsString());
if(LOG.isTraceEnabled()) LOG.trace("preSplit -- transactionsById (" + transactionsById.size() + " ), commitPendingTransactions (" + commitPendingTransactions.size() +"), scanners (" + scanners.size() + ")");
blockNewTrans.set(true);
if(splitDelayNoFlush) {
if(!this.earlyDrain)
sbHelper.activeWait(transactionsById, activeDelayLen, splitDelayLimit);
closing.set(true);
sbHelper.pendingWait(commitPendingTransactions, pendingDelayLen);
}
else {
blockNonPhase2.set(true);
closing.set(true);
sbHelper.pendingAndScannersWait(commitPendingTransactions, scanners, transactionsById, pendingDelayLen);
sbHelper.setSplit();
}
blockAll.set(true);
if(LOG.isTraceEnabled()) LOG.trace("preSplit -- EXIT region: " + regionInfo.getRegionNameAsString());
}
@Override
#ifdef HDP2.3 APACHE1.1 CDH5.7 APACHE1.2
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, Region l, Region r) {
#else
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) {
#endif
if(LOG.isTraceEnabled()) LOG.trace("postSplit -- ENTRY");
if(splitDelayNoFlush)
return;
@SuppressWarnings("rawtypes")
TrxRegionEndpoint treL = (TrxRegionEndpoint)trxRegionMap.get(l.getRegionInfo().getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
@SuppressWarnings("rawtypes")
TrxRegionEndpoint treR = (TrxRegionEndpoint)trxRegionMap.get(r.getRegionInfo().getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
if(treL == null || treR == null) {
LOG.error("Unable to obtain TrxRegionEndpoint object from shared map for " + regionInfo.getRegionNameAsString());
}
else {
try {
// don't need to set NewTrans flag because blockNonPhase2 will catch up
treL.setBlockAll(true);
treR.setBlockAll(true);
treL.setClosing(true);
treR.setClosing(true);
Thread readThread = new Thread(new TxnReadThread(treL, sbHelper.getPath(), true));
readThread.start();
//treL.readTxnInfo(sbHelper.getPath(), true);
treR.readTxnInfo(sbHelper.getPath(), true);
readThread.join();
treL.setBlockAll(false);
treR.setBlockAll(false);
treL.setBlockNonPhase2(false);
treR.setBlockNonPhase2(false);
treL.setNewTrans(false);
treR.setNewTrans(false);
treL.setClosing(false);
treR.setClosing(false);
sbHelper.clearSplit();
} catch (IOException ioe) {
if(LOG.isErrorEnabled()) LOG.error("Unable to read Transaction Info for transactional split coordination: " + ioe);
} catch (InterruptedException ie) {
if(LOG.isErrorEnabled()) LOG.error("Thread issue hit while trying to read Transaction Info for transactional split coordination: " + ie);
}
}
}
@Override
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) {
LOG.info("preClose - " + regionInfo.getRegionNameAsString() + " - transactionsById (" + transactionsById.size()
+ " ), committed Transactions (" + commitedTransactionsBySequenceNumber.size()
+"), scanners (" + scanners.size() + ")");
if (hasFlushed ||
hasClosed ||
splitDelayNoFlush ||
c.getEnvironment().getRegionServerServices().isStopping() ||
c.getEnvironment().getRegionServerServices().isStopped())
return;
if(LOG.isTraceEnabled()) LOG.trace("preClose -- commitPendingTransactions (" + commitPendingTransactions.size() +")");
//This flag indicates if pendingAndScannersWait call completed successfully.
//Setting this flag to true indicates there are no pending scanners or
//commitPendingList outstanding. OR commitPendingList may have transactions
//in commitPending that have the table marked for drop in the same transaction.
boolean commitAndScannersChecked = false;
if (!hasClosed) {
blockNonPhase2.set(true);
if(LOG.isInfoEnabled()) {
#ifdef CDH5.7 APACHE1.2
Region region = c.getEnvironment().getRegion();
#else
HRegion region = (HRegion) c.getEnvironment().getRegion();
#endif
LOG.debug("preClose -- setting close var to true on: " + region.getRegionInfo().getRegionNameAsString());
}
try {
sbHelper.pendingAndScannersWait(commitPendingTransactions, null /*scanners*/, transactionsById, pendingDelayLen);
commitAndScannersChecked = true;
} catch(IOException ioe) {
LOG.error("Encountered exception when calling pendingAndScannersWait(): " + ioe);
}
blockAll.set(true);
closing.set(true);
hasClosed = true;
}
if (c.getEnvironment().getRegionServerServices().isStopping() ||
c.getEnvironment().getRegionServerServices().isStopped() ||
hasFlushed)
return;
if (((transactionsById.size() <= 0) && (commitPendingTransactions.size() <= 0)) ||
commitAndScannersChecked ) {
hasFlushed = true;
commitedTransactionsBySequenceNumber.clear();
return;
}
@SuppressWarnings("rawtypes")
TrxRegionEndpoint tre = (TrxRegionEndpoint)trxRegionMap.get(regionInfo.getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
if(tre == null) {
LOG.error("Unable to obtain TrxRegionEndpoint objet from shared map for " + regionInfo.getRegionNameAsString());
}
else {
try {
tre.flushToFS(sbHelper.getPath());
if(!sbHelper.getSplit()) {
try {
sbHelper.setBalance();
} catch (IOException ioe) {
if(LOG.isErrorEnabled()) LOG.error("Unable to set balance: " + ioe);
}
}
hasFlushed = true;
commitedTransactionsBySequenceNumber.clear();
} catch (IOException ioe) {
if (LOG.isErrorEnabled()) LOG.error("Unable to flush to filesystem");
hasFlushed = false;
}
}
}
@Override
public void postClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) {
String regionName = my_Region.getRegionInfo().getRegionNameAsString();
transactionsRefMap.remove(regionName+trxkeypendingTransactionsById);
transactionsRefMap.remove(regionName+trxkeyindoubtTransactionsCountByTmid);
transactionsRefMap.remove(regionName+trxkeytransactionsById);
transactionsRefMap.remove(regionName+trxkeycommitPendingTransactions);
transactionsRefMap.remove(regionName+trxkeyCheckBlockAllVar);
transactionsRefMap.remove(regionName+trxkeyCheckBlockNonPhase2Var);
transactionsRefMap.remove(regionName+trxkeyCheckBlockNewTransVar);
transactionsRefMap.remove(regionName+trxkeyClosingVar);
transactionsRefMap.remove(regionName+trxkeyScanners);
}
protected InternalScanner getWrappedScanner(final long lowStartId, final InternalScanner s) {
return new InternalScanner() {
int versionCount = 0;
#ifndef CDH5.7 APACHE1.2
@Override
#endif
public boolean next(List<Cell> results) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("preCompact: call next without limit");
}
boolean ret=true;
boolean skip=false;
ConcurrentHashMap<String, Integer> verCountByCol = new ConcurrentHashMap<String,Integer>();
try {
ret = s.next(results);
if(ret == false) return ret;
ListIterator<Cell> cellIter = null;
for( cellIter = results.listIterator(); cellIter.hasNext();) {
Cell c = cellIter.next();
String key=Bytes.toString(CellUtil.cloneQualifier(c));
int vCount = 0;
long t = c.getTimestamp();
if (LOG.isTraceEnabled()) LOG.trace("preCompact: get c timestamp "+ t + " lowest is " + lowStartId + " for col " + key);
if ( t < lowStartId ) {
if(verCountByCol.containsKey(key) == true)
{
vCount = verCountByCol.get(key);
vCount++;
verCountByCol.put(key,vCount);
}
else
{
vCount = 1;
verCountByCol.put(key,vCount);
}
if (vCount > 1) { //this is a unneed version
cellIter.remove();
}
}
}
} catch (Throwable t) {
throw new IOException("scanner next exception" );
}
return ret;
}
@Override
public void close() throws IOException {
s.close();
}
@Override
#ifdef HDP2.3 APACHE1.1 CDH5.5 CDH5.7 APACHE1.2
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("preCompact: call next with scannerContext limit " + scannerContext);
#else
public boolean next(List<Cell> result, int limit) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("preCompact: call next with limit " + limit);
#endif
boolean ret=true;
boolean skip=false;
ConcurrentHashMap<String, Integer> verCountByCol = new ConcurrentHashMap<String,Integer>();
try {
#ifdef HDP2.3 APACHE1.1 CDH5.5 CDH5.7 APACHE1.2
ret = s.next(result,scannerContext);
#else
ret = s.next(result,limit);
#endif
if(ret == false) return ret;
ListIterator<Cell> cellIter = null;
for( cellIter = result.listIterator(); cellIter.hasNext();) {
Cell c = cellIter.next();
String key=Bytes.toString(CellUtil.cloneQualifier(c));
int vCount = 0;
long t = c.getTimestamp();
if (LOG.isTraceEnabled()) LOG.trace("preCompact: get c timestamp "+ t + " lowest is " + lowStartId + " for col " + key);
if ( t < lowStartId ) {
if(verCountByCol.containsKey(key) == true)
{
vCount = verCountByCol.get(key);
vCount++;
verCountByCol.put(key,vCount);
}
else
{
vCount = 1;
verCountByCol.put(key,vCount);
}
if (vCount > 1) { //this is a unneed version
cellIter.remove();
}
}
}
} catch (Throwable t) {
throw new IOException("scanner next exception" );
}
return ret;
}
};
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store,
InternalScanner scanner,
ScanType scanType,
CompactionRequest request) throws IOException {
//check if it is major compaction
if( request.isMajor() == false ) return scanner;
//get lowStartId from ZooKeeper
String zNodeGCPath = "/hbase/Trafodion/GC";
long lowStartId = Long.MAX_VALUE;
try{
List<String> allTms = ZKUtil.listChildrenNoWatch(zkw1,zNodeGCPath);
if(allTms != null) {
// find lowest startID
for( String tm : allTms){
byte[] v = ZKUtil.getData(zkw1,zNodeGCPath+"/"+tm);
long ts = Bytes.toLong(v);
if( ts < lowStartId ) lowStartId= ts;
}
}
}catch (KeeperException ee) {
throw new IOException("Trafodion Region Observer GC: ZKW Unable to check GC znode, throw IOException ", ee);
}
catch(Exception ie) { //Different distribution required different exception to catch, catch everything here
throw new IOException("Trafodion Region Observer GC: ZKW Unable to check GC znode, throw IOException ", ie);
}
if(lowStartId == Long.MAX_VALUE || lowStartId == -1) //This is not SSCC or no start id is avaiable
return scanner;
return getWrappedScanner(lowStartId , scanner);
}
private static class TxnReadThread implements Runnable {
@SuppressWarnings("rawtypes")
private TrxRegionEndpoint tre;
private Path path;
private boolean isSplit;
TxnReadThread(@SuppressWarnings("rawtypes") TrxRegionEndpoint tre, Path path, boolean isSplit) {
this.tre = tre;
this.path = path;
this.isSplit = isSplit;
}
public void run(){
try {
tre.readTxnInfo(path, isSplit);
} catch (IOException ioe) {
if(LOG.isErrorEnabled()) LOG.error("Unable to read Transaction Info for transactional split coordination: " + ioe);
}
}
}
} // end of TrxRegionObserver Class