| /** |
| * @@@ START COPYRIGHT @@@ |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| * @@@ END COPYRIGHT @@@ |
| **/ |
| |
| package org.apache.hadoop.hbase.regionserver.transactional; |
| |
| import java.io.IOException; |
| |
| import java.lang.Class; |
| |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.ListIterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.NavigableSet; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.commons.codec.binary.Hex; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.Tag; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.KeyValueUtil; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.KeyValue.Type; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.regionserver.wal.HLog; |
| import org.apache.hadoop.hbase.regionserver.InternalScanner; |
| import org.apache.hadoop.hbase.regionserver.KeyValueScanner; |
| import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; |
| import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher; |
| import org.apache.hadoop.hbase.regionserver.ScanType; |
| import org.apache.hadoop.hbase.regionserver.ScanInfo; |
| import org.apache.hadoop.hbase.regionserver.wal.WALEdit; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.io.DataInputBuffer; |
| |
| /** |
| * Holds the state of a transaction. This includes a buffer of all writes, a record of all reads / scans, and |
| * information about which other transactions we need to check against. |
| */ |
| public class TrxTransactionState extends TransactionState{ |
| |
| static boolean sb_sqm_98_1; |
| static boolean sb_sqm_98_4; |
| static java.lang.reflect.Constructor c98_1 = null; |
| static java.lang.reflect.Constructor c98_4 = null; |
| |
| static { |
| sb_sqm_98_1 = true; |
| try { |
| NavigableSet<byte[]> lv_nvg = (NavigableSet<byte[]>) null; |
| c98_1 = ScanQueryMatcher.class.getConstructor( |
| new Class [] { |
| Scan.class, |
| ScanInfo.class, |
| java.util.NavigableSet.class, |
| ScanType.class, |
| long.class, |
| long.class, |
| long.class |
| }); |
| } |
| catch (NoSuchMethodException exc_nsm) { |
| sb_sqm_98_1 = false; |
| sb_sqm_98_4 = true; |
| try { |
| c98_4 = ScanQueryMatcher.class.getConstructor( |
| new Class [] { |
| Scan.class, |
| ScanInfo.class, |
| java.util.NavigableSet.class, |
| ScanType.class, |
| long.class, |
| long.class, |
| long.class, |
| RegionCoprocessorHost.class |
| }); |
| } |
| catch (NoSuchMethodException exc_nsm2) { |
| sb_sqm_98_4 = false; |
| } |
| } |
| |
| if (sb_sqm_98_1) { |
| LOG.info("Got info of Class ScanQueryMatcher for HBase 98.1"); |
| } |
| if (sb_sqm_98_4) { |
| LOG.info("Got info of Class ScanQueryMatcher for HBase 98.4"); |
| } |
| } |
| |
| /** |
| * Simple container of the range of the scanners we've opened. Used to check for conflicting writes. |
| */ |
| private static class ScanRange { |
| |
| protected byte[] startRow; |
| protected byte[] endRow; |
| |
| public ScanRange(final byte[] startRow, final byte[] endRow) { |
| this.startRow = startRow == HConstants.EMPTY_START_ROW ? null : startRow; |
| this.endRow = endRow == HConstants.EMPTY_END_ROW ? null : endRow; |
| } |
| |
| /** |
| * Check if this scan range contains the given key. |
| * |
| * @param rowKey |
| * @return boolean |
| */ |
| public boolean contains(final byte[] rowKey) { |
| if (startRow != null && Bytes.compareTo(rowKey, startRow) < 0) { |
| return false; |
| } |
| if (endRow != null && Bytes.compareTo(endRow, rowKey) < 0) { |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public String toString() { |
| return "startRow: " + (startRow == null ? "null" : Bytes.toStringBinary(startRow)) + ", endRow: " |
| + (endRow == null ? "null" : Bytes.toStringBinary(endRow)); |
| } |
| } |
| private List<ScanRange> scans = Collections.synchronizedList(new LinkedList<ScanRange>()); |
| private List<Delete> deletes = Collections.synchronizedList(new LinkedList<Delete>()); |
| private List<WriteAction> writeOrdering = Collections.synchronizedList(new LinkedList<WriteAction>()); |
| private Set<TrxTransactionState> transactionsToCheck = Collections.synchronizedSet(new HashSet<TrxTransactionState>()); |
| private WALEdit e; |
| |
| public TrxTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, final HRegionInfo regionInfo, |
| HTableDescriptor htd, HLog hLog, boolean logging) { |
| super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging); |
| this.e = new WALEdit(); |
| } |
| |
| public synchronized void addRead(final byte[] rowKey) { |
| scans.add(new ScanRange(rowKey, rowKey)); |
| } |
| |
| public synchronized void addWrite(final Put write) { |
| if (LOG.isTraceEnabled()) LOG.trace("addWrite -- ENTRY: write: " + write.toString()); |
| WriteAction waction; |
| KeyValue kv; |
| WALEdit e1 = new WALEdit(); |
| updateLatestTimestamp(write.getFamilyCellMap().values(), EnvironmentEdgeManager.currentTimeMillis()); |
| // Adding read scan on a write action |
| addRead(new WriteAction(write).getRow()); |
| |
| ListIterator<WriteAction> writeOrderIter = writeOrdering.listIterator(); |
| writeOrderIter.add(waction = new WriteAction(write)); |
| |
| if (this.earlyLogging) { // immeditaely write edit out to HLOG during DML (actve transaction state) |
| for (Cell value : waction.getCells()) { |
| //KeyValue kv = KeyValueUtil.ensureKeyValue(value); |
| kv = KeyValue.cloneAndAddTags(value, tagList); |
| //if (LOG.isTraceEnabled()) LOG.trace("KV hex dump " + Hex.encodeHexString(kv.getValueArray() /*kv.getBuffer()*/)); |
| e1.add(kv); |
| e.add(kv); |
| } |
| try { |
| long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(), |
| e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor, |
| this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE); |
| //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid " + txid + " ts flush id " + this.flushTxId); |
| if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1 |
| } |
| catch (IOException exp1) { |
| LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception"); |
| //throw exp1; |
| } |
| } |
| else { // edits are buffered in ts and written out to HLOG in phase 1 |
| for (Cell value : waction.getCells()) { |
| kv = KeyValue.cloneAndAddTags(value, tagList); |
| e.add(kv); |
| } |
| } |
| if (LOG.isTraceEnabled()) LOG.trace("addWrite -- EXIT"); |
| } |
| |
| public boolean hasWrite() { |
| return writeOrdering.size() > 0; |
| } |
| |
| public synchronized void addDelete(final Delete delete) { |
| if (LOG.isTraceEnabled()) LOG.trace("addDelete -- ENTRY: delete: " + delete.toString()); |
| |
| WriteAction waction; |
| WALEdit e1 = new WALEdit(); |
| long now = EnvironmentEdgeManager.currentTimeMillis(); |
| updateLatestTimestamp(delete.getFamilyCellMap().values(), now); |
| if (delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP) { |
| delete.setTimestamp(now); |
| } |
| deletes.add(delete); |
| |
| ListIterator<WriteAction> writeOrderIter = writeOrdering.listIterator(); |
| writeOrderIter.add(waction = new WriteAction(delete)); |
| |
| if (this.earlyLogging) { |
| for (Cell value : waction.getCells()) { |
| KeyValue kv = KeyValue.cloneAndAddTags(value, tagList); |
| e1.add(kv); |
| e.add(kv); |
| } |
| try { |
| long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(), |
| e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor, |
| this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE); |
| //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y00 write edit to HLOG during delete with txid " + txid + " ts flush id " + this.flushTxId); |
| if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1 |
| } |
| catch (IOException exp1) { |
| LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception"); |
| } |
| } |
| else { |
| for (Cell value : waction.getCells()) { |
| KeyValue kv = KeyValue.cloneAndAddTags(value, tagList); |
| e.add(kv); |
| } |
| } |
| |
| if (LOG.isTraceEnabled()) LOG.trace("addDelete -- EXIT"); |
| } |
| |
| public synchronized void applyDeletes(final List<Cell> input, final long minTime, final long maxTime) { |
| if (deletes.isEmpty()) { |
| return; |
| } |
| for (ListIterator<Cell> itr = input.listIterator(); itr.hasNext();) { |
| Cell included = applyDeletes(itr.next(), minTime, maxTime); |
| if (null == included) { |
| itr.remove(); |
| } |
| } |
| } |
| |
| public synchronized Cell applyDeletes(final Cell kv, final long minTime, final long maxTime) { |
| if (deletes.isEmpty()) { |
| return kv; |
| } |
| |
| ListIterator<Delete> deletesIter = null; |
| |
| for (deletesIter = deletes.listIterator(); |
| deletesIter.hasNext();) { |
| Delete delete = deletesIter.next(); |
| |
| // Skip if delete should not apply |
| if (!Bytes.equals(kv.getRow(), delete.getRow()) || kv.getTimestamp() > delete.getTimeStamp() |
| || delete.getTimeStamp() > maxTime || delete.getTimeStamp() < minTime) { |
| continue; |
| } |
| |
| // Whole-row delete |
| if (delete.isEmpty()) { |
| return null; |
| } |
| |
| for (Entry<byte[], List<Cell>> deleteEntry : delete.getFamilyCellMap().entrySet()) { |
| byte[] family = deleteEntry.getKey(); |
| if (!Bytes.equals(kv.getFamilyArray(), family)) { |
| continue; |
| } |
| List<Cell> familyDeletes = deleteEntry.getValue(); |
| if (familyDeletes == null) { |
| return null; |
| } |
| for (Cell keyDeletes : familyDeletes) { |
| byte[] deleteQualifier = keyDeletes.getQualifierArray(); |
| byte[] kvQualifier = kv.getQualifierArray(); |
| if (keyDeletes.getTimestamp() > kv.getTimestamp() && Bytes.equals(deleteQualifier, kvQualifier)) { |
| return null; |
| } |
| } |
| } |
| } |
| |
| return kv; |
| } |
| |
| public void clearState() { |
| |
| clearTransactionsToCheck(); |
| clearWriteOrdering(); |
| clearScanRange(); |
| clearDeletes(); |
| clearTags(); |
| clearWALEdit(); |
| } |
| |
| public void clearTransactionsToCheck() { |
| transactionsToCheck.clear(); |
| } |
| |
| public void clearWriteOrdering() { |
| writeOrdering.clear(); |
| } |
| |
| public void clearScanRange() { |
| scans.clear(); |
| } |
| |
| public void clearDeletes() { |
| deletes.clear(); |
| } |
| |
| public void clearTags() { |
| tagList.clear(); |
| } |
| |
| public void clearWALEdit() { |
| if (e.size() > 0) { |
| |
| DataInputBuffer in = new DataInputBuffer(); |
| try { |
| e.readFields(in); |
| e.getKeyValues().clear(); |
| } catch (java.io.EOFException eeof) { |
| // DataInputBuffer was empty, successfully emptied kvs |
| } catch (Exception e) { |
| if (LOG.isTraceEnabled()) LOG.trace("TrxTransactionState clearWALEdit: Clearing WALEdit caught an exception for transaction "+ this.transactionId + ", regionInfo is [" + regionInfo.getRegionNameAsString() + "]" + ": exception " + e.toString()); |
| } finally { |
| try { |
| in.close(); |
| } catch (IOException io) { |
| } |
| } |
| } |
| if (e.size() > 0) |
| if (LOG.isTraceEnabled()) LOG.trace("TrxTransactionState clearWALEdit: Possible leak with kvs entries in WALEDIT, for transaction "+ this.transactionId + ", regionInfo is [" + regionInfo.getRegionNameAsString() + "], e is " + e.toString()); |
| } |
| |
| public void addTransactionToCheck(final TrxTransactionState transaction) { |
| transactionsToCheck.add(transaction); |
| } |
| |
| public synchronized boolean hasConflict() { |
| |
| for (TrxTransactionState transactionState : transactionsToCheck) { |
| try { |
| if (hasConflict(transactionState)) { |
| if (LOG.isTraceEnabled()) LOG.trace("TrxTransactionState hasConflict: Returning true for " + transactionState.toString() + ", regionInfo is [" + regionInfo.getRegionNameAsString() + "]"); |
| return true; |
| } |
| } catch (Exception e) { |
| // We are unable to ascertain if we have had a conflict with |
| // the rows we are trying to modify. We will return true, |
| // indicating that we can not allow the pending changes |
| // for this transaction to commit. |
| LOG.error("TrxTransactionState hasConflict: Returning true. Caught exception for transaction " + transactionState.toString() + ", regionInfo is [" + regionInfo.getRegionNameAsString() + "], exception is " + e.toString()); |
| return true; |
| } |
| } |
| return false; |
| } |
| private boolean hasConflict(final TrxTransactionState checkAgainst) throws Exception{ |
| if (checkAgainst.getStatus().equals(TransactionState.Status.ABORTED)) { |
| return false; // Cannot conflict with aborted transactions |
| } |
| |
| ListIterator<WriteAction> writeOrderIter = null; |
| |
| for (writeOrderIter = checkAgainst.writeOrdering.listIterator(); |
| writeOrderIter.hasNext();) { |
| WriteAction otherUpdate = writeOrderIter.next(); |
| |
| try { |
| byte[] row = otherUpdate.getRow(); |
| if (row == null) { |
| LOG.warn("TrxTransactionState hasConflict: row is null - this Transaction [" + this.toString() + "] checkAgainst Transaction [" + checkAgainst.toString() + "] "); |
| } |
| if (this.getTransactionId() == checkAgainst.getTransactionId()) |
| { |
| if (LOG.isTraceEnabled()) LOG.trace("TrxTransactionState hasConflict: Continuing - this Transaction [" + this.toString() + "] is the same as the against Transaction [" + checkAgainst.toString() + "]"); |
| continue; |
| } |
| if (this.scans != null && !this.scans.isEmpty()) { |
| ListIterator<ScanRange> scansIter = null; |
| |
| for (scansIter = this.scans.listIterator(); |
| scansIter.hasNext();) { |
| ScanRange scanRange = scansIter.next(); |
| |
| if (scanRange == null) |
| if (LOG.isTraceEnabled()) LOG.trace("Transaction [" + this.toString() + "] scansRange is null"); |
| if (scanRange != null && scanRange.contains(row)) { |
| LOG.warn("Transaction [" + this.toString() + "] has scan which conflicts with [" |
| + checkAgainst.toString() + "]: region [" + regionInfo.getRegionNameAsString() |
| + "], scanRange[" + scanRange.toString() + "] ,row[" + Bytes.toStringBinary(row) + "]"); |
| return true; |
| } |
| //else { |
| // LOG.trace("Transaction [" + this.toString() + "] has scanRange checked against [" |
| // + checkAgainst.toString() + "]: region [" + regionInfo.getRegionNameAsString() |
| // + "], scanRange[" + scanRange.toString() + "] ,row[" + Bytes.toStringBinary(row) + "]"); |
| //} |
| } |
| } |
| else { |
| if (this.scans == null) |
| LOG.trace("Transaction [" + this.toString() + "] scans was equal to null"); |
| else |
| LOG.trace("Transaction [" + this.toString() + "] scans was empty "); |
| } |
| } |
| catch (Exception e) { |
| LOG.warn("TrxTransactionState hasConflict: Unable to get row - this Transaction [" + this.toString() + "] checkAgainst Transaction [" |
| + checkAgainst.toString() + "] " + " Exception: " + e); |
| throw e; |
| } |
| } |
| return false; |
| } |
| |
| public WALEdit getEdit() { |
| return e; |
| } |
| |
| |
| @Override |
| public String toString() { |
| StringBuilder result = new StringBuilder(); |
| result.append("[transactionId: "); |
| result.append(transactionId); |
| result.append(" status: "); |
| result.append(status.name()); |
| result.append(" scan Size: "); |
| result.append(scans.size()); |
| result.append(" write Size: "); |
| result.append(getWriteOrdering().size()); |
| result.append(" startSQ: "); |
| result.append(startSequenceNumber); |
| if (sequenceNumber != null) { |
| result.append(" commitedSQ:"); |
| result.append(sequenceNumber); |
| } |
| result.append("]"); |
| |
| return result.toString(); |
| } |
| |
| public synchronized void addScan(final Scan scan) { |
| ScanRange scanRange = new ScanRange(scan.getStartRow(), scan.getStopRow()); |
| if (LOG.isTraceEnabled()) LOG.trace(String.format("Adding scan for transaction [%s], from startRow [%s] to endRow [%s]", transactionId, |
| scanRange.startRow == null ? "null" : Bytes.toStringBinary(scanRange.startRow), scanRange.endRow == null ? "null" |
| : Bytes.toStringBinary(scanRange.endRow))); |
| scans.add(scanRange); |
| } |
| |
| /** |
| * Get deletes. |
| * |
| * @return deletes |
| */ |
| public synchronized List<Delete> getDeletes() { |
| return deletes; |
| } |
| |
| /** |
| * Get a scanner to go through the puts and deletes from this transaction. Used to weave together the local trx puts |
| * with the global state. |
| * |
| * @return scanner |
| */ |
| public KeyValueScanner getScanner(final Scan scan) { |
| return new TransactionScanner(scan); |
| } |
| |
| private synchronized Cell[] getAllCells(final Scan scan) { |
| //if (LOG.isTraceEnabled()) LOG.trace("getAllCells -- ENTRY"); |
| List<Cell> kvList = new ArrayList<Cell>(); |
| |
| ListIterator<WriteAction> writeOrderIter = null; |
| |
| for (writeOrderIter = writeOrdering.listIterator(); |
| writeOrderIter.hasNext();) { |
| WriteAction action = writeOrderIter.next(); |
| byte[] row = action.getRow(); |
| List<Cell> kvs = action.getCells(); |
| |
| if (scan.getStartRow() != null && !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW) |
| && Bytes.compareTo(row, scan.getStartRow()) < 0) { |
| continue; |
| } |
| if (scan.getStopRow() != null && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) |
| && Bytes.compareTo(row, scan.getStopRow()) > 0) { |
| continue; |
| } |
| |
| if (!scan.hasFamilies()) { |
| kvList.addAll(kvs); |
| continue; |
| } |
| // Pick only the Cell's that match the 'scan' specifications |
| for (Cell lv_kv : kvs) { |
| byte[] lv_kv_family = lv_kv.getFamilyArray(); |
| Map<byte [], NavigableSet<byte []>> lv_familyMap = scan.getFamilyMap(); |
| NavigableSet<byte []> set = lv_familyMap.get(lv_kv_family); |
| if (set == null || set.size() == 0) { |
| kvList.add(lv_kv); |
| continue; |
| } |
| if (set.contains(lv_kv.getQualifierArray())) { |
| kvList.add(lv_kv); |
| } |
| } |
| } |
| |
| if (LOG.isTraceEnabled()) LOG.trace("getAllCells -- EXIT kvList size = " + kvList.size()); |
| return kvList.toArray(new Cell[kvList.size()]); |
| } |
| |
| private synchronized KeyValue[] getAllKVs(final Scan scan) { |
| //if (LOG.isTraceEnabled()) LOG.trace("getAllKVs -- ENTRY"); |
| List<KeyValue> kvList = new ArrayList<KeyValue>(); |
| |
| ListIterator<WriteAction> writeOrderIter = null; |
| |
| for (writeOrderIter = writeOrdering.listIterator(); |
| writeOrderIter.hasNext();) { |
| WriteAction action = writeOrderIter.next(); |
| byte[] row = action.getRow(); |
| List<KeyValue> kvs = action.getKeyValues(); |
| |
| if (scan.getStartRow() != null && !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW) |
| && Bytes.compareTo(row, scan.getStartRow()) < 0) { |
| continue; |
| } |
| if (scan.getStopRow() != null && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) |
| && Bytes.compareTo(row, scan.getStopRow()) > 0) { |
| continue; |
| } |
| |
| if (!scan.hasFamilies()) { |
| kvList.addAll(kvs); |
| continue; |
| } |
| |
| // Pick only the Cell's that match the 'scan' specifications |
| Map<byte [], NavigableSet<byte []>> lv_familyMap = scan.getFamilyMap(); |
| for (KeyValue lv_kv : kvs) { |
| byte[] lv_kv_family = lv_kv.getFamily(); |
| NavigableSet<byte []> set = lv_familyMap.get(lv_kv_family); |
| if (set == null || set.size() == 0) { |
| kvList.add(lv_kv); |
| continue; |
| } |
| if (set.contains(lv_kv.getQualifier())) { |
| kvList.add(lv_kv); |
| } |
| } |
| } |
| |
| if (LOG.isTraceEnabled()) LOG.trace("getAllKVs -- EXIT kvList size = " + kvList.size()); |
| return kvList.toArray(new KeyValue[kvList.size()]); |
| } |
| |
| |
| private synchronized int getTransactionSequenceIndex(final Cell kv) { |
| ListIterator<WriteAction> writeOrderIter = null; |
| int i = 0; |
| |
| for (writeOrderIter = writeOrdering.listIterator(); |
| writeOrderIter.hasNext();) { |
| i++; |
| WriteAction action = writeOrderIter.next(); |
| if (isKvInPut(kv, action.getPut())) { |
| return i; |
| } |
| if (isKvInDelete(kv, action.getDelete())) { |
| return i; |
| } |
| } |
| throw new IllegalStateException("Can not find kv in transaction writes"); |
| } |
| |
| |
| /** |
| * Scanner of the puts and deletes that occur during this transaction. |
| * |
| * @author clint.morgan |
| */ |
| public class TransactionScanner extends KeyValueListScanner implements InternalScanner { |
| |
| private ScanQueryMatcher matcher; |
| |
| TransactionScanner(final Scan scan) { |
| super(new KeyValue.KVComparator() { |
| @Override |
| public int compare(final Cell left, final Cell right) { |
| int result = super.compare(left, right); |
| if (result != 0) { |
| return result; |
| } |
| if (left == right) { |
| return 0; |
| } |
| int put1Number = getTransactionSequenceIndex(left); |
| int put2Number = getTransactionSequenceIndex(right); |
| return put2Number - put1Number; |
| } |
| }, getAllKVs(scan)); |
| |
| // We want transaction scanner to always take priority over store |
| // scanners. |
| super.setSequenceID(Long.MAX_VALUE); |
| |
| //Store.ScanInfo scaninfo = new Store.ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, Cell.COMPARATOR); |
| ScanInfo scaninfo = new ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, KeyValue.COMPARATOR); |
| |
| try { |
| if (sb_sqm_98_1) { |
| try { |
| matcher = (ScanQueryMatcher) c98_1.newInstance(scan, |
| scaninfo, |
| null, |
| ScanType.USER_SCAN, |
| Long.MAX_VALUE, |
| HConstants.LATEST_TIMESTAMP, |
| 0); |
| if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.1"); |
| } |
| catch (InstantiationException exc_ins) { |
| LOG.error("InstantiationException: " + exc_ins); |
| } |
| catch (IllegalAccessException exc_ill_acc) { |
| LOG.error("IllegalAccessException: " + exc_ill_acc); |
| } |
| catch (InvocationTargetException exc_inv_tgt) { |
| LOG.error("InvocationTargetException: " + exc_inv_tgt); |
| } |
| |
| } |
| else { |
| try { |
| matcher = (ScanQueryMatcher) c98_4.newInstance(scan, |
| scaninfo, |
| null, |
| ScanType.USER_SCAN, |
| Long.MAX_VALUE, |
| HConstants.LATEST_TIMESTAMP, |
| (long) 0, |
| null); |
| if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.4"); |
| } |
| catch (InstantiationException exc_ins) { |
| LOG.error("InstantiationException: " + exc_ins); |
| } |
| catch (IllegalAccessException exc_ill_acc) { |
| LOG.error("IllegalAccessException: " + exc_ill_acc); |
| } |
| catch (InvocationTargetException exc_inv_tgt) { |
| LOG.error("InvocationTargetException: " + exc_inv_tgt); |
| } |
| |
| } |
| } |
| catch (Exception e) { |
| LOG.error("error while instantiating the ScanQueryMatcher()" + e); |
| } |
| |
| } |
| |
| /** |
| * Get the next row of values from this transaction. |
| * |
| * @param outResult |
| * @param limit |
| * @return true if there are more rows, false if scanner is done |
| */ |
| @Override |
| public synchronized boolean next(final List<Cell> outResult, final int limit) throws IOException { |
| Cell peeked = this.peek(); |
| if (peeked == null) { |
| close(); |
| return false; |
| } |
| |
| matcher.setRow(peeked.getRowArray(), peeked.getRowOffset(), peeked.getRowLength()); |
| |
| KeyValue kv; |
| List<Cell> results = new ArrayList<Cell>(); |
| LOOP: while ((kv = this.peek()) != null) { |
| ScanQueryMatcher.MatchCode qcode = matcher.match(kv); |
| switch (qcode) { |
| case INCLUDE: |
| Cell next = this.next(); |
| results.add(next); |
| if (limit > 0 && results.size() == limit) { |
| break LOOP; |
| } |
| continue; |
| |
| case DONE: |
| // copy jazz |
| outResult.addAll(0, results); |
| return true; |
| |
| case DONE_SCAN: |
| close(); |
| |
| // copy jazz |
| outResult.addAll(0, results); |
| |
| return false; |
| |
| case SEEK_NEXT_ROW: |
| this.next(); |
| break; |
| |
| case SEEK_NEXT_COL: |
| this.next(); |
| break; |
| |
| case SKIP: |
| this.next(); |
| break; |
| |
| default: |
| throw new RuntimeException("UNEXPECTED"); |
| } |
| } |
| |
| if (!results.isEmpty()) { |
| // copy jazz |
| outResult.addAll(0, results); |
| return true; |
| } |
| |
| // No more keys |
| close(); |
| return false; |
| } |
| |
| @Override |
| /* Commenting out for HBase 0.98 |
| public boolean next(final List<KeyValue> results) throws IOException { |
| return next(results, -1); |
| } |
| // May need to use metric value |
| @Override |
| public boolean next(List<KeyValue> results, String metric) throws IOException{ |
| return next(results, -1); |
| } |
| |
| // May need to use metric value |
| @Override |
| public boolean next(List<Cell> results, int limit, String metric) throws IOException { |
| |
| return next(results,limit); |
| } |
| |
| */ |
| |
| public synchronized boolean next(final List<Cell> results) throws IOException { |
| return next(results, -1); |
| } |
| |
| } |
| |
| private synchronized boolean isKvInPut(final Cell kv, final Put put) { |
| if (null != put) { |
| for (List<Cell> putKVs : put.getFamilyCellMap().values()) { |
| for (Cell putKV : putKVs) { |
| if (putKV == kv) { |
| return true; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| private synchronized boolean isKvInDelete(final Cell kv, final Delete delete) { |
| if (null != delete) { |
| for (List<Cell> putKVs : delete.getFamilyCellMap().values()) { |
| for (Cell deleteKv : putKVs) { |
| if (deleteKv == kv) { |
| return true; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Get the puts and deletes in transaction order. |
| * |
| * @return Return the writeOrdering. |
| */ |
| public List<WriteAction> getWriteOrdering() { |
| return writeOrdering; |
| } |
| |
| /* |
| * Get the puts and deletes in transaction order. |
| * |
| * @return Return the writeOrdering as an iterator. |
| */ |
| public ListIterator<WriteAction> getWriteOrderingIter() { |
| return writeOrdering.listIterator(); |
| } |
| |
| /** |
| * Simple wrapper for Put and Delete since they don't have a common enough interface. |
| */ |
| public class WriteAction { |
| |
| private Put put; |
| private Delete delete; |
| |
| public WriteAction(final Put put) { |
| if (null == put) { |
| throw new IllegalArgumentException("WriteAction requires a Put or a Delete."); |
| } |
| this.put = put; |
| } |
| |
| public WriteAction(final Delete delete) { |
| if (null == delete) { |
| throw new IllegalArgumentException("WriteAction requires a Put or a Delete."); |
| } |
| this.delete = delete; |
| } |
| |
| public Put getPut() { |
| return put; |
| } |
| |
| public Delete getDelete() { |
| return delete; |
| } |
| |
| public synchronized byte[] getRow() { |
| if (put != null) { |
| return put.getRow(); |
| } else if (delete != null) { |
| return delete.getRow(); |
| } |
| throw new IllegalStateException("WriteAction is invalid"); |
| } |
| |
| synchronized List<Cell> getCells() { |
| List<Cell> edits = new ArrayList<Cell>(); |
| Collection<List<Cell>> kvsList; |
| |
| if (put != null) { |
| kvsList = put.getFamilyCellMap().values(); |
| } else if (delete != null) { |
| if (delete.getFamilyCellMap().isEmpty()) { |
| // If whole-row delete then we need to expand for each |
| // family |
| kvsList = new ArrayList<List<Cell>>(1); |
| for (byte[] family : tabledescriptor.getFamiliesKeys()) { |
| Cell familyDelete = new KeyValue(delete.getRow(), family, null, delete.getTimeStamp(), |
| KeyValue.Type.DeleteFamily); |
| kvsList.add(Collections.singletonList(familyDelete)); |
| } |
| } else { |
| kvsList = delete.getFamilyCellMap().values(); |
| } |
| } else { |
| throw new IllegalStateException("WriteAction is invalid"); |
| } |
| |
| for (List<Cell> kvs : kvsList) { |
| for (Cell kv : kvs) { |
| edits.add(kv); |
| //if (LOG.isDebugEnabled()) LOG.debug("Trafodion Recovery: " + regionInfo.getRegionNameAsString() + " create edits for transaction: " |
| // + transactionId + " with Op " + kv.getType()); |
| } |
| } |
| return edits; |
| } |
| |
| synchronized List<KeyValue> getKeyValues() { |
| List<KeyValue> edits = new ArrayList<KeyValue>(); |
| Collection<List<KeyValue>> kvsList = null; |
| |
| if (put != null) { |
| if (!put.getFamilyMap().isEmpty()) { |
| kvsList = put.getFamilyMap().values(); |
| } |
| } else if (delete != null) { |
| if (delete.getFamilyCellMap().isEmpty()) { |
| // If whole-row delete then we need to expand for each |
| // family |
| kvsList = new ArrayList<List<KeyValue>>(1); |
| for (byte[] family : tabledescriptor.getFamiliesKeys()) { |
| KeyValue familyDelete = new KeyValue(delete.getRow(), family, null, delete.getTimeStamp(), |
| KeyValue.Type.DeleteFamily); |
| kvsList.add(Collections.singletonList(familyDelete)); |
| } |
| } else { |
| kvsList = delete.getFamilyMap().values(); |
| } |
| } else { |
| throw new IllegalStateException("WriteAction is invalid"); |
| } |
| |
| if (kvsList != null) { |
| for (List<KeyValue> kvs : kvsList) { |
| for (KeyValue kv : kvs) { |
| edits.add(kv); |
| //if (LOG.isDebugEnabled()) LOG.debug("Trafodion getKeyValues: " + regionInfo.getRegionNameAsString() + " create edits for transaction: " |
| // + transactionId + " with Op " + kv.getType()); |
| } |
| } |
| } |
| else |
| if (LOG.isTraceEnabled()) LOG.trace("Trafodion getKeyValues: " |
| + regionInfo.getRegionNameAsString() + " kvsList was null"); |
| return edits; |
| } |
| } |
| public Set<TrxTransactionState> getTransactionsToCheck() { |
| return transactionsToCheck; |
| } |
| |
| |
| } |