/**
* @@@ START COPYRIGHT @@@
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @@@ END COPYRIGHT @@@
**/
package org.apache.hadoop.hbase.coprocessor.transactional;
import java.io.IOException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.management.MemoryMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.ListIterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ProtoUtil;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionPersist;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionStateMsg;
import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.HBaseAdmin;
#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
#endif
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerTimeoutException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.transactional.MemoryUsageException;
import org.apache.hadoop.hbase.client.transactional.NonPendingTransactionException;
import org.apache.hadoop.hbase.client.transactional.OutOfOrderProtocolException;
import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
import org.apache.hadoop.hbase.client.transactional.BatchException;
import org.apache.hadoop.hbase.client.transactional.TransState;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.LeaseListener;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
//import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
#ifdef CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.regionserver.Region;
#endif
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
//import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
//import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.transactional.CleanOldTransactionsChore;
import org.apache.hadoop.hbase.regionserver.transactional.MemoryUsageChore;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState;
import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState;
import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState.TransactionScanner;
import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState.WriteAction;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.CommitProgress;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.coprocessor.transactional.TrxRegionObserver;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.BeginTransactionRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.BeginTransactionResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CloseScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CloseScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitIfPossibleRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitIfPossibleResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.GetTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.GetTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PerformScanRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PerformScanResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.OpenScannerRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.OpenScannerResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutRegionTxRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutRegionTxResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogTransactionStatesFromIntervalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogTransactionStatesFromIntervalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogWriteRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogWriteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafEstimateRowCountRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafEstimateRowCountResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
#ifdef CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
#endif
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.lang.reflect.Method;
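/**
 * Endpoint coprocessor implementing the TrxRegionService protobuf service
 * for transactional access to a region. Each RPC method below unpacks its
 * request, delegates to an internal (non-RPC) method of the same name, and
 * reports any exception through the response builder rather than letting it
 * propagate to the coprocessor framework.
 */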
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class TrxRegionEndpoint<T, S, P extends Message, Q extends Message, R extends Message> extends TrxRegionService implements
CoprocessorService, Coprocessor {
private static final Log LOG = LogFactory.getLog(TrxRegionEndpoint.class);
private RegionCoprocessorEnvironment env;
protected Map<Long, Long> transactionsByIdTest = null;
ConcurrentHashMap<String, Object> transactionsByIdTestz = null;
// Collection of active transactions (PENDING) keyed by id.
protected ConcurrentHashMap<String, TrxTransactionState> transactionsById = new ConcurrentHashMap<String, TrxTransactionState>();
// Map of recent transactions that are COMMIT_PENDING or COMMITTED, keyed
// by their sequence number
private SortedMap<Long, TrxTransactionState> commitedTransactionsBySequenceNumber = Collections.synchronizedSortedMap(new TreeMap<Long, TrxTransactionState>());
// Collection of transactions that are COMMIT_PENDING
private Set<TrxTransactionState> commitPendingTransactions = Collections.synchronizedSet(new HashSet<TrxTransactionState>());
// In-doubt transactions encountered during recovery WALEdit replay, keyed by transaction id
private Map<Long, List<WALEdit>> indoubtTransactionsById = new TreeMap<Long, List<WALEdit>>();
// list of transactions to check for stale scanners
private List<Long> cleanScannersForTransactions = Collections.synchronizedList(new LinkedList<Long>());
// Count of in-doubt transactions, keyed by TM id
private Map<Integer, Integer> indoubtTransactionsCountByTmid = new TreeMap<Integer,Integer>();
// Concurrent map for transactional region scanner holders
// Protected by synchronized methods
private ConcurrentHashMap<Long,
TransactionalRegionScannerHolder> scanners =
new ConcurrentHashMap<Long, TransactionalRegionScannerHolder>();
// Atomic values to manage region scanners
private AtomicLong performScannerId = new AtomicLong(0);
private AtomicLong nextSequenceId = new AtomicLong(0);
private Object commitCheckLock = new Object();
private Object recoveryCheckLock = new Object();
private Object editReplay = new Object();
private static Object stoppableLock = new Object();
private int reconstructIndoubts = 0;
//temporary THLog getSequenceNumber() replacement
private AtomicLong nextLogSequenceId = new AtomicLong(0);
public AtomicLong controlPointEpoch = new AtomicLong(1);
private final int oldTransactionFlushTrigger = 0;
private final Boolean splitDelayEnabled = false;
private final Boolean doWALHlog = false;
static Leases transactionLeases = null;
CleanOldTransactionsChore cleanOldTransactionsThread;
static MemoryUsageChore memoryUsageThread = null;
Stoppable stoppable = new StoppableImplementation();
static Stoppable stoppable2 = new StoppableImplementation();
private int cleanTimer = 5000; // 5 secs, overridden by DEFAULT_SLEEP
private int memoryUsageTimer = 60000; // One minute
private int regionState = 0;
private Path recoveryTrxPath = null;
private int cleanAT = 0;
private long onlineEpoch = EnvironmentEdgeManager.currentTime();
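// Rolling 50-sample latency arrays plus running totals and min/max/avg
// statistics for commit checks, conflict detection, puts by sequence and
// WAL writes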
private long[] commitCheckTimes = new long[50];
private long[] hasConflictTimes = new long[50];
private long[] putBySequenceTimes = new long[50];
private long[] writeToLogTimes = new long[50];
private AtomicInteger timeIndex = new AtomicInteger (0);
private AtomicInteger totalCommits = new AtomicInteger (0);
private AtomicInteger writeToLogOperations = new AtomicInteger (0);
private AtomicInteger putBySequenceOperations = new AtomicInteger (0);
private long totalCommitCheckTime = 0;
private long totalConflictTime = 0;
private long totalPutTime = 0;
private long totalWriteToLogTime = 0;
private long minCommitCheckTime = 1000000000;
private long maxCommitCheckTime = 0;
private double avgCommitCheckTime = 0;
private long minConflictTime = 1000000000;
private long maxConflictTime = 0;
private double avgConflictTime = 0;
private long minPutTime = 1000000000;
private long maxPutTime = 0;
private double avgPutTime = 0;
private long minWriteToLogTime = 1000000000;
private long maxWriteToLogTime = 0;
private double avgWriteToLogTime = 0;
private HRegionInfo regionInfo = null;
#ifdef CDH5.7 APACHE1.2
private Region m_Region = null;
#else
private HRegion m_Region = null;
#endif
private String m_regionName = null;
private String m_regionDetails = null;
private boolean m_isTrafodionMetadata = false;
#ifdef CDH5.7 APACHE1.2
private HRegion t_Region = null;
#else
private TransactionalRegion t_Region = null;
#endif
private FileSystem fs = null;
private RegionCoprocessorHost rch = null;
private WAL tHLog = null;
private AtomicBoolean closing = new AtomicBoolean(false);
private AtomicBoolean blockAll = new AtomicBoolean(false);
private AtomicBoolean blockNonPhase2 = new AtomicBoolean(false);
private AtomicBoolean blockNewTrans = new AtomicBoolean(false);
private boolean configuredEarlyLogging = false;
private boolean configuredConflictReinstate = false;
private static Object zkRecoveryCheckLock = new Object();
private static Object choreDetectStaleBranchLock = new Object();
private static ZooKeeperWatcher zkw1 = null;
String lv_hostName;
int lv_port;
private static String zNodePath = "/hbase/Trafodion/recovery/";
private static final String COMMITTED_TXNS_KEY = "1_COMMITED_TXNS_KEY";
private static final String TXNS_BY_ID_KEY = "2_TXNS_BY_ID_KEY";
private HFileContext context = new HFileContextBuilder().withIncludesTags(false).build();
private static final int MINIMUM_LEASE_TIME = 7200 * 1000;
private static final int LEASE_CHECK_FREQUENCY = 1000;
private static final int DEFAULT_SLEEP = 30 * 1000; // 30 seconds
private static final int DEFAULT_MEMORY_THRESHOLD = 100; // 100% memory used
private static final int DEFAULT_STARTUP_MEMORY_THRESHOLD = 90; // initial value : 90% memory used
private static final int DEFAULT_MEMORY_SLEEP = 15 * 1000;
private static final boolean DEFAULT_MEMORY_WARN_ONLY = true;
private static final boolean DEFAULT_MEMORY_PERFORM_GC = false;
private static final int DEFAULT_ASYNC_WAL = 1;
private static final boolean DEFAULT_SKIP_WAL = false;
private static final boolean DEFAULT_COMMIT_EDIT = false;
private static final boolean DEFAULT_SUPPRESS_OOP = false;
private static final boolean DEFAULT_TM_USE_COMMIT_ID_IN_CELLS = false;
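// Configuration property names corresponding to the defaults above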
private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
private static final String LEASE_CONF = "hbase.transaction.lease.timeout";
private static final String MEMORY_THRESHOLD = "hbase.transaction.memory.threshold";
private static final String MEMORY_WARN_ONLY = "hbase.transaction.memory.warn.only";
private static final String MEMORY_CONF = "hbase.transaction.memory.sleep";
private static final String MEMORY_PERFORM_GC = "hbase.transaction.memory.perform.GC";
private static final String CONF_ASYNC_WAL = "hbase.trafodion.async.wal";
private static final String CONF_SKIP_WAL = "hbase.trafodion.skip.wal";
private static final String CONF_COMMIT_EDIT = "hbase.trafodion.full.commit.edit";
private static final String SUPPRESS_OOP = "hbase.transaction.suppress.OOP.exception";
private static final String CHECK_ROW = "hbase.transaction.check.row";
private static final String CONF_TM_USE_COMMIT_ID_IN_CELLS = "hbase.transaction.use.commitId";
protected static int transactionLeaseTimeout = 0;
private static int scannerLeaseTimeoutPeriod = 0;
private static int scannerThreadWakeFrequency = 0;
private static int memoryUsageThreshold = DEFAULT_MEMORY_THRESHOLD;
private static boolean memoryUsagePerformGC = DEFAULT_MEMORY_PERFORM_GC;
private static boolean memoryUsageWarnOnly = DEFAULT_MEMORY_WARN_ONLY;
private static int asyncWal = DEFAULT_ASYNC_WAL;
private static boolean skipWal = DEFAULT_SKIP_WAL;
private static boolean fullEditInCommit = DEFAULT_COMMIT_EDIT;
private static boolean useCommitIdInCells = DEFAULT_TM_USE_COMMIT_ID_IN_CELLS;
private static MemoryMXBean memoryBean = null;
private static float memoryPercentage = 0;
private static boolean memoryThrottle = false;
private static boolean suppressOutOfOrderProtocolException = DEFAULT_SUPPRESS_OOP;
private Configuration config;
private static boolean checkRowBelongs = true;
// Transaction state defines
private static final int COMMIT_OK = 1;
private static final int COMMIT_OK_READ_ONLY = 2;
private static final int COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR = 3;
private static final int COMMIT_CONFLICT = 5;
private static final int COMMIT_RESEND = 6;
private static final int CLOSE_WAIT_ON_COMMIT_PENDING = 1000;
private static final int MAX_COMMIT_PENDING_WAITS = 10;
private Thread ChoreThread = null;
private static Thread ChoreThread2 = null;
//private static Thread ScannerLeasesThread = null;
private static Thread TransactionalLeasesThread = null;
public static final int TS_ACTIVE = 0;
public static final int TS_COMMIT_REQUEST = 1;
public static final int TS_COMMIT = 2;
public static final int TS_ABORT = 3;
public static final int TS_CONTROL_POINT_COMMIT = 4;
public static final int TS_REGION_TX_ACTIVE = 5;
public static final int TS_REGION_TX_COMMIT_REQUEST = 6;
public static final int TS_REGION_TX_COMMIT = 7;
public static final int REGION_STATE_RECOVERING = 0;
public static final int REGION_STATE_START = 2;
public static final String trxkeyEPCPinstance = "EPCPinstance";
private static Method hdfsSetStoragePolicyMethod = null;
private static String hdfsSetStoragePolicyReflectErrorMsg = "";
#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
static ChoreService s_ChoreService = null;
#endif
static int txnChoreServiceThreadPoolSize = 1;
public static final int DEFAULT_TXN_CHORE_SERVICE_THREAD_POOL_SIZE = 5;
// TBD: a plain HashMap might perform better here; ConcurrentHashMap may be stricter than needed
static ConcurrentHashMap<String, Object> transactionsEPCPMap;
long choreCount = 1;
// TrxRegionService methods
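/**
 * Aborts a single transaction in this region. Delegates to the internal
 * abortTransaction(transactionId, dropTableRecorded, ignoreUnknownTransaction)
 * and reports any UnknownTransactionException or IOException in the response.
 */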
@Override
public void abortTransaction(RpcController controller,
AbortTransactionRequest request,
RpcCallback<AbortTransactionResponse> done) {
AbortTransactionResponse response = AbortTransactionResponse.getDefaultInstance();
long transactionId = request.getTransactionId();
boolean dropTableRecorded = request.getDropTableRecorded();
boolean ignoreUnknownTransaction = request.getIgnoreUnknownTransactionException();
if (LOG.isTraceEnabled()) LOG.trace("abortTransaction - txId " + transactionId +
", dropTableRecorded " + dropTableRecorded + ", regionName " + m_regionDetails);
IOException ioe = null;
UnknownTransactionException ute = null;
Throwable t = null;
// Process in local memory
int participant = request.getParticipantNum();
try {
abortTransaction(transactionId, dropTableRecorded, ignoreUnknownTransaction);
} catch (UnknownTransactionException u) {
LOG.error("TrxRegionEndpoint coprocessor:abort - txId "
+ transactionId
+ " participant " + participant
+ ", Caught UnknownTransactionException after internal abortTransaction call - "
+ u.getMessage() + " "
+ stackTraceToString(u));
ute = u;
} catch (IOException e) {
LOG.error("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + " participant " + participant
+ ", Caught IOException after internal abortTransaction call - ", e);
ioe = e;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionResponse.Builder abortTransactionResponseBuilder = AbortTransactionResponse.newBuilder();
abortTransactionResponseBuilder.setHasException(false);
if (t != null)
{
abortTransactionResponseBuilder.setHasException(true);
abortTransactionResponseBuilder.setException(t.toString());
}
if (ioe != null)
{
abortTransactionResponseBuilder.setHasException(true);
abortTransactionResponseBuilder.setException(ioe.toString());
}
if (ute != null)
{
abortTransactionResponseBuilder.setHasException(true);
abortTransactionResponseBuilder.setException(ute.toString());
}
AbortTransactionResponse aresponse = abortTransactionResponseBuilder.build();
done.run(aresponse);
}
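/**
 * Aborts one transaction across multiple regions in a single RPC. Each
 * region's TrxRegionEndpoint instance is looked up in transactionsEPCPMap;
 * regions missing from the map are reported per-region as
 * EXCEPTION_REGIONNOTFOUND_ERR.
 */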
@Override
public void abortTransactionMultiple(RpcController controller,
AbortTransactionMultipleRequest request,
RpcCallback<AbortTransactionMultipleResponse> done) {
AbortTransactionMultipleResponse response = AbortTransactionMultipleResponse.getDefaultInstance();
long transactionId = request.getTransactionId();
int i = 0;
int numOfRegion = request.getRegionNameCount();
String requestRegionName;
IOException ioe = null;
UnknownTransactionException ute = null;
Throwable t = null;
if (LOG.isTraceEnabled()) LOG.trace("abortMultiple - txId " + transactionId + " number of region is commitMultiple "
+ numOfRegion + ", master regionName " + m_regionName);
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse.Builder abortTransactionMultipleResponseBuilder = AbortTransactionMultipleResponse.newBuilder();
abortTransactionMultipleResponseBuilder.setHasException(false);
int participant = 0;
while (i < numOfRegion) {
requestRegionName = request.getRegionName(i).toStringUtf8();
abortTransactionMultipleResponseBuilder.addException(BatchException.EXCEPTION_OK.toString());
try {
participant = request.getParticipantNum();
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint abortMultiple begins for region " + requestRegionName);
TrxRegionEndpoint regionEPCP = (TrxRegionEndpoint) transactionsEPCPMap.get(requestRegionName+trxkeyEPCPinstance);
if (regionEPCP == null) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint abortMultiple region NOT FOUND in EPCP map " + requestRegionName);
abortTransactionMultipleResponseBuilder.setHasException(true);
abortTransactionMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
}
else {
regionEPCP.abortTransaction(transactionId);
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint abortMultiple ends");
// abortTransaction(transactionId);
} catch (UnknownTransactionException u) {
LOG.error("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + " participant " + participant + ", Caught UnknownTransactionException after internal abortTransaction call - " + u.getMessage() + " " + stackTraceToString(u));
ute = u;
} catch (IOException e) {
LOG.error("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + " participant " + participant
+ ", Caught IOException after internal abortTransaction call - ", e);
ioe = e;
}
if (t != null)
{
abortTransactionMultipleResponseBuilder.setHasException(true);
abortTransactionMultipleResponseBuilder.setException(i, t.toString());
}
if (ioe != null)
{
abortTransactionMultipleResponseBuilder.setHasException(true);
abortTransactionMultipleResponseBuilder.setException(i, ioe.toString());
}
if (ute != null)
{
abortTransactionMultipleResponseBuilder.setHasException(true);
abortTransactionMultipleResponseBuilder.setException(i, ute.toString());
}
i++; // move to next region
} // end of while-loop on all the regions in the abortTransactionMultiple request
AbortTransactionMultipleResponse aresponse = abortTransactionMultipleResponseBuilder.build();
done.run(aresponse);
}
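/**
 * Begins a transaction. Under memory pressure (memoryThrottle) the request
 * is either logged as a warning or rejected with a MemoryUsageException,
 * depending on memoryUsageWarnOnly; otherwise it delegates to the internal
 * beginTransaction(transactionId, startId).
 */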
@Override
public void beginTransaction(RpcController controller,
BeginTransactionRequest request,
RpcCallback<BeginTransactionResponse> done) {
BeginTransactionResponse response = BeginTransactionResponse.getDefaultInstance();
Throwable t = null;
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
MemoryUsageException mue = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - txId " + transactionId + ", regionName " + m_regionDetails);
if (memoryThrottle) {
if (memoryUsageWarnOnly) {
LOG.warn("beginTransaction - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("beginTransaction memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
else
{
try {
beginTransaction(transactionId, startId);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("beginTransaction - txId "
+ transactionId + ", Caught exception ", e);
t = e;
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.BeginTransactionResponse.Builder beginTransactionResponseBuilder = BeginTransactionResponse.newBuilder();
beginTransactionResponseBuilder.setHasException(false);
if (t != null)
{
beginTransactionResponseBuilder.setHasException(true);
beginTransactionResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
beginTransactionResponseBuilder.setHasException(true);
beginTransactionResponseBuilder.setException(mue.toString());
}
BeginTransactionResponse bresponse = beginTransactionResponseBuilder.build();
done.run(bresponse);
}
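/**
 * Phase-two commit for a single transaction. Delegates to the internal
 * commit(transactionId, commitId, participantNum,
 * ignoreUnknownTransactionException) and reports any exception in the
 * response.
 */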
@Override
public void commit(RpcController controller,
CommitRequest request,
RpcCallback<CommitResponse> done) {
CommitResponse response = CommitResponse.getDefaultInstance();
Throwable t = null;
long transactionId = request.getTransactionId();
long commitId = request.getCommitId();
final int participantNum = request.getParticipantNum();
if (LOG.isDebugEnabled()) LOG.debug("commit - txId "
+ transactionId + ", commitId " + commitId + ", participantNum " + participantNum
+ ", region " + m_regionDetails);
// Process local memory
try {
commit(transactionId, commitId, participantNum, request.getIgnoreUnknownTransactionException());
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("commit - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitResponse.Builder commitResponseBuilder = CommitResponse.newBuilder();
commitResponseBuilder.setHasException(false);
if (t != null)
{
commitResponseBuilder.setHasException(true);
commitResponseBuilder.setException(t.toString());
}
CommitResponse cresponse = commitResponseBuilder.build();
done.run(cresponse);
}
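/**
 * Phase-two commit for one transaction across multiple regions. Each region
 * is resolved through transactionsEPCPMap and committed in turn, with
 * per-region exceptions accumulated in the response.
 */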
@Override
public void commitMultiple(RpcController controller,
CommitMultipleRequest request,
RpcCallback<CommitMultipleResponse> done) {
CommitMultipleResponse response = CommitMultipleResponse.getDefaultInstance();
Throwable t = null;
long transactionId = request.getTransactionId();
long commitId = request.getCommitId();
int i = 0;
int numOfRegion = request.getRegionNameCount();
String requestRegionName;
if (LOG.isTraceEnabled()) LOG.trace("commitMultiple - txId " + transactionId + " master regionName " + regionInfo.getRegionNameAsString());
if (LOG.isTraceEnabled()) LOG.trace("commitMultiple - txId " + transactionId + " number of region is commitMultiple " + numOfRegion);
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse.Builder commitMultipleResponseBuilder = CommitMultipleResponse.newBuilder();
commitMultipleResponseBuilder.setHasException(false);
while (i < numOfRegion) {
requestRegionName = request.getRegionName(i).toStringUtf8();
commitMultipleResponseBuilder.addException(BatchException.EXCEPTION_OK.toString());
try {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitMultiple begins for region " + requestRegionName);
TrxRegionEndpoint regionEPCP = (TrxRegionEndpoint) transactionsEPCPMap.get(requestRegionName+trxkeyEPCPinstance);
if (regionEPCP == null) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitMultiple region NOT FOUND in EPCP map " + requestRegionName);
commitMultipleResponseBuilder.setHasException(true);
commitMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
}
else {
regionEPCP.commit(transactionId, commitId, i, request.getIgnoreUnknownTransactionException());
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitMultiple ends");
//commit(transactionId, request.getIgnoreUnknownTransactionException());
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("commitMultiple - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
if (t != null)
{
commitMultipleResponseBuilder.setHasException(true);
commitMultipleResponseBuilder.setException(i, t.toString());
}
i++; // move to next region
} // end of while-loop on all the regions in the commitMultiple request
CommitMultipleResponse cresponse = commitMultipleResponseBuilder.build();
done.run(cresponse);
}
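/**
 * Combines the commit request and commit phases in a single RPC. Delegates
 * to the internal commitIfPossible(transactionId, startEpoch, commitId,
 * participantNum) and reports any exception in the response.
 */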
@Override
public void commitIfPossible(RpcController controller,
CommitIfPossibleRequest request,
RpcCallback<CommitIfPossibleResponse> done) {
CommitIfPossibleResponse response = CommitIfPossibleResponse.getDefaultInstance();
boolean reply = false;
long transactionId = request.getTransactionId();
long commitId = request.getCommitId();
long startEpoch = request.getStartEpoch();
final int participantNum = request.getParticipantNum();
Throwable t = null;
// Process local memory
try {
if (LOG.isDebugEnabled()) LOG.debug("commitIfPossible - txId " + transactionId + ", regionName, " + m_regionDetails + "calling internal commitIfPossible");
reply = commitIfPossible(transactionId, startEpoch, commitId, participantNum);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("commitIfPossible - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitIfPossibleResponse.Builder commitIfPossibleResponseBuilder = CommitIfPossibleResponse.newBuilder();
commitIfPossibleResponseBuilder.setHasException(false);
if (t != null)
{
commitIfPossibleResponseBuilder.setHasException(true);
commitIfPossibleResponseBuilder.setException(t.toString());
}
CommitIfPossibleResponse cresponse = commitIfPossibleResponseBuilder.build();
done.run(cresponse);
}
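/**
 * Phase-one (prepare) for a single transaction. The internal commitRequest
 * returns a status code (COMMIT_OK, COMMIT_OK_READ_ONLY, COMMIT_CONFLICT,
 * ...) that is placed in the response along with any exception encountered.
 */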
@Override
public void commitRequest(RpcController controller,
CommitRequestRequest request,
RpcCallback<CommitRequestResponse> done) {
CommitRequestResponse response = CommitRequestResponse.getDefaultInstance();
int status = 0;
IOException ioe = null;
UnknownTransactionException ute = null;
CommitConflictException cce = null;
Throwable t = null;
long transactionId = request.getTransactionId();
long startEpoch = request.getStartEpoch();
int participantNum = request.getParticipantNum();
boolean dropTableRecorded = request.getDropTableRecorded();
if (LOG.isTraceEnabled()) LOG.trace("commitRequest - txId "
+ transactionId + ", startEpoch " + startEpoch + ", participantNum " + participantNum + ", dropTableRecorded " + dropTableRecorded +
", regionName " + m_regionDetails);
// Process local memory
try {
status = commitRequest(transactionId, startEpoch, participantNum, dropTableRecorded);
} catch (UnknownTransactionException u) {
LOG.info("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call in region " + m_regionDetails, u);
ute = u;
status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
} catch (CommitConflictException c) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught CommitConflictException after internal commitRequest call - "+ c.toString());
cce = c;
status = COMMIT_CONFLICT;
} catch (IOException e) {
LOG.error("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + " participant " + participantNum
+" , Caught IOException after internal commitRequest call - ", e);
ioe = e;
status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestResponse.Builder commitRequestResponseBuilder = CommitRequestResponse.newBuilder();
commitRequestResponseBuilder.setHasException(false);
if (t != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(t.toString());
}
if (cce != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(cce.toString());
}
if (ioe != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(ioe.toString());
}
if (ute != null)
{
commitRequestResponseBuilder.setHasException(true);
commitRequestResponseBuilder.setException(ute.toString());
}
commitRequestResponseBuilder.setResult(status);
CommitRequestResponse cresponse = commitRequestResponseBuilder.build();
done.run(cresponse);
}
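/**
 * Phase-one (prepare) for one transaction across multiple regions. Each
 * region is prepared through its TrxRegionEndpoint from transactionsEPCPMap;
 * only the last region in the list is asked to flush. Exceptions are mapped
 * to EXCEPTION_SKIPREMAININGREGIONS_OK so the caller can skip the remaining
 * regions.
 */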
@Override
public void commitRequestMultiple(RpcController controller,
CommitRequestMultipleRequest request,
RpcCallback<CommitRequestMultipleResponse> done) {
CommitRequestMultipleResponse response = CommitRequestMultipleResponse.getDefaultInstance();
int status = 0;
IOException ioe = null;
UnknownTransactionException ute = null;
Throwable t = null;
long transactionId = request.getTransactionId();
long startEpoch = request.getStartEpoch();
int i = 0;
int numOfRegion = request.getRegionNameCount();
String requestRegionName;
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequestMultiple - txId " + transactionId
+ " number of region is commitMultiple " + numOfRegion + ", master regionName " + regionInfo.getRegionNameAsString());
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse.Builder commitRequestMultipleResponseBuilder = CommitRequestMultipleResponse.newBuilder();
commitRequestMultipleResponseBuilder.setHasException(false);
while (i < numOfRegion) {
requestRegionName = request.getRegionName(i).toStringUtf8();
/*
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + Hex.encodeHexString(request.getRegionName(i).toStringUtf8().getBytes()));
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + regionInfo.getRegionNameAsString());
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + this.m_Region.getRegionName().toString());
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + Hex.encodeHexString(ByteString.copyFrom(this.m_Region.getRegionName()).toString().getBytes()));
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA1 Region Key " + Hex.encodeHexString(requestRegionName.getBytes()));
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA2 Region Key " + Hex.encodeHexString(regionInfo.getRegionNameAsString().getBytes()));
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA2 Region Key " + Hex.encodeHexString(this.m_Region.getRegionName()));
if (requestRegionName.equals(ByteString.copyFrom(this.m_Region.getRegionName()).toString())) {
if (LOG.isTraceEnabled()) { LOG.trace("EPCP BB0 Region Key matches !! " + request.getRegionName(i).toString()); }
}
if (Arrays.equals(request.getRegionName(i).toStringUtf8().getBytes(), regionInfo.getRegionNameAsString().getBytes())) {
if (LOG.isTraceEnabled()) { LOG.trace("EPCP BB1 Region Key matches !! " + request.getRegionName(i).toString()); }
}
if (request.getRegionName(i).toStringUtf8().equals(regionInfo.getRegionNameAsString())) {
if (LOG.isTraceEnabled()) { LOG.trace("EPCP BB2 Region Key matches !! " + request.getRegionName(i).toString()); }
}
*/
commitRequestMultipleResponseBuilder.addException(BatchException.EXCEPTION_OK.toString());
try {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple begins for region " + requestRegionName);
TrxRegionEndpoint regionEPCP = (TrxRegionEndpoint) transactionsEPCPMap.get(requestRegionName+trxkeyEPCPinstance);
if (regionEPCP == null) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple region NOT FOUND in EPCP map " + requestRegionName);
commitRequestMultipleResponseBuilder.setHasException(true);
commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
}
else {
if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, startEpoch, i, true);} // only the last region flush
else {status = regionEPCP.commitRequest(transactionId, startEpoch, i, false);}
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple ends");
//status = commitRequest(transactionId);
} catch (UnknownTransactionException u) {
if (LOG.isTraceEnabled()) LOG.trace("commitRequestMultiple - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
ute = u;
} catch (IOException e) {
LOG.error("commitRequestMultiple - txId " + transactionId
+ ", Caught IOException after internal commitRequest call - ", e);
ioe = e;
}
if (t != null)
{
commitRequestMultipleResponseBuilder.setHasException(true);
commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_SKIPREMAININGREGIONS_OK.toString());
}
if (ioe != null)
{
commitRequestMultipleResponseBuilder.setHasException(true);
commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_SKIPREMAININGREGIONS_OK.toString());
}
if (ute != null)
{
commitRequestMultipleResponseBuilder.setHasException(true);
commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_SKIPREMAININGREGIONS_OK.toString());
}
commitRequestMultipleResponseBuilder.addResult(status);
i++; // move to next region
} // end of while-loop on all the regions in the commitRequestMultiple request
CommitRequestMultipleResponse cresponse = commitRequestMultipleResponseBuilder.build();
done.run(cresponse);
}
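/**
 * checkAndDelete executed under a region transaction (identified by tid
 * rather than a distributed transaction id). Converts the protobuf Delete,
 * verifies the Delete's row matches the request row, then delegates to the
 * internal checkAndDeleteRegionTx honoring the autoCommit flag.
 */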
@Override
public void checkAndDeleteRegionTx(RpcController controller,
CheckAndDeleteRegionTxRequest request,
RpcCallback<CheckAndDeleteRegionTxResponse> done) {
CheckAndDeleteRegionTxResponse response = CheckAndDeleteRegionTxResponse.getDefaultInstance();
byte [] rowArray = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
Delete delete = null;
Throwable t = null;
MemoryUsageException mue = null;
boolean result = false;
long tid = request.getTid();
long commitId = request.getCommitId();
boolean autoCommit = request.getAutoCommit();
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRegionTxResponse.Builder checkAndDeleteRegionTxResponseBuilder = CheckAndDeleteRegionTxResponse.newBuilder();
if (memoryThrottle) {
if (memoryUsageWarnOnly) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndDeleteRegionTx - performing memoryPercentage " + memoryPercentage
+ ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isWarnEnabled()) LOG.warn("checkAndDeleteRegionTx - performing memoryPercentage " + memoryPercentage
+ ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("checkAndDelete memory usage exceeds " + memoryUsageThreshold
+ " percent, tid is " + tid);
}
}
if (mue == null &&
type == MutationType.DELETE &&
proto.hasRow()){
try {
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndDeleteRegionTx - tid " + tid
+ ", Caught exception ", e);
t = e;
}
// Process in local memory
if ((delete != null) && (t == null)){
if (request.hasRow()) {
if (!Bytes.equals(proto.getRow().toByteArray(), request.getRow().toByteArray()))
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
"checkAndDeleteRegionTx row must match the passed row");
}
if (t == null) {
try {
result = checkAndDeleteRegionTx(tid, tid,
commitId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
delete,
autoCommit);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndDeleteRegionTx - tid " + tid
+ ", Caught exception ", e);
t = e;
}
}
checkAndDeleteRegionTxResponseBuilder.setResult(result);
}
}
else{
result = false;
checkAndDeleteRegionTxResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx - tid " + tid + ", result is " + result);
checkAndDeleteRegionTxResponseBuilder.setHasException(false);
if (t != null){
checkAndDeleteRegionTxResponseBuilder.setHasException(true);
checkAndDeleteRegionTxResponseBuilder.setException(t.toString());
}
if (mue != null){
if (LOG.isWarnEnabled()) LOG.warn("checkAndDeleteRegionTx - performing memoryPercentage " + memoryPercentage
+ ", posting memory usage exceeds indicated percentage");
checkAndDeleteRegionTxResponseBuilder.setHasException(true);
checkAndDeleteRegionTxResponseBuilder.setException(mue.toString());
}
CheckAndDeleteRegionTxResponse checkAndDeleteRegionTxResponse = checkAndDeleteRegionTxResponseBuilder.build();
done.run(checkAndDeleteRegionTxResponse);
}
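/**
 * Transactional checkAndDelete. Subject to the same memory throttling as
 * beginTransaction; converts the protobuf Delete, verifies the row matches,
 * and delegates to the internal checkAndDelete.
 */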
@Override
public void checkAndDelete(RpcController controller,
CheckAndDeleteRequest request,
RpcCallback<CheckAndDeleteResponse> done) {
CheckAndDeleteResponse response = CheckAndDeleteResponse.getDefaultInstance();
byte [] rowArray = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
Delete delete = null;
Throwable t = null;
MemoryUsageException mue = null;
boolean result = false;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
// First test if this region matches our region name
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteResponse.Builder checkAndDeleteResponseBuilder = CheckAndDeleteResponse.newBuilder();
if (memoryThrottle) {
if (memoryUsageWarnOnly) {
LOG.warn("checkAndDelete - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("checkAndDelete - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("checkAndDelete memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
if (mue == null &&
type == MutationType.DELETE &&
proto.hasRow())
{
try {
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndDelete - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
// Process in local memory
if ((delete != null) && (t == null))
{
if (request.hasRow()) {
if (!Bytes.equals(proto.getRow().toByteArray(), request.getRow().toByteArray()))
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
"Delete row must match the passed row");
}
if (t == null) {
try {
result = checkAndDelete(transactionId,
startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
delete);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndDelete - txId " + transactionId
+ ", Caught exception", e);
t = e;
}
}
checkAndDeleteResponseBuilder.setResult(result);
}
}
else
{
result = false;
checkAndDeleteResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndDelete - txId " + transactionId + ", result is " + result);
checkAndDeleteResponseBuilder.setHasException(false);
if (t != null)
{
checkAndDeleteResponseBuilder.setHasException(true);
checkAndDeleteResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("checkAndDelete - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
checkAndDeleteResponseBuilder.setHasException(true);
checkAndDeleteResponseBuilder.setException(mue.toString());
}
CheckAndDeleteResponse checkAndDeleteResponse = checkAndDeleteResponseBuilder.build();
done.run(checkAndDeleteResponse);
}
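/**
 * Transactional checkAndPut. Mirrors checkAndDelete: converts the protobuf
 * Put, verifies the row matches, and delegates to the internal checkAndPut
 * with the row, family, qualifier and expected value.
 */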
@Override
public void checkAndPut(RpcController controller,
CheckAndPutRequest request,
RpcCallback<CheckAndPutResponse> done) {
CheckAndPutResponse response = CheckAndPutResponse.getDefaultInstance();
byte [] rowArray = null;
com.google.protobuf.ByteString row = null;
com.google.protobuf.ByteString family = null;
com.google.protobuf.ByteString qualifier = null;
com.google.protobuf.ByteString value = null;
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
MemoryUsageException mue = null;
Throwable t = null;
boolean result = false;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutResponse.Builder checkAndPutResponseBuilder = CheckAndPutResponse.newBuilder();
if (memoryThrottle) {
if (memoryUsageWarnOnly) {
LOG.warn("checkAndPut - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
mue = new MemoryUsageException("checkAndPut memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
}
}
if (mue == null &&
type == MutationType.PUT &&
proto.hasRow())
{
rowArray = proto.getRow().toByteArray();
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndPut - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
// Process in local memory
if ((put != null) && (t == null))
{
if (request.hasRow()) {
row = request.getRow();
if (!Bytes.equals(rowArray, request.getRow().toByteArray()))
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
"Put row must match the passed row");
}
if (t == null) {
if (request.hasRow())
row = request.getRow();
if (request.hasFamily())
family = request.getFamily();
if (request.hasQualifier())
qualifier = request.getQualifier();
if (request.hasValue())
value = request.getValue();
try {
result = checkAndPut(transactionId,
startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
put);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndPut - txId "
+ transactionId + ", Caught exception ", e);
t = e;
}
}
checkAndPutResponseBuilder.setResult(result);
}
}
else
{
result = false;
checkAndPutResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - txId " + transactionId + ", result is " + result);
checkAndPutResponseBuilder.setHasException(false);
if (t != null)
{
checkAndPutResponseBuilder.setHasException(true);
checkAndPutResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
checkAndPutResponseBuilder.setHasException(true);
checkAndPutResponseBuilder.setException(mue.toString());
}
CheckAndPutResponse checkAndPutResponse = checkAndPutResponseBuilder.build();
done.run(checkAndPutResponse);
}
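/**
 * checkAndPut executed under a region transaction. Uses the tid both as the
 * transaction id and as the startId, and honors the request's autoCommit
 * flag.
 */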
@Override
public void checkAndPutRegionTx(RpcController controller,
CheckAndPutRegionTxRequest request,
RpcCallback<CheckAndPutRegionTxResponse> done) {
CheckAndPutRegionTxResponse response = CheckAndPutRegionTxResponse.getDefaultInstance();
byte [] rowArray = null;
com.google.protobuf.ByteString row = null;
com.google.protobuf.ByteString family = null;
com.google.protobuf.ByteString qualifier = null;
com.google.protobuf.ByteString value = null;
long tid = request.getTid();
long commitId = request.getCommitId();
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
boolean autoCommit = request.getAutoCommit();
Put put = null;
MemoryUsageException mue = null;
Throwable t = null;
boolean result = false;
long startId = tid;
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRegionTxResponse.Builder checkAndPutRegionTxResponseBuilder = CheckAndPutRegionTxResponse.newBuilder();
if (memoryThrottle) {
if (memoryUsageWarnOnly) {
LOG.warn("TrxRegionEndpoint coprocessor: checkAndPutRegionTx - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
mue = new MemoryUsageException("checkAndPutRegionTx memory usage exceeds " + memoryUsageThreshold + " percent, tid is " + tid);
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndPutRegionTx - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
}
}
if (mue == null &&
type == MutationType.PUT &&
proto.hasRow())
{
rowArray = proto.getRow().toByteArray();
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndPutRegionTx - tid " + tid
+ ", Caught exception ", e);
t = e;
}
// Process in local memory
if ((put != null) && (t == null))
{
if (request.hasRow()) {
row = request.getRow();
if (!Bytes.equals(rowArray, request.getRow().toByteArray()))
t = new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
"Put row must match the passed row");
}
if (t == null) {
if (request.hasRow())
row = request.getRow();
if (request.hasFamily())
family = request.getFamily();
if (request.hasQualifier())
qualifier = request.getQualifier();
if (request.hasValue())
value = request.getValue();
try {
result = checkAndPutRegionTx(tid,
startId, commitId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
request.getValue().toByteArray(),
put,
autoCommit);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndPutRegionTx - tid " + tid
+ ", Caught exception ", e);
t = e;
}
}
checkAndPutRegionTxResponseBuilder.setResult(result);
}
}
else
{
result = false;
checkAndPutRegionTxResponseBuilder.setResult(result);
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndPutRegionTx - tid " + tid + ", result is " + result);
checkAndPutRegionTxResponseBuilder.setHasException(false);
if (t != null)
{
checkAndPutRegionTxResponseBuilder.setHasException(true);
checkAndPutRegionTxResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndPutRegionTx - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
checkAndPutRegionTxResponseBuilder.setHasException(true);
checkAndPutRegionTxResponseBuilder.setException(mue.toString());
}
CheckAndPutRegionTxResponse checkAndPutRegionTxResponse = checkAndPutRegionTxResponseBuilder.build();
done.run(checkAndPutRegionTxResponse);
}
@Override
public void closeScanner(RpcController controller,
CloseScannerRequest request,
RpcCallback<CloseScannerResponse> done) {
RegionScanner scanner = null;
Throwable t = null;
OutOfOrderProtocolException oop = null;
Exception ce = null;
long transId = request.getTransactionId();
long scannerId = request.getScannerId();
if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", scanner id " + scannerId + ", regionName " + m_regionDetails);
// There should be a matching key in the transactionsById map
// associated with this transaction id. If there is not
// one, then the initial openScanner call for the transaction
// id was not called. This is a protocol error requiring
// openScanner, performScan followed by a closeScanner.
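// Illustrative call sequence (the actual driver is the transactional client
// library): for each transactional scan the client is expected to issue
//   openScanner(txId, scan)            -> returns scannerId
//   performScan(txId, scannerId, ...)  -> repeated while hasMore is true
//   closeScanner(txId, scannerId)
// Arriving here without a transaction entry means openScanner was never called
// for this transaction, which is the protocol error handled below.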
String key = getTransactionalUniqueId(transId);
boolean keyFound = transactionsById.containsKey(key);
if (keyFound != true)
{
if (LOG.isTraceEnabled()) LOG.trace("closeScanner - Unknown transaction [" + transId
+ "] in region [" + m_regionDetails
+ "], will create an OutOfOrderProtocol exception ");
oop = new OutOfOrderProtocolException("closeScanner does not have an active transaction with an open scanner, txId: " + transId);
}
if (oop == null) {
try {
// we want to allow closing scanners and remove operations up until the very end.
checkBlockAll(transId);
scanner = removeScanner(scannerId);
if (scanner != null) {
scanner.close();
}
else
if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", scanner was null for scanner id " + scannerId);
/*
try {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
// ignore
if (LOG.isTraceEnabled()) LOG.trace("closeScanner failed to get a lease " + scannerId);
}
*/
} catch(Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
ce = e;
} catch(Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CloseScannerResponse.Builder closeResponseBuilder = CloseScannerResponse.newBuilder();
closeResponseBuilder.setHasException(false);
if (t != null)
{
closeResponseBuilder.setHasException(true);
closeResponseBuilder.setException(t.toString());
}
if (ce != null)
{
closeResponseBuilder.setHasException(true);
closeResponseBuilder.setException(ce.toString());
}
if (oop != null)
{
if (this.suppressOutOfOrderProtocolException == false)
{
closeResponseBuilder.setHasException(true);
closeResponseBuilder.setException(oop.toString());
LOG.warn("closeScanner - OutOfOrderProtocolExc, transaction was not found, txId: " + transId + ",returned exception" + ", region " + m_regionDetails);
}
else
LOG.warn("closeScanner - suppressing OutOfOrderProtocolExc, transaction was not found, txId: " + transId + ", region " + m_regionDetails);
}
CloseScannerResponse cresponse = closeResponseBuilder.build();
done.run(cresponse);
}
@Override
public void deleteMultiple(RpcController controller,
DeleteMultipleTransactionalRequest request,
RpcCallback<DeleteMultipleTransactionalResponse> done) {
DeleteMultipleTransactionalResponse response = DeleteMultipleTransactionalResponse.getDefaultInstance();
java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto> results = request.getDeleteList();
byte [] row = null;
Delete delete = null;
MutationType type;
Throwable t = null;
MemoryUsageException mue = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("deleteMultiple - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
mue = new MemoryUsageException("deleteMultiple memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
if (mue == null) {
for (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto proto : results)
{
delete = null;
if (proto != null)
{
type = proto.getMutateType();
if (type == MutationType.DELETE && proto.hasRow())
{
try {
checkBlockNonPhase2(transactionId); // throws IOException
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("deleteMultiple - txId " + transactionId
+ ", Caught exception after protobuf conversion delete ", e);
t = e;
}
// Process in local memory
if ((delete != null) && (t == null))
{
try {
delete(transactionId, startId, delete);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("deleteMultiple - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - txId " + transactionId + ", region " + m_regionDetails + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
}
}
}
else
if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - txId " + transactionId + ", region " + m_regionDetails + ", delete proto was null");
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteMultipleTransactionalResponse.Builder deleteMultipleTransactionalResponseBuilder = DeleteMultipleTransactionalResponse.newBuilder();
deleteMultipleTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
deleteMultipleTransactionalResponseBuilder.setHasException(true);
deleteMultipleTransactionalResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
deleteMultipleTransactionalResponseBuilder.setHasException(true);
deleteMultipleTransactionalResponseBuilder.setException(mue.toString());
}
DeleteMultipleTransactionalResponse dresponse = deleteMultipleTransactionalResponseBuilder.build();
done.run(dresponse);
}
public void deleteRegionTx(RpcController controller,
DeleteRegionTxRequest request,
RpcCallback<DeleteRegionTxResponse> done) {
DeleteRegionTxResponse response = DeleteRegionTxResponse.getDefaultInstance();
byte [] row = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
Delete delete = null;
boolean autoCommit = request.getAutoCommit();
Throwable t = null;
MemoryUsageException mue = null;
long tid = request.getTid();
long commitId = request.getCommitId();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("deleteRegionTx - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("deleteRegionTx memory usage exceeds " + memoryUsageThreshold + " percent, tid is " + tid);
}
}
else
{
try {
checkBlockNonPhase2(tid); // throws IOException
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("deleteRegionTx - tid " + tid
+ ", Caught exception ", e);
t = e;
}
// Process in local memory
if ((delete != null) && (t == null)){
try {
deleteRegionTx(tid, tid, commitId, delete, autoCommit);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("deleteRegionTx - tid " + tid
+ ", Caught exception after internal deleteRegionTx - ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx - tid " + tid + ", regionName "
+ m_regionDetails + ", type " + type + ", row "
+ Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex "
+ Hex.encodeHexString(proto.getRow().toByteArray()));
}
else{
if (delete == null){
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx - delete is null");
}
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteRegionTxResponse.Builder deleteRegionTxResponseBuilder = DeleteRegionTxResponse.newBuilder();
deleteRegionTxResponseBuilder.setHasException(false);
if (t != null)
{
deleteRegionTxResponseBuilder.setHasException(true);
deleteRegionTxResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx - performing memoryPercentage "
+ memoryPercentage + ", posting memory usage exceeds indicated percentage");
deleteRegionTxResponseBuilder.setHasException(true);
deleteRegionTxResponseBuilder.setException(mue.toString());
}
DeleteRegionTxResponse dresponse = deleteRegionTxResponseBuilder.build();
done.run(dresponse);
}
@Override
public void delete(RpcController controller,
DeleteTransactionalRequest request,
RpcCallback<DeleteTransactionalResponse> done) {
DeleteTransactionalResponse response = DeleteTransactionalResponse.getDefaultInstance();
byte [] row = null;
MutationProto proto = request.getDelete();
MutationType type = proto.getMutateType();
Delete delete = null;
Throwable t = null;
MemoryUsageException mue = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("delete - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("delete - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("delete memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
else
{
try {
checkBlockNonPhase2(transactionId); // throws IOException
delete = ProtobufUtil.toDelete(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("delete - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
// Process in local memory
if ((delete != null) && (t == null))
{
try {
delete(transactionId, startId, delete);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("delete - txId " + transactionId
+ ", Caught exception after internal delete - ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("delete - txId " + transactionId + ", region " + m_regionDetails + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalResponse.Builder deleteTransactionalResponseBuilder = DeleteTransactionalResponse.newBuilder();
deleteTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
deleteTransactionalResponseBuilder.setHasException(true);
deleteTransactionalResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("delete - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
deleteTransactionalResponseBuilder.setHasException(true);
deleteTransactionalResponseBuilder.setException(mue.toString());
}
DeleteTransactionalResponse dresponse = deleteTransactionalResponseBuilder.build();
done.run(dresponse);
}
@Override
public void get(RpcController controller,
GetTransactionalRequest request,
RpcCallback<GetTransactionalResponse> done) {
GetTransactionalResponse response = GetTransactionalResponse.getDefaultInstance();
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get proto = request.getGet();
Get get = null;
RegionScanner scanner = null;
Throwable t = null;
Exception ge = null;
IOException gioe = null;
MemoryUsageException mue = null;
org.apache.hadoop.hbase.client.Result result2 = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
boolean exceptionThrown = false;
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true)
LOG.warn("get - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
else {
if (LOG.isTraceEnabled()) LOG.trace("get - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
mue = new MemoryUsageException("get memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
exceptionThrown = true;
}
}
else
{
try {
checkBlockNonPhase2(transactionId); // throws IOException
get = ProtobufUtil.toGet(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("get - txId " + transactionId
+ ", Caught exception ", e);
t = e;
exceptionThrown = true;
}
if (exceptionThrown == false)
{
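// A transactional Get is served by converting it into a single-row Scan so
// that the same transactional getScanner() path handles both point reads and
// full scans.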
Scan scan = new Scan(get);
List<Cell> results = new ArrayList<Cell>();
try {
if (LOG.isTraceEnabled()) {
byte[] row = proto.getRow().toByteArray();
byte[] getrow = get.getRow();
LOG.trace("get - txId " + transactionId +
", Calling getScanner for region " + m_regionDetails +
", row = " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) +
", getrow = " + Bytes.toStringBinary(getrow) + ", getrow in hex " + Hex.encodeHexString(getrow));
}
scanner = getScanner(transactionId, startId, scan);
if (scanner != null)
scanner.next(results);
result2 = Result.create(results);
if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", getScanner result2 isEmpty is "
+ result2.isEmpty()
+ ", row "
+ Bytes.toStringBinary(result2.getRow())
+ " result length: "
+ result2.size());
} catch(Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
finally {
if (scanner != null) {
try {
scanner.close();
} catch(Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
ge = e;
}
}
}
} // ExceptionThrown
} // End of MemoryUsageCheck
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.GetTransactionalResponse.Builder getResponseBuilder = GetTransactionalResponse.newBuilder();
if (result2 != null)
{
getResponseBuilder.setResult(ProtobufUtil.toResult(result2));
}
else
{
if (t == null && ge == null)
gioe = new IOException("get - result2 was null");
if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", result2 was null ");
}
getResponseBuilder.setHasException(false);
if (t != null)
{
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(t.toString());
}
if (ge != null)
{
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(ge.toString());
}
if (gioe != null)
{
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(gioe.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("get - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(mue.toString());
}
GetTransactionalResponse gresponse = getResponseBuilder.build();
done.run(gresponse);
}
@Override
public void openScanner(RpcController controller,
OpenScannerRequest request,
RpcCallback<OpenScannerResponse> done) {
RegionScanner scanner = null;
Throwable t = null;
MemoryUsageException mue = null;
boolean exceptionThrown = false;
NullPointerException npe = null;
IOException ioe = null;
LeaseStillHeldException lse = null;
Scan scan = null;
long scannerId = 0L;
long transId = request.getTransactionId();
long startId = request.getStartId();
if (LOG.isTraceEnabled()) LOG.trace("openScanner - ENTER txId " + transId + " in region " + m_regionDetails);
{
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("openScanner - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("openScanner - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
exceptionThrown = true;
mue = new MemoryUsageException("openScanner memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transId);
}
}
else
{
try {
scan = ProtobufUtil.toScan(request.getScan());
if (scan == null)
if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", scan was null");
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("openScanner - txId " + transId
+ ", Caught exception ", e);
t = e;
exceptionThrown = true;
}
if (!exceptionThrown) {
if (scan == null) {
if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", scan is null");
npe = new NullPointerException("openScanner - txId " + transId + ", scan is null ");
ioe = new IOException("Invalid arguments to openScanner", npe);
exceptionThrown = true;
}
else
{
try {
scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
prepareScanner(scan);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("openScanner - txId " + transId
+ ", scan Caught exception ", e);
t = e;
exceptionThrown = true;
}
}
}
if (!exceptionThrown) {
try {
scanner = getScanner(transId, startId, scan);
if (scanner != null) {
if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", called getScanner, scanner is " + scanner);
// Add the scanner to the map
scannerId = addScanner(transId, scanner, this.m_Region);
if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", called addScanner, scanner id " + scannerId + ", region " + m_regionDetails);
}
else
if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", getScanner returned null, scanner id " + scannerId + ", region " + m_regionDetails);
} catch (LeaseStillHeldException llse) {
/*
try {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
if (LOG.isTraceEnabled()) LOG.trace("getScanner failed to get a lease " + scannerId);
}
*/
LOG.error("openScanner - txId " + transId + ", getScanner Error opening scanner, ", llse);
exceptionThrown = true;
lse = llse;
} catch (IOException e) {
LOG.error("openScanner - txId " + transId + ", getScanner Error opening scanner, ", e);
exceptionThrown = true;
ioe = e;
}
}
if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", scanner id " + scannerId + ", region " + m_regionDetails);
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.OpenScannerResponse.Builder openResponseBuilder = OpenScannerResponse.newBuilder();
openResponseBuilder.setScannerId(scannerId);
openResponseBuilder.setHasException(false);
if (t != null)
{
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(t.toString());
}
if (ioe != null)
{
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(ioe.toString());
}
if (lse != null)
{
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(lse.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("openScanner - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(mue.toString());
}
OpenScannerResponse oresponse = openResponseBuilder.build();
done.run(oresponse);
}
@Override
public void performScan(RpcController controller,
PerformScanRequest request,
RpcCallback<PerformScanResponse> done) {
boolean hasMore = true;
RegionScanner scanner = null;
Throwable t = null;
ScannerTimeoutException ste = null;
OutOfOrderProtocolException oop = null;
OutOfOrderScannerNextException ooo = null;
UnknownScannerException use = null;
MemoryUsageException mue = null;
Exception ne = null;
List<Cell> cellResults = new ArrayList<Cell>();
List<Result> results = new ArrayList<Result>();
org.apache.hadoop.hbase.client.Result result = null;
long scannerId = request.getScannerId();
long transId = request.getTransactionId();
long startId = request.getStartId();
int numberOfRows = request.getNumberOfRows();
boolean closeScanner = request.getCloseScanner();
long nextCallSeq = request.getNextCallSeq();
long count = 0L;
boolean shouldContinue = true;
TransactionalRegionScannerHolder rsh = null;
boolean exceptionThrown = false;
if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", numberOfRows " + numberOfRows
+ ", nextCallSeq " + nextCallSeq + ", closeScanner is " + closeScanner + ", region is " + m_regionDetails);
{
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("performScan - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("performScan - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("performScan memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transId);
}
}
else
{
// There should be a matching key in the transactionsById map
// associated with this transaction id. If there is not
// one, then the initial openScanner call for the transaction
// id was not called. This is a protocol error requiring
// openScanner, performScan followed by a closeScanner.
String key = getTransactionalUniqueId(transId);
boolean keyFound = transactionsById.containsKey(key);
if (keyFound != true)
{
if (LOG.isTraceEnabled()) LOG.trace("performScan - Unknown transaction [" + transId
+ "] in region [" + m_regionDetails
+ "], will create an OutOfOrderProtocol exception ");
oop = new OutOfOrderProtocolException("performScan does not have an active transaction with an open scanner, txId: " + transId);
}
if (oop == null) {
try {
scanner = getScanner(scannerId, nextCallSeq);
if (scanner != null)
{
if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", scanner is not null");
while (shouldContinue) {
hasMore = scanner.next(cellResults);
result = Result.create(cellResults);
cellResults.clear();
if (!result.isEmpty()) {
results.add(result);
count++;
}
if (count == numberOfRows || !hasMore)
shouldContinue = false;
}
if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", row count is " + count
+ ", hasMore is " + hasMore + ", result " + result.isEmpty() + " region " + m_regionDetails);
}
else
{
if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + transId + ", scanner is null");
}
} catch(OutOfOrderScannerNextException ooone) {
LOG.error("performScan - txId " + transId + ", scanner id " + scannerId + " Caught OutOfOrderScannerNextException " + ooone.getMessage() + " " + stackTraceToString(ooone));
ooo = ooone;
exceptionThrown = true;
} catch(ScannerTimeoutException cste) {
LOG.error("performScan - txId " + transId + ", scanner id " + scannerId + " Caught ScannerTimeoutException " + cste.getMessage() + " " + stackTraceToString(cste));
ste = cste;
exceptionThrown = true;
} catch(Throwable e) {
LOG.error("performScan - txId " + transId + ", scanner id " + scannerId + " Caught throwable exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
exceptionThrown = true;
}
finally {
if (scanner != null) {
try {
if (closeScanner) {
if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", close scanner was true, closing the scanner" + ", closeScanner is " + closeScanner + ", region is " + m_regionDetails);
removeScanner(scannerId);
scanner.close();
/*
try {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
// ignore
if (LOG.isTraceEnabled()) LOG.trace("performScan failed to get a lease " + scannerId);
}
*/
}
} catch(Exception e) {
LOG.error("performScan - transaction id " + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e));
ne = e;
exceptionThrown = true;
}
}
}
if (exceptionThrown == false)
{
rsh = scanners.get(scannerId);
nextCallSeq++;
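// Echo the incremented sequence number back to the client; the client sends it
// on its next performScan call, which is how getScanner(scannerId, nextCallSeq)
// can reject out-of-order or replayed next() requests for this scanner id.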
if (rsh == null)
{
if (LOG.isTraceEnabled()) LOG.trace("performScan rsh is null");
use = new UnknownScannerException(
"ScannerId: " + scannerId + ", already closed?");
}
else
{
rsh.nextCallSeq = nextCallSeq;
if (LOG.isTraceEnabled()) LOG.trace("performScan exit - txId " + transId + ", scanner id " + scannerId + " row count " + count + ", region " + m_regionDetails
+ ", nextCallSeq " + nextCallSeq + ", rsh.nextCallSeq " + rsh.nextCallSeq + ", close scanner is " + closeScanner);
}
}
}
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PerformScanResponse.Builder performResponseBuilder = PerformScanResponse.newBuilder();
performResponseBuilder.setHasMore(hasMore);
performResponseBuilder.setNextCallSeq(nextCallSeq);
performResponseBuilder.setCount(count);
performResponseBuilder.setHasException(false);
if (results != null)
{
if (!results.isEmpty()) {
for (Result r: results) {
performResponseBuilder.addResult(ProtobufUtil.toResult(r));
}
}
}
if (t != null)
{
performResponseBuilder.setHasMore(false);
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(t.toString());
}
if (ste != null)
{
performResponseBuilder.setHasMore(false);
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(ste.toString());
}
if (ne != null)
{
performResponseBuilder.setHasMore(false);
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(ne.toString());
}
if (ooo != null)
{
performResponseBuilder.setHasMore(false);
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(ooo.toString());
}
if (use != null)
{
performResponseBuilder.setHasMore(false);
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(use.toString());
}
if (oop != null)
{
performResponseBuilder.setHasMore(false);
if (this.suppressOutOfOrderProtocolException == false)
{
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(oop.toString());
LOG.warn("performScan - OutOfOrderProtocolExc, transaction was not found, txId: " + transId + ", return exception" + ", region " + m_regionDetails);
}
else
LOG.warn("performScan - suppressing OutOfOrderProtocolExc, transaction was not found, txId: " + transId + ", region " + m_regionDetails);
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("performScan - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
performResponseBuilder.setHasMore(false);
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(mue.toString());
}
PerformScanResponse presponse = performResponseBuilder.build();
done.run(presponse);
}
public void deleteTlogEntries(RpcController controller,
TlogDeleteRequest request, RpcCallback<TlogDeleteResponse> done) {
boolean hasMore = true;
InternalScanner scanner = null;
Throwable t = null;
ScannerTimeoutException ste = null;
OutOfOrderScannerNextException ooo = null;
UnknownScannerException use = null;
MemoryUsageException mue = null;
Exception ne = null;
Scan scan = null;
List<Cell> cellResults = new ArrayList<Cell>();
org.apache.hadoop.hbase.client.Result result = null;
long lvAsn = request.getAuditSeqNum();
boolean lvAgeCommitted = request.getAgeCommitted();
try{
scan = ProtobufUtil.toScan(request.getScan());
// use an internal scanner to perform scanning.
scanner = m_Region.getScanner(scan);
}
catch (Exception e){
if (LOG.isErrorEnabled()) LOG.error("deleteTlogEntries Exception in region: "
+ m_regionDetails + " getting scanner ", e );
}
long count = 0L;
boolean shouldContinue = true;
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries ENTRY. Records older than " + lvAsn
+ " will be deleted in region: " + m_regionDetails);
try {
if (scanner != null){
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - scanner is not null");
while (shouldContinue) {
hasMore = scanner.next(cellResults);
result = Result.create(cellResults);
if (!result.isEmpty()) {
for (Cell cell : result.rawCells()) {
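// The Tlog cell value is a comma-separated record whose leading fields are
//   <auditSeqNum>,<transactionId>,<state>[,...]
// for example "123456,77309411328,COMMITTED" (value shown for illustration
// only); the tokenizer below splits out asnToken, transidToken and stateToken.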
String valueString = new String(CellUtil.cloneValue(cell));
StringTokenizer st = new StringTokenizer(valueString, ",");
if (st.hasMoreElements()) {
String asnToken = st.nextElement().toString();
String transidToken = st.nextElement().toString();
String stateToken = st.nextElement().toString();
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries transidToken: "
+ transidToken + " asnToken: " + asnToken);
if (Long.parseLong(asnToken) < lvAsn) {
if ( (stateToken.contains(TransState.STATE_FORGOTTEN.toString())) ||
(stateToken.equals(TransState.STATE_COMMITTED.toString()) && (lvAgeCommitted)) ||
(stateToken.equals(TransState.STATE_ABORTED.toString()) && (lvAgeCommitted))) {
if (LOG.isTraceEnabled()) LOG.trace("Deleting transid: " + transidToken
+ " from region: " + m_regionDetails + " with state: " + stateToken);
try {
Delete d = new Delete(result.getRow());
d.setDurability(Durability.SKIP_WAL);
m_Region.delete(d);
}
catch (Exception e) {
LOG.warn("deleteTlogEntries -"
+ " txId " + transidToken + ", Executing delete caught an exception ", e);
throw new IOException("deleteTlogEntries -"
+ " txId " + transidToken + ", Executing delete caught an exception " + e.toString());
}
count++;
}
} else {
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Ending scan at asn: " + asnToken
+ ", transid: " + transidToken +
" because it is not less than the comparator: " + lvAsn +
" in region: " + m_regionDetails);
shouldContinue = false;
break;
}
} // if (st.hasMoreElements()
} // for (Cell cell : result.rawCells()
} // if (!result.isEmpty()
cellResults.clear();
if (!hasMore){
shouldContinue = false;
}
} // while (shouldContinue)
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - count is " + count + ", hasMore is " + hasMore
+ ", result " + result.isEmpty());
}
else {
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - scanner is null");
}
} catch(OutOfOrderScannerNextException ooone) {
LOG.warn("deleteTlogEntries - Caught OutOfOrderScannerNextException ", ooone);
ooo = ooone;
} catch(ScannerTimeoutException cste) {
LOG.warn("deleteTlogEntries - Caught ScannerTimeoutException ", cste);
ste = cste;
} catch(Throwable e) {
LOG.warn("deleteTlogEntries - Caught throwable exception ", e);
t = e;
} finally {
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - closing the scanner, region is " + m_regionDetails);
try{
if (scanner != null)
scanner.close();
}
catch(IOException ioe){
LOG.warn("deleteTlogEntries - exception closing the scanner ", ioe);
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse.Builder deleteResponseBuilder = TlogDeleteResponse.newBuilder();
deleteResponseBuilder.setCount(count);
deleteResponseBuilder.setHasException(false);
if (t != null){
deleteResponseBuilder.setHasException(true);
deleteResponseBuilder.setException(t.toString());
}
if (ste != null){
deleteResponseBuilder.setHasException(true);
deleteResponseBuilder.setException(ste.toString());
}
if (ne != null){
deleteResponseBuilder.setHasException(true);
deleteResponseBuilder.setException(ne.toString());
}
if (ooo != null){
deleteResponseBuilder.setHasException(true);
deleteResponseBuilder.setException(ooo.toString());
}
if (use != null){
deleteResponseBuilder.setHasException(true);
deleteResponseBuilder.setException(use.toString());
}
if (mue != null){
if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - performing memoryPercentage "
+ memoryPercentage + ", posting memory usage exceeds indicated percentage");
deleteResponseBuilder.setHasException(true);
deleteResponseBuilder.setException(mue.toString());
}
TlogDeleteResponse TlogDel_response = deleteResponseBuilder.build();
done.run(TlogDel_response);
}
public void setStoragePolicy(RpcController controller,
TrafSetStoragePolicyRequest request,
RpcCallback<TrafSetStoragePolicyResponse> done) {
String path = request.getPath();
String policy = request.getPolicy();
if (LOG.isTraceEnabled()) LOG.trace("setStoragePolicy ENTRY. path " + path + " policy " + policy );
IOException t=null;
try {
invokeSetStoragePolicy(fs, path, policy);
}
catch (IOException e) {
t = e;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyResponse.Builder setStoragePolicyResponseBuilder =
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyResponse.newBuilder();
if(t != null)
{
LOG.error("setStoragePolicy error : " + t.toString() );
setStoragePolicyResponseBuilder.setStatus(false);
setStoragePolicyResponseBuilder.setException(t.toString());
}
else
{
setStoragePolicyResponseBuilder.setStatus(true);
setStoragePolicyResponseBuilder.setException("");
}
TrafSetStoragePolicyResponse resp = setStoragePolicyResponseBuilder.build();
done.run(resp);
}
private static void invokeSetStoragePolicy(final FileSystem fs, final String pathstr,
final String storagePolicy)
throws IOException {
Path path = new Path(pathstr);
if (hdfsSetStoragePolicyMethod == null)
throw new IOException(hdfsSetStoragePolicyReflectErrorMsg);
try {
hdfsSetStoragePolicyMethod.invoke(fs, path, storagePolicy);
if (LOG.isDebugEnabled()) {
LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
}
} catch (Exception e) {
LOG.error("invoke set storage policy error : ", e);
throw new IOException(e);
}
}
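/*
 * Illustrative sketch (an assumption about code elsewhere in this class):
 * hdfsSetStoragePolicyMethod is expected to be resolved once via reflection so
 * the endpoint also loads on Hadoop versions whose FileSystem lacks
 * setStoragePolicy, e.g.
 *
 *   try {
 *     hdfsSetStoragePolicyMethod = FileSystem.class
 *         .getMethod("setStoragePolicy", Path.class, String.class);
 *   } catch (NoSuchMethodException nsme) {
 *     hdfsSetStoragePolicyReflectErrorMsg =
 *         "FileSystem.setStoragePolicy is not supported: " + nsme.getMessage();
 *   }
 */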
public void getTransactionStatesPriorToAsn(RpcController controller,
TlogTransactionStatesFromIntervalRequest request,
RpcCallback<TlogTransactionStatesFromIntervalResponse> done) {
boolean hasMore = true;
InternalScanner scanner = null;
Throwable t = null;
ScannerTimeoutException ste = null;
OutOfOrderScannerNextException ooo = null;
UnknownScannerException use = null;
MemoryUsageException mue = null;
Exception ne = null;
Scan scan = null;
List<Cell> cellResults = new ArrayList<Cell>();
List<Result> results = new ArrayList<Result>();
org.apache.hadoop.hbase.client.Result result = null;
long lvAsn = request.getAuditSeqNum();
long count = 0L;
boolean shouldContinue = true;
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn entries older than " + lvAsn + " will be returned, region is " + m_regionDetails);
try{
scan = ProtobufUtil.toScan(request.getScan());
// use an internal scanner to perform scanning.
scanner = m_Region.getScanner(scan);
}
catch (Exception e){
if (LOG.isErrorEnabled()) LOG.error("getTransactionStatesPriorToAsn Exception in region: "
+ m_regionDetails + " getting scanner ", e );
}
try {
if (scanner != null){
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - scanner is not null");
while (shouldContinue) {
hasMore = scanner.next(cellResults);
result = Result.create(cellResults);
if (!result.isEmpty()) {
for (Cell cell : result.rawCells()) {
String valueString = new String(CellUtil.cloneValue(cell));
StringTokenizer st = new StringTokenizer(valueString, ",");
if (st.hasMoreElements()) {
String asnToken = st.nextElement().toString() ;
String transidToken = st.nextElement().toString() ;
String stateToken = st.nextElement().toString() ;
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn Transid: " + transidToken + " has state: " + stateToken + " and ASN " + asnToken);
if (Long.parseLong(asnToken) < lvAsn) {
if (LOG.isTraceEnabled()) LOG.trace("adding transid: " + transidToken + " to result list");
results.add(result);
count++;
}
} // if (st.hasMoreElements()
} // for (Cell cell : result.rawCells()
} // if (!result.isEmpty()
cellResults.clear();
if (!hasMore){
shouldContinue = false;
}
} // while (shouldContinue)
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - count is " + count + ", hasMore is " + hasMore
+ ", result " + result.isEmpty());
}
else {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - scanner is null");
}
} catch(OutOfOrderScannerNextException ooone) {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - Caught OutOfOrderScannerNextException "
+ ooone.getMessage() + " " + stackTraceToString(ooone));
ooo = ooone;
} catch(ScannerTimeoutException cste) {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - Caught ScannerTimeoutException "
+ cste.getMessage() + " " + stackTraceToString(cste));
ste = cste;
} catch(Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - Caught throwable exception "
+ e.getMessage() + " " + stackTraceToString(e));
t = e;
}
finally {
if (scanner != null) {
try {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - close scanner in region " + m_regionDetails);
scanner.close();
} catch(Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - Caught general exception " + e.getMessage() + " " + stackTraceToString(e));
ne = e;
}
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogTransactionStatesFromIntervalResponse.Builder statesFromIntervalResponseBuilder = TlogTransactionStatesFromIntervalResponse.newBuilder();
statesFromIntervalResponseBuilder.setCount(count);
statesFromIntervalResponseBuilder.setHasException(false);
if (results != null){
if (!results.isEmpty()) {
for (Result r: results) {
statesFromIntervalResponseBuilder.addResult(ProtobufUtil.toResult(r));
}
}
}
if (t != null){
statesFromIntervalResponseBuilder.setHasException(true);
statesFromIntervalResponseBuilder.setException(t.toString());
}
if (ste != null){
statesFromIntervalResponseBuilder.setHasException(true);
statesFromIntervalResponseBuilder.setException(ste.toString());
}
if (ne != null){
statesFromIntervalResponseBuilder.setHasException(true);
statesFromIntervalResponseBuilder.setException(ne.toString());
}
if (ooo != null){
statesFromIntervalResponseBuilder.setHasException(true);
statesFromIntervalResponseBuilder.setException(ooo.toString());
}
if (use != null){
statesFromIntervalResponseBuilder.setHasException(true);
statesFromIntervalResponseBuilder.setException(use.toString());
}
if (mue != null){
if (LOG.isTraceEnabled()) LOG.trace("getTransactionStatesPriorToAsn - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
statesFromIntervalResponseBuilder.setHasException(true);
statesFromIntervalResponseBuilder.setException(mue.toString());
}
TlogTransactionStatesFromIntervalResponse sfi_response = statesFromIntervalResponseBuilder.build();
done.run(sfi_response);
}
public void putTlog(RpcController controller, TlogWriteRequest request, RpcCallback<TlogWriteResponse> done) {
if (LOG.isTraceEnabled()) LOG.trace("putTlog - ENTRY ");
TlogWriteResponse response = TlogWriteResponse.getDefaultInstance();
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
Throwable t = null;
long transactionId = request.getTransactionId();
long commitId = request.getCommitId();
boolean forced = request.getForced();
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("putTlog - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
// Process in local memory
if (put != null){
if (t == null) {
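// Unless the caller requested a forced (synchronous) write, downgrade the WAL
// durability to ASYNC_WAL: the edit is still written to the WAL, but the sync
// to disk is deferred and batched by the region server.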
if (! forced){
put.setDurability(Durability.ASYNC_WAL);
}
try {
if (LOG.isTraceEnabled()) LOG.trace("putTlog - putting row " + put);
putTlog(transactionId, put);
}catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("putTlog - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
if (LOG.isTraceEnabled())
LOG.trace("putTlog - txId " + transactionId
+ ", region " + m_regionDetails + ", type " + type + ", row "
+ Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex "
+ Hex.encodeHexString(proto.getRow().toByteArray()));
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogWriteResponse.Builder TlogWriteResponseBuilder = TlogWriteResponse.newBuilder();
TlogWriteResponseBuilder.setHasException(false);
if (t != null)
{
TlogWriteResponseBuilder.setHasException(true);
TlogWriteResponseBuilder.setException(t.toString());
}
TlogWriteResponse tlwresponse = TlogWriteResponseBuilder.build();
done.run(tlwresponse);
}
/**
* Processes a transactional putTlog
* @param transactionId the transaction id
* @param put the Put to apply to the Tlog region
* @return true if the put succeeded
* @throws IOException
*/
public boolean putTlog(final long transactionId, Put put) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint putTlog, txid: "
+ transactionId + ", on HRegion " + this);
boolean result = false;
try {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint putTlog putting " + put.getRow().toString());
m_Region.put(put);
result = true;
} catch (Exception e) {
if (LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint putTlog - txid " + transactionId + ", Caught internal exception " + e.toString() + ", returning false");
throw new IOException("TrxRegionEndpoint putTlog - ", e);
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint putTlog EXIT - returns " + result + ", transId " + transactionId);
return result;
}
@Override
public void put(RpcController controller,
PutTransactionalRequest request,
RpcCallback<PutTransactionalResponse> done) {
PutTransactionalResponse response = PutTransactionalResponse.getDefaultInstance();
byte [] row = null;
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
Throwable t = null;
MemoryUsageException mue = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("put - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("put - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
mue = new MemoryUsageException("put memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
else
{
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("put - txId " + transactionId
+ ", Caught exception " , e);
t = e;
}
if ((mue == null && type == MutationType.PUT && proto.hasRow()) && (put != null))
{
// Process in local memory
try {
put(transactionId, startId, put);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("put - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("put - txId " + transactionId + ", regionName " + m_regionDetails + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
}
else
{
if (LOG.isTraceEnabled()) LOG.trace("put - txId " + transactionId + ", regionName " + m_regionDetails + "- no valid PUT type or does not contain a row");
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalResponse.Builder putTransactionalResponseBuilder = PutTransactionalResponse.newBuilder();
putTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
putTransactionalResponseBuilder.setHasException(true);
putTransactionalResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("put - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
putTransactionalResponseBuilder.setHasException(true);
putTransactionalResponseBuilder.setException(mue.toString());
}
PutTransactionalResponse presponse = putTransactionalResponseBuilder.build();
done.run(presponse);
}
@Override
public void putMultiple(RpcController controller,
PutMultipleTransactionalRequest request,
RpcCallback<PutMultipleTransactionalResponse> done) {
PutMultipleTransactionalResponse response = PutMultipleTransactionalResponse.getDefaultInstance();
java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto> results = request.getPutList();
byte [] row = null;
Put put = null;
MutationType type;
Throwable t = null;
MemoryUsageException mue = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
{
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("putMultiple - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("putMultiple - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("putMultiple memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
else
{
for (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto proto : results){
put = null;
if (proto != null){
type = proto.getMutateType();
if (type == MutationType.PUT && proto.hasRow()){
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("putMultiple - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
// Process in local memory
if (put != null){
try {
put(transactionId, startId, put);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("putMultiple - txId " + transactionId
+ ", Caught exception after internal put - ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("putMultiple - txId " + transactionId + ", region " + m_regionDetails + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
}
}
}
else
if (LOG.isTraceEnabled()) LOG.trace("putMultiple - txId " + transactionId + ", region " + m_regionDetails + ", put proto was null");
}
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse.Builder putMultipleTransactionalResponseBuilder = PutMultipleTransactionalResponse.newBuilder();
putMultipleTransactionalResponseBuilder.setHasException(false);
if (t != null)
{
putMultipleTransactionalResponseBuilder.setHasException(true);
putMultipleTransactionalResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("putMultiple - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
putMultipleTransactionalResponseBuilder.setHasException(true);
putMultipleTransactionalResponseBuilder.setException(mue.toString());
}
PutMultipleTransactionalResponse pmresponse = putMultipleTransactionalResponseBuilder.build();
done.run(pmresponse);
}
@Override
public void recoveryRequest(RpcController controller,
RecoveryRequestRequest request,
RpcCallback<RecoveryRequestResponse> done) {
int tmId = request.getTmId();
Throwable t = null;
long transactionId = request.getTransactionId();
if (reconstructIndoubts == 0) {
if (LOG.isTraceEnabled()) LOG.trace("recoveryRequest - txId " + transactionId + ", RECOV");
constructIndoubtTransactions();
}
// Placeholder for real work when recovery is added
if (LOG.isInfoEnabled()) LOG.info("recoveryRequest - txId " + transactionId + ", region " + m_regionDetails + ", tmId" + tmId);
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse.Builder recoveryResponseBuilder = RecoveryRequestResponse.newBuilder();
List<Long> indoubtTransactions = new ArrayList<Long>();
if (LOG.isTraceEnabled()) LOG.trace("recoveryRequest Trafodion Recovery: region " + regionInfo.getEncodedName() + " receives recovery request from TM " + tmId + " with region state " + regionState);
switch(regionState) {
case REGION_STATE_RECOVERING: // RECOVERING: a list of in-doubt transactions has already been created but is still being resolved;
// retrieve all in-doubt transactions owned by the requesting TM and return them in the reply
if (LOG.isInfoEnabled()) LOG.info("TRAF RCOV:recoveryRequest in region starting " + regionInfo.getEncodedName() + " has in-doubt transaction " + indoubtTransactionsById.size());
for (Entry<Long, List<WALEdit>> entry : indoubtTransactionsById.entrySet()) {
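// Transaction ids encode the originating node id (see
// TransactionState.getNodeId(tid)), so only in-doubt transactions belonging to
// the requesting TM are added to this recovery reply.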
long tid = entry.getKey();
int nodeid = (int) TransactionState.getNodeId(tid);
boolean add = false;
if (nodeid == tmId) add = true; // match TM
if (add) {
indoubtTransactions.add(tid);
if (LOG.isTraceEnabled()) LOG.info("recoveryRequest - txId " + transactionId + ", Trafodion Recovery: region " + regionInfo.getEncodedName() + " in-doubt transaction "
+ tid + " has been added into the recovery reply to Node " + nodeid + " TM " + tmId + " during recovery ");
}
}
if (LOG.isTraceEnabled()) LOG.trace("recoveryRequest " + indoubtTransactions.size());
if (indoubtTransactions.size() == 0) {
String lv_encoded = m_Region.getRegionInfo().getEncodedName();
try {
if (LOG.isTraceEnabled()) LOG.trace("recoveryRequest - Trafodion Recovery: delete recovery zNode TM "
+ tmId + " region encoded name " + regionInfo.getEncodedName() + " for 0 in-doubt transaction");
deleteRecoveryzNode(tmId, regionInfo.getEncodedName());
} catch (IOException e) {
LOG.error("recoveryRequest - Trafodion Recovery: delete recovery zNode failed ", e);
}
}
break;
case REGION_STATE_START: // START
List<TrxTransactionState> commitPendingCopy = new ArrayList<TrxTransactionState>(commitPendingTransactions);
if (LOG.isInfoEnabled()) LOG.info("TRAF RCOV:recoveryRequest in region started" + regionInfo.getEncodedName()
+ " has in-doubt transaction " + commitPendingCopy.size());
for (TrxTransactionState commitPendingTS : commitPendingCopy) {
long tid = commitPendingTS.getTransactionId();
if ((int) commitPendingTS.getNodeId() == tmId) {
indoubtTransactions.add(tid);
if (LOG.isInfoEnabled()) LOG.info("recoveryRequest - Trafodion Recovery: region " + regionInfo.getEncodedName() + " in-doubt transaction " + tid + " has been added into the recovery reply to TM " + tmId + " during start ");
}
}
// now remove the ZK node after TM has initiated the recovery request
try {
if (LOG.isTraceEnabled()) LOG.trace("recoveryRequest - Trafodion Recovery: delete recovery zNode TM "
+ tmId + " region encoded name " + regionInfo.getEncodedName() + " for 0 in-doubt transaction");
deleteRecoveryzNode(tmId, regionInfo.getEncodedName());
} catch (IOException e) {
LOG.error("recoveryRequest - Trafodion Recovery: delete recovery zNode failed ", e);
}
break;
default:
LOG.error("Trafodion Recovery: encounter incorrect region state " + regionState);
break;
}
// Return the collected in-doubt transaction ids to the requesting TM
for (Long transactionInDoubtId:indoubtTransactions) {
recoveryResponseBuilder.addResult(transactionInDoubtId);
}
recoveryResponseBuilder.setHasException(false);
if (t != null)
{
recoveryResponseBuilder.setHasException(true);
recoveryResponseBuilder.setException(t.toString());
}
RecoveryRequestResponse rresponse = recoveryResponseBuilder.build();
done.run(rresponse);
}
/**
* Gives the maximum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, maximum value for the
* entire column family will be returned.
*/
@Override
public void getMax(RpcController controller, TransactionalAggregateRequest request,
RpcCallback<TransactionalAggregateResponse> done) {
RegionScanner scanner = null;
TransactionalAggregateResponse response = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
T max = null;
try {
checkBlockNonPhase2(transactionId); // throws IOException
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = getScanner(transactionId, startId, scan);
List<Cell> results = new ArrayList<Cell>();
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
// qualifier can be null.
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
}
results.clear();
} while (hasMoreRows);
if (max != null) {
TransactionalAggregateResponse.Builder builder = TransactionalAggregateResponse.newBuilder();
builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
response = builder.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isTraceEnabled()) LOG.trace("getMax - txId " + transactionId + ", Maximum from this region is "
+ m_regionDetails + ": " + max);
done.run(response);
}
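/*
 * Illustrative client-side sketch (assumptions: field names follow the
 * generated TrxRegionProtos service and mirror HBase's AggregateRequest; the
 * real wrapper lives in the transactional client library). The aggregate RPCs
 * in this endpoint (getMax, getMin, getSum, getRowNum) are all driven the same
 * way, e.g.
 *
 *   TransactionalAggregateRequest req = TransactionalAggregateRequest.newBuilder()
 *       .setTransactionId(txId)                        // hypothetical txId
 *       .setStartId(startId)                           // hypothetical startId
 *       .setScan(ProtobufUtil.toScan(scan))            // one family, at most one qualifier
 *       .setInterpreterClassName(LongColumnInterpreter.class.getName())
 *       .build();
 *   service.getMin(null, req, callback);               // callback receives the response
 */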
/**
* Gives the minimum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, minimum value for the
* entire column family will be returned.
*/
@Override
public void getMin(RpcController controller, TransactionalAggregateRequest request,
RpcCallback<TransactionalAggregateResponse> done) {
TransactionalAggregateResponse response = null;
RegionScanner scanner = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
T min = null;
try {
checkBlockNonPhase2(transactionId); // throws IOException
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = getScanner(transactionId, startId, scan);
List<Cell> results = new ArrayList<Cell>();
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
}
results.clear();
} while (hasMoreRows);
if (min != null) {
response = TransactionalAggregateResponse.newBuilder().addFirstPart(
ci.getProtoForCellType(min).toByteString()).build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isTraceEnabled()) LOG.trace("getMin - txId " + transactionId + ", Minimum from this region is "
+ m_regionDetails + ": " + min);
done.run(response);
}
/**
* Gives the sum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, sum for the entire column
* family will be returned.
*/
@Override
public void getSum(RpcController controller, TransactionalAggregateRequest request,
RpcCallback<TransactionalAggregateResponse> done) {
TransactionalAggregateResponse response = null;
RegionScanner scanner = null;
S sumVal = null; // promoted-type running sum; declared here so the trace below can log it
long transactionId = request.getTransactionId();
long startId = request.getStartId();
try {
checkBlockNonPhase2(transactionId); // throws IOException
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = getScanner(transactionId, startId, scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
if (temp != null)
sumVal = ci.add(sumVal, ci.castToReturnType(temp));
}
results.clear();
} while (hasMoreRows);
if (sumVal != null) {
response = TransactionalAggregateResponse.newBuilder().addFirstPart(
ci.getProtoForPromotedType(sumVal).toByteString()).build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isTraceEnabled()) LOG.trace("getSum - txId " + transactionId + ", Sum from this region is "
+ m_regionDetails + ": " + sum);
done.run(response);
}
/**
* Gives the row count for the given column family and column qualifier, in
* the given row range as defined in the Scan object.
*/
@Override
public void getRowNum(RpcController controller, TransactionalAggregateRequest request,
RpcCallback<TransactionalAggregateResponse> done) {
TransactionalAggregateResponse response = null;
long counter = 0L;
List<Cell> results = new ArrayList<Cell>();
RegionScanner scanner = null;
long transactionId = request.getTransactionId();
long startId = request.getStartId();
try {
checkBlockNonPhase2(transactionId); // throws IOException
Scan scan = ProtobufUtil.toScan(request.getScan());
byte[][] colFamilies = scan.getFamilies();
byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
NavigableSet<byte[]> qualifiers = colFamilies != null ?
scan.getFamilyMap().get(colFamily) : null;
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
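// When neither a filter nor a qualifier was requested, a FirstKeyOnlyFilter
// is sufficient: counting only needs to detect that a row is non-empty, so
// returning just the first cell of each row avoids fetching full rows.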
if (scan.getFilter() == null && qualifier == null)
scan.setFilter(new FirstKeyOnlyFilter());
scanner = getScanner(transactionId, startId, scan);
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
if (results.size() > 0) {
counter++;
}
results.clear();
} while (hasMoreRows);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
bb.rewind();
response = TransactionalAggregateResponse.newBuilder().addFirstPart(
ByteString.copyFrom(bb)).build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
if (LOG.isInfoEnabled()) LOG.info(String.format("Row counter for txId %d from this region: %s is %d, startKey is [%s], endKey is [%s]",
transactionId, m_regionDetails, counter,
regionInfo.getStartKey() == null ? "null" : Bytes.toStringBinary(regionInfo.getStartKey()),
regionInfo.getEndKey() == null ? "null" : Bytes.toStringBinary(regionInfo.getEndKey())));
done.run(response);
}
/**
* Gives a Pair with first object as Sum and second object as row count,
* computed for a given combination of column qualifier and column family in
* the given row range as defined in the Scan object. In its current
* implementation, it takes one column family and one column qualifier (if
* provided). In case of null column qualifier, an aggregate sum over the
* entire column family will be returned.
* <p>
* The average is computed in
* AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
* processing results from all regions, so it is fine to pass the sum and a
* Long row count.
*/
@Override
public void getAvg(RpcController controller, TransactionalAggregateRequest request,
RpcCallback<TransactionalAggregateResponse> done) {
TransactionalAggregateResponse response = null;
RegionScanner scanner = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null;
long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
results.clear();
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
qualifier, kv)));
}
rowCountVal++;
} while (hasMoreRows);
if (sumVal != null) {
ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
TransactionalAggregateResponse.Builder pair = TransactionalAggregateResponse.newBuilder();
pair.addFirstPart(first);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
response = pair.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
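// A minimal sketch of the client-side division described above, assuming
// the caller has decoded 'sumVal' (first part) and 'rowCount' (second part)
// from this response and holds the same ColumnInterpreter 'ci':
//
//   double avg = ci.divideForAvg(sumVal, rowCount);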
/**
* Gives a Pair with first object a List containing Sum and sum of squares,
* and the second object as row count. It is computed for a given combination of
* column qualifier and column family in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* one column qualifier (if provided). The idea is to compute the variance
* first: the average of the squares less the square of the average; the
* standard deviation is then the square root of the variance.
*/
@Override
public void getStd(RpcController controller, TransactionalAggregateRequest request,
RpcCallback<TransactionalAggregateResponse> done) {
RegionScanner scanner = null;
TransactionalAggregateResponse response = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0l;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
tempVal = null;
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
qualifier, kv)));
}
results.clear();
sumVal = ci.add(sumVal, tempVal);
sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
rowCountVal++;
} while (hasMoreRows);
if (sumVal != null) {
ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
TransactionalAggregateResponse.Builder pair = TransactionalAggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
pair.addFirstPart(first_sumSqVal);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
response = pair.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
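// A sketch of the client-side combination for getStd, assuming 'sum' and
// 'sumSq' are the two first-part values and 'n' is the row count decoded
// from the second part, with the same ColumnInterpreter 'ci':
//
//   double avg = ci.divideForAvg(sum, n);
//   double avgSq = ci.divideForAvg(sumSq, n);
//   double std = Math.sqrt(avgSq - avg * avg);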
/**
* Gives a List containing sum of values and sum of weights.
* It is computed for the combination of column
* family and column qualifier(s) in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* two column qualifiers. The first qualifier is for the values column and
* the second qualifier (optional) is for the weights column.
*/
@Override
public void getMedian(RpcController controller, TransactionalAggregateRequest request,
RpcCallback<TransactionalAggregateResponse> done) {
TransactionalAggregateResponse response = null;
RegionScanner scanner = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
Scan scan = ProtobufUtil.toScan(request.getScan());
long transactionId = request.getTransactionId();
long startId = request.getStartId();
scanner = getScanner(transactionId, startId, scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[] valQualifier = null, weightQualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
valQualifier = qualifiers.pollFirst();
// if weighted median is requested, get qualifier for the weight column
weightQualifier = qualifiers.pollLast();
}
List<Cell> results = new ArrayList<Cell>();
boolean hasMoreRows = false;
do {
tempVal = null;
tempWeight = null;
hasMoreRows = scanner.next(results);
for (Cell kv : results) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
valQualifier, kv)));
if (weightQualifier != null) {
tempWeight = ci.add(tempWeight,
ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
}
}
results.clear();
sumVal = ci.add(sumVal, tempVal);
sumWeights = ci.add(sumWeights, tempWeight);
} while (hasMoreRows);
ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
TransactionalAggregateResponse.Builder pair = TransactionalAggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
pair.addFirstPart(first_sumWeights);
response = pair.build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
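// Note: this endpoint only returns the per-region sums of values and
// weights. The median itself is resolved client-side: the caller first
// totals the weights across regions, then re-scans to locate the row at
// which the running weight crosses half of that total (the weighted
// median). This mirrors the two-pass approach used by HBase's
// AggregationClient#median.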
@SuppressWarnings("unchecked")
ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
TransactionalAggregateRequest request) throws IOException {
String className = request.getInterpreterClassName();
Class<?> cls;
try {
cls = Class.forName(className);
ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
if (request.hasInterpreterSpecificBytes()) {
ByteString b = request.getInterpreterSpecificBytes();
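// Position 2 below selects the third generic type parameter of the
// interpreter class, i.e. P in ColumnInterpreter<T, S, P, Q, R>: the
// protobuf message type used to initialize the interpreter.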
P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
ci.initialize(initMsg);
}
return ci;
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
@Override
public Service getService() {
return this;
}
public static ConcurrentHashMap<String, Object> getRegionMap() {
if (transactionsEPCPMap == null) {
transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
}
return transactionsEPCPMap;
}
/**
* Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
* from the region where this coprocessor is loaded.
* Since this is a coprocessor endpoint, it always expects to be loaded
* on a table region, so always expects this to be an instance of
* {@link RegionCoprocessorEnvironment}.
* @param env the environment provided by the coprocessor host
* @throws IOException if the provided environment is not an instance of
* {@code RegionCoprocessorEnvironment}
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (transactionsEPCPMap == null)
transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("start - Must be loaded on a table region!");
}
if (LOG.isTraceEnabled()) LOG.trace("start");
RegionCoprocessorEnvironment tmp_env =
(RegionCoprocessorEnvironment)env;
#ifdef CDH5.7 APACHE1.2
this.m_Region = tmp_env.getRegion();
#else
this.m_Region = (HRegion) tmp_env.getRegion();
#endif
this.regionInfo = this.m_Region.getRegionInfo();
#ifdef CDH5.7 APACHE1.2
this.t_Region = (HRegion)tmp_env.getRegion();
#else
this.t_Region = (TransactionalRegion) tmp_env.getRegion();
#endif
this.config = tmp_env.getConfiguration();
this.fs = FileSystem.get(config);
try {
hdfsSetStoragePolicyMethod = fs.getClass().getDeclaredMethod("setStoragePolicy",
new Class<?>[] { Path.class, String.class });
hdfsSetStoragePolicyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
hdfsSetStoragePolicyMethod = null;
hdfsSetStoragePolicyReflectErrorMsg = "FileSystem doesn't support setStoragePolicy";
} catch (SecurityException e) {
hdfsSetStoragePolicyMethod = null;
hdfsSetStoragePolicyReflectErrorMsg = "No access to setStoragePolicy on FileSystem from the SecurityManager";
}
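// A minimal sketch (illustrative only) of how the reflected method is
// invoked later, assuming a hypothetical Path 'path' and the HDFS "HOT"
// storage policy name:
//
//   if (hdfsSetStoragePolicyMethod != null) {
//     try {
//       hdfsSetStoragePolicyMethod.invoke(fs, path, "HOT");
//     } catch (IllegalAccessException | InvocationTargetException e) {
//       LOG.warn("setStoragePolicy failed", e);
//     }
//   }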
synchronized (stoppableLock) {
try {
this.transactionLeaseTimeout = config.getInt(LEASE_CONF, MINIMUM_LEASE_TIME);
if (this.transactionLeaseTimeout < MINIMUM_LEASE_TIME) {
if (LOG.isWarnEnabled()) LOG.warn("Transaction lease time: " + this.transactionLeaseTimeout + ", was less than the minimum lease time. Now setting the timeout to the minimum default value: " + MINIMUM_LEASE_TIME);
this.transactionLeaseTimeout = MINIMUM_LEASE_TIME;
}
this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(config,
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.scannerThreadWakeFrequency = config.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.cleanTimer = config.getInt(SLEEP_CONF, DEFAULT_SLEEP);
this.memoryUsageThreshold = config.getInt(MEMORY_THRESHOLD, DEFAULT_MEMORY_THRESHOLD);
this.memoryUsagePerformGC = config.getBoolean(MEMORY_PERFORM_GC, DEFAULT_MEMORY_PERFORM_GC);
this.asyncWal = config.getInt(CONF_ASYNC_WAL, DEFAULT_ASYNC_WAL);
this.skipWal = config.getBoolean(CONF_SKIP_WAL, DEFAULT_SKIP_WAL);
this.fullEditInCommit = config.getBoolean(CONF_COMMIT_EDIT, DEFAULT_COMMIT_EDIT);
this.useCommitIdInCells = config.getBoolean(CONF_TM_USE_COMMIT_ID_IN_CELLS, DEFAULT_TM_USE_COMMIT_ID_IN_CELLS);
m_regionName = this.regionInfo.getRegionNameAsString();
m_isTrafodionMetadata = m_regionName.contains("_MD_");
String skey = (Bytes.equals(this.regionInfo.getStartKey(), HConstants.EMPTY_START_ROW)) ? "skey=null" : ("skey=" + Hex.encodeHexString(regionInfo.getStartKey()));
String ekey = (Bytes.equals(this.regionInfo.getEndKey(), HConstants.EMPTY_END_ROW)) ? "ekey=null" : ("ekey=" + Hex.encodeHexString(regionInfo.getEndKey()));
m_regionDetails = m_regionName + "," + skey + "," + ekey;
this.memoryUsageWarnOnly = config.getBoolean(MEMORY_WARN_ONLY, DEFAULT_MEMORY_WARN_ONLY);
this.memoryUsageTimer = config.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
this.checkRowBelongs = config.getBoolean(CHECK_ROW, true);
this.suppressOutOfOrderProtocolException = config.getBoolean(SUPPRESS_OOP, DEFAULT_SUPPRESS_OOP);
if (this.transactionLeases == null)
this.transactionLeases = new Leases(LEASE_CHECK_FREQUENCY);
//if (this.scannerLeases == null)
// this.scannerLeases = new Leases(scannerThreadWakeFrequency);
if (LOG.isTraceEnabled()) LOG.trace("Transaction lease time: "
+ this.transactionLeaseTimeout
+ " Scanner lease time: "
+ this.scannerThreadWakeFrequency
+ ", Scanner lease timeout period: "
+ this.scannerLeaseTimeoutPeriod
+ ", Clean timer: "
+ this.cleanTimer
+ ", MemoryUsage timer: "
+ this.memoryUsageTimer
+ ", MemoryUsageThreshold: "
+ this.memoryUsageThreshold
+ ", MemoryUsagePerformGC: "
+ this.memoryUsagePerformGC
+ ", MemoryUsageWarnOnly: "
+ this.memoryUsageWarnOnly
+ ", Suppress OutOfOrderProtocolExc: "
+ this.suppressOutOfOrderProtocolException);
// Start the clean core thread
this.cleanOldTransactionsThread = new CleanOldTransactionsChore(this, cleanTimer, stoppable);
#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
this.txnChoreServiceThreadPoolSize =
tmp_env.getConfiguration().getInt("hbase.regionserver.region.transactional.chore_service_thread_pool_size",
DEFAULT_TXN_CHORE_SERVICE_THREAD_POOL_SIZE);
if (LOG.isTraceEnabled()) LOG.trace("Transactional chore thread pool size setting is " + this.txnChoreServiceThreadPoolSize);
if (this.cleanOldTransactionsThread != null) {
setupChoreService();
s_ChoreService.scheduleChore(this.cleanOldTransactionsThread);
}
#else
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e)
{
LOG.fatal("CleanOldTransactionChore uncaughtException: " + t.getName(), e);
}
};
String n = Thread.currentThread().getName();
ChoreThread = new Thread(this.cleanOldTransactionsThread);
Threads.setDaemonThreadRunning(ChoreThread, n + ".oldTransactionCleaner", handler);
#endif
// Start the memory usage chore thread if the configured threshold
// is at or below the default of 100%.
if (memoryUsageThreshold <= DEFAULT_MEMORY_THRESHOLD &&
memoryUsageThread == null) {
LOG.warn("start - starting memoryUsageThread");
memoryUsageThread = new MemoryUsageChore(this, memoryUsageTimer, stoppable2);
#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
if (this.memoryUsageThread != null) {
s_ChoreService.scheduleChore(this.memoryUsageThread);
}
}
#else
UncaughtExceptionHandler handler2 = new UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e)
{
LOG.fatal("MemoryUsageChore uncaughtException: " + t.getName(), e);
}
};
String n2 = Thread.currentThread().getName();
ChoreThread2 = new Thread(memoryUsageThread);
Threads.setDaemonThreadRunning(ChoreThread2, n2 + ".memoryUsage", handler2);
}
#endif
if (TransactionalLeasesThread == null) {
TransactionalLeasesThread = new Thread(this.transactionLeases);
if (TransactionalLeasesThread != null) {
Threads.setDaemonThreadRunning(TransactionalLeasesThread, "Transactional leases");
}
}
/*
if (ScannerLeasesThread == null) {
ScannerLeasesThread = new Thread(this.scannerLeases);
if (ScannerLeasesThread != null) {
Threads.setDaemonThreadRunning(ScannerLeasesThread, "Scanner leases");
}
}
*/
} catch (Exception e) {
throw new CoprocessorException("start - Caught exception " + e);
}
}
RegionServerServices rss = tmp_env.getRegionServerServices();
tHLog = rss.getWAL(regionInfo);
ServerName sn = rss.getServerName();
lv_hostName = sn.getHostname();
lv_port = sn.getPort();
if (LOG.isTraceEnabled()) LOG.trace("hostname " + lv_hostName + " port " + lv_port);
zkw1 = rss.getZooKeeper();
this.configuredEarlyLogging = tmp_env.getConfiguration().getBoolean("hbase.regionserver.region.transactional.earlylogging", false);
if (LOG.isTraceEnabled()) LOG.trace("early logging setting is " + this.configuredEarlyLogging +
"\nget the reference from Region CoprocessorEnvironment ");
this.configuredConflictReinstate = tmp_env.getConfiguration().getBoolean("hbase.regionserver.region.transactional.conflictreinstate", false);
if (LOG.isTraceEnabled()) LOG.trace("conflict reinstate setting is " + this.configuredConflictReinstate +
"\nget the reference from Region CoprocessorEnvironment ");
if (tmp_env.getSharedData().isEmpty()) {
if (LOG.isTraceEnabled()) LOG.trace("shared map is empty ");
} else {
if (LOG.isTraceEnabled()) LOG.trace("shared map is NOT empty");
}
transactionsEPCPMap.put(regionInfo.getRegionNameAsString()+trxkeyEPCPinstance, this);
transactionsByIdTestz = TrxRegionObserver.getRefMap();
if (transactionsByIdTestz.isEmpty()) {
if (LOG.isTraceEnabled()) LOG.trace("reference map is empty ");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("reference map is NOT empty ");
}
if (LOG.isTraceEnabled()) LOG.trace("Region " + m_regionDetails + " check indoubt list from reference map ");
Map<Long, List<WALEdit>> indoubtTransactionsByIdCheck = (TreeMap<Long, List<WALEdit>>)transactionsByIdTestz.get(
regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeypendingTransactionsById);
if(indoubtTransactionsByIdCheck != null) {
this.indoubtTransactionsById = indoubtTransactionsByIdCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeypendingTransactionsById,
this.indoubtTransactionsById);
}
Map<Integer, Integer> indoubtTransactionsCountByTmidCheck = (TreeMap<Integer,Integer>)transactionsByIdTestz.get(
regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyindoubtTransactionsCountByTmid);
if(indoubtTransactionsCountByTmidCheck != null) {
this.indoubtTransactionsCountByTmid = indoubtTransactionsCountByTmidCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyindoubtTransactionsCountByTmid,
this.indoubtTransactionsCountByTmid);
}
Set<TrxTransactionState> commitPendingTransactionsCheck = (Set<TrxTransactionState>)transactionsByIdTestz.get(
regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeycommitPendingTransactions);
if(commitPendingTransactionsCheck != null) {
this.commitPendingTransactions = commitPendingTransactionsCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeycommitPendingTransactions,
this.commitPendingTransactions);
}
ConcurrentHashMap<String, TrxTransactionState> transactionsByIdCheck = (ConcurrentHashMap<String, TrxTransactionState>) transactionsByIdTestz.get(
regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeytransactionsById);
if(transactionsByIdCheck != null) {
this.transactionsById = transactionsByIdCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeytransactionsById,
this.transactionsById);
}
SortedMap<Long, TrxTransactionState> commitedTransactionsBySequenceNumberCheck = (SortedMap<Long, TrxTransactionState>)
transactionsByIdTestz
.get( regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeycommitedTransactionsBySequenceNumber);
if(commitedTransactionsBySequenceNumberCheck != null) {
this.commitedTransactionsBySequenceNumber = commitedTransactionsBySequenceNumberCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeycommitedTransactionsBySequenceNumber, this.commitedTransactionsBySequenceNumber);
}
AtomicBoolean blockAllCheck = (AtomicBoolean)transactionsByIdTestz
.get(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockAllVar);
if(blockAllCheck != null) {
this.blockAll = blockAllCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockAllVar,
this.blockAll);
}
AtomicBoolean blockNonPhase2Check = (AtomicBoolean)transactionsByIdTestz
.get(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNonPhase2Var);
if(blockNonPhase2Check != null) {
this.blockNonPhase2 = blockNonPhase2Check;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNonPhase2Var,
this.blockNonPhase2);
}
AtomicBoolean newTransCheck = (AtomicBoolean)transactionsByIdTestz
.get(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNewTransVar);
if(newTransCheck != null) {
this.blockNewTrans = newTransCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNewTransVar,
this.blockNewTrans);
}
ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck =
(ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsByIdTestz
.get(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyScanners);
if(scannersCheck != null) {
this.scanners = scannersCheck;
}
else {
transactionsByIdTestz.put(regionInfo.getRegionNameAsString()+TrxRegionObserver.trxkeyScanners,
this.scanners);
}
// Set up the memoryBean from the ManagementFactory, using the same
// threshold test as the MemoryUsageChore start above so the bean is
// available whenever that chore is running
if (memoryUsageThreshold <= DEFAULT_MEMORY_THRESHOLD)
memoryBean = ManagementFactory.getMemoryMXBean();
LOG.info ("TRX Endpoint coprocessor, "
+ " regionDetails: " + m_regionDetails
+ ", isTrafodionMD: " + m_isTrafodionMetadata
+ ", starting with asyncWal: " + asyncWal
+ ", skipWal: " + skipWal
+ ", useCommitIdInCells: " + useCommitIdInCells
+ ", and fullEditInCommit: " + fullEditInCommit
+ ", Transaction lease time: " + this.transactionLeaseTimeout
+ ", Scanner lease time: " + this.scannerThreadWakeFrequency
+ ", Scanner lease timeout period: " + this.scannerLeaseTimeoutPeriod
+ ", Clean timer: " + this.cleanTimer
+ ", MemoryUsage timer: " + this.memoryUsageTimer
+ ", MemoryUsageThreshold: " + this.memoryUsageThreshold
+ ", MemoryUsagePerformGC: " + this.memoryUsagePerformGC
+ ", MemoryUsageWarnOnly: " + this.memoryUsageWarnOnly
+ ", Suppress OutOfOrderProtocolExc: " + this.suppressOutOfOrderProtocolException);
if (LOG.isTraceEnabled()) LOG.trace("start");
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("stop ");
stoppable.stop("stop() TrxRegionEndpoint region: " + m_regionDetails);
transactionsEPCPMap.remove(regionInfo.getRegionNameAsString()+trxkeyEPCPinstance);
}
#ifdef CDH5.7 APACHE1.2
/**
* Determines if the specified row is within the row range specified by the
* specified HRegionInfo. The range is inclusive of the start key and
* exclusive of the end key; an empty key is treated as unbounded.
*
* @param info HRegionInfo that specifies the row range
* @param row row to be checked
* @return true if the row is within the range specified by the HRegionInfo
*/
public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
return ((info.getStartKey().length == 0) ||
(Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
((info.getEndKey().length == 0) ||
(Bytes.compareTo(info.getEndKey(), row) > 0));
}
#endif
#ifdef HDP2.3 HDP2.4 CDH5.5
private synchronized void setupChoreService() {
if (s_ChoreService == null) {
s_ChoreService = new ChoreService("Cleanup ChoreService", TrxRegionEndpoint.txnChoreServiceThreadPoolSize);
}
}
#endif
#ifdef CDH5.7 APACHE1.2
private synchronized void setupChoreService() {
if (s_ChoreService == null) {
s_ChoreService = new ChoreService("Cleanup ChoreService", TrxRegionEndpoint.txnChoreServiceThreadPoolSize, true);
}
}
#endif
// Internal support methods
/**
* Checks if the region is closing and needs to block all transactional activity
* @param transactionId the transaction id (used for logging)
* @throws IOException if all transactional activity is blocked
*/
private void checkBlockAll(final long transactionId) throws IOException {
if (blockAll.get()) {
if(LOG.isWarnEnabled()) LOG.warn("checkBlockAll - txId " + transactionId + ", No Transactional activity allowed.");
throw new IOException("closing region, no more transactional activity allowed. Region: " + m_regionDetails);
}
}
/**
* Checks if the region is closing and needs to block non phase 2 activity
* @param transactionId the transaction id (used for logging)
* @throws IOException if non phase 2 transactional activity is blocked
*/
private void checkBlockNonPhase2(final long transactionId) throws IOException {
if (blockNonPhase2.get()) {
if(LOG.isWarnEnabled()) LOG.warn("checkBlockNonPhase2 - txId " + transactionId + ", No Transactional activity allowed.");
throw new IOException("closing region, no more non phase 2 transactional activity allowed. Region: " + m_regionDetails);
}
// sometimes only the most severe flag is set, so each check also consults the stricter levels above it
checkBlockAll(transactionId);
}
/**
* Checks if new transactions are disabled
* @param transactionId the transaction id (used for logging)
* @throws IOException if new transactions are blocked
*/
private void checkBlockNewTrans(final long transactionId) throws IOException {
if (blockNewTrans.get()) {
if(LOG.isWarnEnabled()) LOG.warn("checkNewTrans - txId " + transactionId + ", No more new transactions allowed.");
throw new IOException("closing region, no more new transactions allowed. Region: " + m_regionDetails);
}
// sometimes only the most severe flag is set, so each check also consults the stricter levels above it
checkBlockNonPhase2(transactionId);
}
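// Note: the three block checks form an escalation chain:
// checkBlockNewTrans -> checkBlockNonPhase2 -> checkBlockAll.
// Each check delegates to the stricter one above it, so a caller only
// needs to invoke the most specific check for its operation, e.g.:
//
//   checkBlockNewTrans(transactionId); // also verifies the non-phase-2 and block-all flags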
/**
* Gets the transaction state
* @param transactionId the transaction id
* @return the TrxTransactionState for the given transaction
* @throws UnknownTransactionException if the transaction is not known to this region
*/
protected TrxTransactionState getTransactionState(final long transactionId)
throws UnknownTransactionException {
TrxTransactionState state = null;
boolean throwUTE = false;
String key = getTransactionalUniqueId(transactionId);
state = transactionsById.get(key);
if (state == null)
{
if (LOG.isTraceEnabled()) LOG.trace("getTransactionState Unknown transaction: [" + transactionId + "], throwing UnknownTransactionException");
throwUTE = true;
}
else {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionState Found transaction: [" + transactionId + "]");
try {
transactionLeases.renewLease(key);
} catch (LeaseException e) {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionState renewLease failed will try to createLease for transaction: [" + transactionId + "]");
try {
transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(transactionId));
} catch (LeaseStillHeldException lshe) {
if (LOG.isTraceEnabled()) LOG.trace("getTransactionState renewLeasefollowed by createLease failed throwing original LeaseException for transaction: [" + transactionId + "]");
throw new RuntimeException(e);
}
}
}
if (throwUTE)
throw new UnknownTransactionException();
return state;
}
/**
* Retires the transaction
* @param state the transaction state to retire
* @param clear true to clear the entire transaction state, false to clear only its transactions-to-check list
*/
private void retireTransaction(final TrxTransactionState state, final boolean clear) {
long transId = state.getTransactionId();
String key = getTransactionalUniqueId(transId);
if (LOG.isTraceEnabled()) LOG.trace("retireTransaction: [" + state + "]");
try {
transactionLeases.cancelLease(key);
} catch (LeaseException le) {
if (LOG.isTraceEnabled()) LOG.trace("retireTransaction: ["
+ transId + "] LeaseException");
// Ignore
} catch (Exception e) {
if (LOG.isTraceEnabled()) LOG.trace("retireTransaction: ["
+ transId + "] General Lease exception" + e.getMessage() + " " + stackTraceToString(e));
// Ignore
}
if (LOG.isTraceEnabled()) LOG.trace("retireTransaction clearTransactionsToCheck for region: " + m_regionDetails + " from all its TrxTransactionState lists");
if (state.prepareEditSize != state.getEdit().getCells().size() &&
(state.prepareEditSize != 0 ||
(! state.getStatus().equals(Status.ABORTED)))){
if (LOG.isWarnEnabled()) LOG.warn("retireTransaction prepareEditSize (" + state.prepareEditSize
+ ") is different from edit cells size (" + state.getEdit().getCells().size() + ") for: " + state);
}
// Clear out transaction state
if (clear) {
state.clearState();
if (LOG.isTraceEnabled()) LOG.trace("retireTransaction clearState for region: " + m_regionDetails + " from all its TrxTransactionState lists");
}
else {
state.clearTransactionsToCheck();
if (LOG.isTraceEnabled()) LOG.trace("retireTransaction clearTransactionsToCheck for : " + m_regionDetails);
}
synchronized (cleanScannersForTransactions) {
cleanScannersForTransactions.add(transId);
}
synchronized (commitPendingTransactions) {
if (commitPendingTransactions.remove(state)) {
LOG.info("TrxRegionEndpoint coprocessor:retireTransaction: Detected and removed read only branch in commitPendingList: " + state
+ " Region: " + m_regionDetails);
}
}
if (LOG.isTraceEnabled()) LOG.trace("retireTransaction calling remove entry for: " + key + " , from transactionById map ");
transactionsById.remove(key);
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:retireTransaction " + key + ", looking for retire transaction id " + transId + ", transactionsById " + transactionsById.size() + ", commitedTransactionsBySequenceNumber " + commitedTransactionsBySequenceNumber.size() + ", commitPendingTransactions " + commitPendingTransactions.size());
}
public void choreThreadDetectStaleTransactionBranch() {
synchronized(choreDetectStaleBranchLock) {
List<Integer> staleBranchforTMId = new ArrayList<Integer>();
List<TrxTransactionState> commitPendingCopy = new ArrayList<TrxTransactionState>(commitPendingTransactions);
Map<Long, List<WALEdit>> indoubtTransactionsMap = null;
synchronized(indoubtTransactionsById){
indoubtTransactionsMap = new TreeMap<Long, List<WALEdit>>(indoubtTransactionsById);
}
int tmid, tm;
// selected printout for CP
long currentEpoch = controlPointEpoch.get();
if ((currentEpoch < 10) || ((currentEpoch % 20) == 1)) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:choreThreadDetectStaleTransactionBranch: Region "
+ m_regionDetails + " ChoreThread CP Epoch " + controlPointEpoch.get());
}
byte [] lv_byte_region_info = regionInfo.toByteArray();
String lv_encoded = regionInfo.getEncodedName();
long transactionId = 0;
if ((indoubtTransactionsById != null) && (indoubtTransactionsById.size() > 0) && (this.regionState != REGION_STATE_START)) {
for (Entry<Long, List<WALEdit>> entry : indoubtTransactionsMap.entrySet()) {
try {
transactionId = entry.getKey();
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:choreThreadDetectStaleTransactionBranch: indoubt branch Txn id "
+ transactionId + " region info bytes " + new String(lv_byte_region_info));
tmid = (int) TransactionState.getNodeId(transactionId);
LOG.warn("Traf Reco Thread detect stale branch tid " + transactionId + " node " + tmid);
if (!staleBranchforTMId.contains(tmid)) {staleBranchforTMId.add(tmid);}
} catch (Exception xe1) {
LOG.warn("An exception when chore thread detects a stale in-doubt reinstated transaction " + transactionId + " skip ... ");
} // catch
} // for loop
}
else { // region has started
for (TrxTransactionState commitPendingTS : commitPendingCopy) {
try {
if (commitPendingTS.getCPEpoch() < (controlPointEpoch.get() - 1)) {
transactionId = commitPendingTS.getTransactionId();
if (commitPendingTS.getIsRegionTx()){
LOG.warn("Traf Reco Thread detect stale regionTx " + commitPendingTS);
}
else{
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:choreThreadDetectStaleTransactionBranch: stale branch Txn id "
+ transactionId + " region info bytes " + new String(lv_byte_region_info));
tmid = (int) commitPendingTS.getNodeId();
LOG.warn("Traf Reco Thread detect stale branch tid " + transactionId + " node " + tmid);
if (!staleBranchforTMId.contains(tmid)) {staleBranchforTMId.add(tmid);}
}
}
} catch (Exception xe1) {
LOG.warn("An exception when chore thread detects a stale in-doubt normal transaction " + transactionId + " skip ... ");
} // catch
} // loop
}
if (!staleBranchforTMId.isEmpty()) {
for (int i = 0; i < staleBranchforTMId.size(); i++) {
try {
tm = staleBranchforTMId.get(i);
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:choreThreadDetectStaleTransactionBranch: ZKW Create Recovery zNode TM "
+ tm + " region encoded name " + lv_encoded + " region info bytes " + new String(lv_byte_region_info));
createRecoveryzNode(tm, lv_encoded, lv_byte_region_info);
} catch (IOException exp) {
LOG.error("TrxRegionEndpoint coprocessor:choreThreadDetectStaleTransactionBranch: ZKW Create recovery zNode failed ", exp);
}
} // for
} // if block
controlPointEpoch.getAndIncrement();
commitPendingCopy.clear();
indoubtTransactionsMap.clear();
staleBranchforTMId.clear();
} //synchronized
}
public void createRecoveryzNode(int node, String encodedName, byte [] data) throws IOException {
synchronized(zkRecoveryCheckLock) {
// default zNodePath for recovery
String zNodeKey = lv_hostName + "," + lv_port + "," + encodedName;
StringBuilder sb = new StringBuilder();
sb.append("TM");
sb.append(node);
String str = sb.toString();
String zNodePathTM = zNodePath + str;
String zNodePathTMKey = zNodePathTM + "/" + zNodeKey;
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW Post region recovery znode" + node + " zNode Path " + zNodePathTMKey);
// create zookeeper recovery zNode, call ZK ...
try {
if (ZKUtil.checkExists(zkw1, zNodePathTM) == -1) {
// create parent nodename
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW create parent zNodes " + zNodePathTM);
ZKUtil.createWithParents(zkw1, zNodePathTM);
}
ZKUtil.createAndFailSilent(zkw1, zNodePathTMKey, data);
} catch (KeeperException e) {
throw new IOException("Trafodion Recovery Region Observer CP: ZKW Unable to create recovery zNode to TM, throw IOException " + node, e);
}
}
} // end of createRecoveryzNode
public void deleteRecoveryzNode(int node, String encodedName) throws IOException {
synchronized(zkRecoveryCheckLock) {
// default zNodePath
String zNodeKey = lv_hostName + "," + lv_port + "," + encodedName;
StringBuilder sb = new StringBuilder();
sb.append("TM");
sb.append(node);
String str = sb.toString();
String zNodePathTM = zNodePath + str;
String zNodePathTMKey = zNodePathTM + "/" + zNodeKey;
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: ZKW Delete region recovery znode" + node + " zNode Path " + zNodePathTMKey);
// delete zookeeper recovery zNode, call ZK ...
try {
ZKUtil.deleteNodeFailSilent(zkw1, zNodePathTMKey);
} catch (KeeperException e) {
throw new IOException("Trafodion Recovery Region Observer CP: ZKW Unable to delete recovery zNode to TM " + node, e);
}
}
} // end of deleteRecoveryzNode
/**
* Starts the region after a recovery
*/
public void startRegionAfterRecovery() throws IOException {
boolean isFlush = false;
try {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Flushing cache in startRegionAfterRecovery " + m_regionDetails);
#ifdef HDP2.3 HDP2.4 APACHE1.1 CDH5.7 APACHE1.2
m_Region.flush(true);
#else
m_Region.flushcache();
#endif
//if (!m_Region.flushcache().isFlushSucceeded()) {
// LOG.trace("Trafodion Recovery: Flushcache returns false !!! " + m_regionDetails);
//}
} catch (IOException e) {
LOG.error("Trafodion Recovery: Flush failed after replay edits" + m_regionDetails + ", Caught exception ", e);
return;
}
//FileSystem fileSystem = m_Region.getFilesystem();
//Path archiveTHLog = new Path (recoveryTrxPath.getParent(),"archivethlogfile.log");
//if (fileSystem.exists(archiveTHLog)) fileSystem.delete(archiveTHLog, true);
//if (fileSystem.exists(recoveryTrxPath))fileSystem.rename(recoveryTrxPath,archiveTHLog);
if (indoubtTransactionsById != null) {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: region " + recoveryTrxPath + " has " + indoubtTransactionsById.size() + " in-doubt transactions and edits are archived.");
} else {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: region " + recoveryTrxPath + " has 0 in-doubt transactions and edits are archived.");
}
regionState = REGION_STATE_START;
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: region " + m_regionDetails + " is STARTED.");
}
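/**
* Syncs the WAL, either to the given sequence id or, when txid is 0, to the
* latest appended edit. On failure the IOException is rethrown to the
* caller after a one second pause.
* @param wal the write-ahead log to sync
* @param transactionId the transaction id, used only for logging
* @param txid the WAL sequence number to sync to, or 0 to sync everything
* @throws IOException if the sync fails
*/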
public static void WALSync(WAL wal, long transactionId, long txid) throws IOException
{
try {
if (txid == 0)
wal.sync();
else
wal.sync(txid);
} catch (IOException wale) {
wale.fillInStackTrace();
LOG.error("commitRequest txId: " + transactionId + " HLog seq " + txid + " Caught IOException in HLOG sync ", wale );
try {
Thread.sleep(1000); // 1000 milliseconds = one second pause before rethrowing
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
throw wale;
}
}
/**
* Commits the transaction
* @param state the transaction state to commit
* @throws IOException
*/
private void commit(final TrxTransactionState state) throws IOException {
long txid = 0;
WALEdit b = null;
int num = 0;
Tag commitTag;
ArrayList<WALEdit> editList;
long transactionId = state.getTransactionId();
long commitId = state.getCommitId();
long startId = state.getStartId();
int size = state.writeSize();
#ifdef CDH5.7 APACHE1.2
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long mvccNum = 0;
#endif
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:commit -"
+ " txId " + transactionId
+ " commitId " + commitId
+ " startId " + startId
+ ", region " + m_regionDetails
+ ", transactionsById " + transactionsById.size()
+ ", commitedTransactionsBySequenceNumber " + commitedTransactionsBySequenceNumber.size()
+ ", commitPendingTransactions " + commitPendingTransactions.size()
);
if (state.isReinstated() && !this.configuredConflictReinstate) {
if (LOG.isTraceEnabled()) LOG.trace("commit"
+ " Trafodion Recovery: commit reinstated indoubt transaction " + transactionId
+ " with commitId " + commitId
+ " in region " + m_regionDetails
);
synchronized (indoubtTransactionsById) {
editList = (ArrayList<WALEdit>) indoubtTransactionsById.get(transactionId);
}
num = editList.size();
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commit -"
+ " txId " + transactionId
+ " commitId " + commitId
+ ", region " + m_regionDetails
+ ", Redrive commit with number of edit kvs list size " + num
);
if (size > 0){
try {
int iPut = 0;
int iDelete = 0;
for ( int i = 0; i < num; i++){
b = editList.get(i);
if (LOG.isTraceEnabled()) LOG.trace("commit -"
+ " txId" + transactionId
+ " commitId " + commitId
+ ", Writing " + b.size() + " updates for reinstated transaction"
);
for (Cell kv : b.getCells()) {
synchronized (editReplay) {
Put put;
Delete del;
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:commit -"
+ " txId " + transactionId + " startId " + startId
+ ", Trafodion Recovery: region " + m_regionDetails
+ ", Replay commit for transaction with Op " + kv.getTypeByte()
);
if (kv.getTypeByte() == KeyValue.Type.Put.getCode()) {
if (this.useCommitIdInCells == false) {
put = new Put(CellUtil.cloneRow(kv), kv.getTimestamp()); // kv.getRow()
put.add(CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv),
kv.getTimestamp(),
CellUtil.cloneValue(kv));
}
else {
if (LOG.isTraceEnabled()) LOG.trace("commit - creating new put with commitId " + commitId);
put = new Put(CellUtil.cloneRow(kv), commitId);
put.add(CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv),
commitId,
CellUtil.cloneValue(kv));
}
//state.addWrite(put); // no need to add since add has been done in constructInDoubtTransactions
try {
if (LOG.isTraceEnabled()) LOG.trace("commit - txId "
+ transactionId + " with commitId " + commitId
+ ", Trafodion Recovery executing put index " + i + " durability "
+ put.getDurability().toString() + " directly to region "
+ m_regionDetails);
m_Region.put(put);
}
catch (Exception e) {
LOG.warn("commit -"
+ " txId " + transactionId + " commitId " + commitId
+ ", Trafodion Recovery: Executing put caught an exception ", e);
throw new IOException(e.toString());
}
} else if (CellUtil.isDelete(kv)) {
if (this.useCommitIdInCells == false) {
del = new Delete(CellUtil.cloneRow(kv), kv.getTimestamp());
if (CellUtil.isDeleteFamily(kv)) {
del.deleteFamily(CellUtil.cloneFamily(kv), kv.getTimestamp());
} else if (CellUtil.isDeleteType(kv)) {
del.deleteColumn(CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv), kv.getTimestamp());
}
}
else {
if (LOG.isTraceEnabled()) LOG.trace("commit - creating new delete with commitId " + commitId);
del = new Delete(CellUtil.cloneRow(kv), commitId);
if (CellUtil.isDeleteFamily(kv)) {
del.deleteFamily(CellUtil.cloneFamily(kv), commitId);
} else if (CellUtil.isDeleteType(kv)) {
del.deleteColumn(CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv), commitId);
}
}
//state.addDelete(del); // no need to add since add has been done in constructInDoubtTransactions
try {
m_Region.delete(del);
}
catch (Exception e) {
LOG.warn("commit -"
+ " txId " + transactionId + " commitId " + commitId
+ ", Trafodion Recovery: Executing delete caught an exception ", e);
throw new IOException(e.toString());
}
}
} // synchronized reply edits
} // for WALEdit
} // for ediList
} catch(IOException e) {
LOG.error("commit - exception: ", e);
}
} // size > 0
} // an in-doubt reinstated transaction, but no ts constructed
else { // either non-reinstated transaction, or reinstate transaction with conflict reinstate TRUE (write from TS write ordering)
// If useCommitIdInCells is true, we use the commitId as the timestamp; otherwise
// write operations are timestamped with the current time.
// Note: the WAL could potentially be skipped here, since the THLOG already contains the required edits from phase 1.
ListIterator<WriteAction> writeOrderIter = null;
if (size > 0){
try {
int writeOrderIndex = 0;
for (writeOrderIter = state.getWriteOrderingIter();
writeOrderIter.hasNext();) {
WriteAction action =(WriteAction) writeOrderIter.next();
Put put = action.getPut();
if (null != put) {
// Process Put
if (this.useCommitIdInCells == true) {
// We are using the startId and commitId, so update the put timestamp using the commitId
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:commit - updating put for"
+ " txId " + transactionId
+ " using commitId " + commitId
+ " writeOrderIndex " + writeOrderIndex
+ ", Trafodion region: " + m_regionDetails);
byte[] rowkey = put.getRow();
Put updatedPut = new Put(rowkey, commitId);
NavigableMap<byte[], List<Cell>> familyCellMap = put.getFamilyCellMap();
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
if (cell.getTimestamp() == (startId - 1)) {
// startId -1 indicates this cell has been subsequently updated in the same transaction
// so we do NOT want to put it into the region, otherwise the update will get lost since
// the region will discard a second cell with the same timestamp and MVCC version
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:commit - Skipping stale Cell found in put for"
+ " txId " + transactionId
+ " using commitId " + commitId
+ " writeOrderIndex " + writeOrderIndex
+ ", Trafodion region: " + m_regionDetails);
continue;
}
// This is a cell we want to add to the put, so add the cell with the correct commitID
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
updatedPut.add(family,qualifier,commitId,value);
}
}
put = updatedPut;
}
if (this.skipWal){
put.setDurability(Durability.SKIP_WAL);
}
else {
if (this.asyncWal != 0) {
put.setDurability(Durability.ASYNC_WAL);
if ((writeOrderIndex == 0) && ((this.asyncWal == 3) || (this.asyncWal == 4))) {
put.setDurability(Durability.SYNC_WAL);
}
if ((writeOrderIndex == (size - 1)) && ((this.asyncWal == 2) || (this.asyncWal == 4))) {
put.setDurability(Durability.SYNC_WAL);
}
}
}
if (LOG.isTraceEnabled()) LOG.trace("commit - txId "
+ transactionId + " with commitId " + commitId
+ ", Executing put writeOrderIndex " + writeOrderIndex + " durability "
+ put.getDurability().toString() + " directly to region "
+ m_regionDetails
);
try {
m_Region.put(put);
}
catch (Exception e) {
LOG.warn("commit -" + " txId " + transactionId + " with commitId " + commitId
+ ", Executing put caught an exception ",e);
throw new IOException("commit -" + " txId " + transactionId + " with commitId " + commitId
+ ", Executing put caught an exception " + e);
}
}
// Process Delete
Delete delete = action.getDelete();
if (null != delete){
if (this.useCommitIdInCells == true) {
// We are using the startId and commitId, so clone the delete
// and use the commitId as the timestamp
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:commit - cloning delete for"
+ " txId " + transactionId
+ " with commitId " + commitId
+ ", Trafodion Recovery: region " + m_regionDetails);
byte[] deleteKey = delete.getRow();
Delete updatedDelete = new Delete(deleteKey, commitId);
delete = updatedDelete;
// state.unconditionalUpdateLatestTimestamp(delete.getFamilyCellMap().values(), commitId);
}
if (this.skipWal){
delete.setDurability(Durability.SKIP_WAL);
}
else {
if (this.asyncWal != 0) {
delete.setDurability(Durability.ASYNC_WAL);
if ((writeOrderIndex == 0) && ((this.asyncWal == 3) || (this.asyncWal == 4))) {
delete.setDurability(Durability.SYNC_WAL);
}
if ((writeOrderIndex == (size - 1)) && ((this.asyncWal == 2) || (this.asyncWal == 4))) {
delete.setDurability(Durability.SYNC_WAL);
}
}
}
if (LOG.isTraceEnabled()) LOG.trace("commit - txId "
+ transactionId + " commitId " + commitId
+ ", Executing delete writeOrderIndex " + writeOrderIndex + " durability "
+ delete.getDurability().toString() + " directly to region "
+ m_regionDetails);
try {
m_Region.delete(delete);
}
catch (Exception e) {
LOG.error("commit -" + " txId " + transactionId + " with commitId " + commitId
+ ", Executing delete caught an exception ", e);
throw new IOException("commit -" + " txId " + transactionId + " with commitId " + commitId
+ ", Executing delete caught an exception " + e.toString());
}
}
writeOrderIndex++;
}
} catch(IOException e) {
LOG.fatal("Unable to write mutation capture ", e);
throw new RuntimeException(e);
}
} // size > 0
} // normal transactions
if (size > 0){
if (state.endEditSize != 0){
if (LOG.isInfoEnabled()) LOG.info("commit duplicate request for: " + state);
}
else {
state.endEditSize = 1;
}
// Now write a commit edit to HLOG
List<Tag> tagList = new ArrayList<Tag>();
if (state.hasWrite() || state.isReinstated()) {
if (!state.getFullEditInCommit()) {
if (state.getIsRegionTx()){
commitTag = state.formTransactionalContextTag(TS_REGION_TX_COMMIT, commitId);
}
else {
commitTag = state.formTransactionalContextTag(TS_COMMIT, commitId);
}
tagList.add(commitTag);
WALEdit e1 = state.getEdit();
WALEdit e = new WALEdit();
if (e1.isEmpty() || e1.getCells().size() <= 0) {
if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV endpoint CP: commit - txId "
+ transactionId + " regionTx: " + state.getIsRegionTx()
+ ", Encountered empty TS WAL Edit list during commit, HLog txid "
+ txid);
}
else {
Cell c = e1.getCells().get(0);
KeyValue kv = new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), tagList);
e.add(kv);
try {
long now;
if (this.useCommitIdInCells == false) {
now = EnvironmentEdgeManager.currentTime();
}
else {
now = commitId;
}
#ifdef CDH5.7 APACHE1.2
WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
WALKey.NO_SEQUENCE_ID,
now,
WALKey.EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
this.t_Region.getMVCC());
txid = this.tHLog.append(this.m_Region.getTableDesc(), this.regionInfo, wk, e, false);
writeEntry = wk.getWriteEntry();
mvccNum = writeEntry.getWriteNumber();
#else
final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
now);
AtomicLong lv_seqid = this.m_Region.getSequenceId();
txid = this.tHLog.append(this.m_Region.getTableDesc(),
this.regionInfo,
wk ,
e,
lv_seqid,
false,
null);
#endif
if (LOG.isTraceEnabled()) LOG.trace("commit -"
+ " txId " + transactionId + " with commitId " + commitId
+ " regionTx: " + state.getIsRegionTx()
+ ", Write commit HLOG seq " + txid);
}
catch (IOException exp1) {
LOG.error("TRAF RCOV endpoint CP: commit -" + " txId " + transactionId + " with commitId " + commitId
+ " regionTx: " + state.getIsRegionTx() + ", Writing to HLOG : Threw an exception ", exp1);
throw exp1;
}
#ifdef CDH5.7 APACHE1.2
finally {
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
} // finally
#endif
} // e1 is not empty
} // not full edit write in commit record during phase 2
else { //do this for rollover case
if (LOG.isTraceEnabled()) LOG.trace("TRAF RCOV endpoint CP:commit -- HLOG rollover txId: "
+ transactionId + " regionTx: " + state.getIsRegionTx());
commitTag = state.formTransactionalContextTag(TS_CONTROL_POINT_COMMIT, commitId);
tagList.add(commitTag);
WALEdit e1 = state.getEdit();
WALEdit e = new WALEdit();
for (Cell c : e1.getCells()) {
KeyValue kv = new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), tagList);
e.add(kv);
}
try {
long now;
if (this.useCommitIdInCells == false) {
now = EnvironmentEdgeManager.currentTime();
}
else {
now = commitId;
}
#ifdef CDH5.7 APACHE1.2
WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
WALKey.NO_SEQUENCE_ID,
now,
WALKey.EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
this.t_Region.getMVCC());
txid = this.tHLog.append(this.m_Region.getTableDesc(), this.regionInfo, wk, e, false);
writeEntry = wk.getWriteEntry();
mvccNum = writeEntry.getWriteNumber();
#else
final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
now);
txid = this.tHLog.append(this.m_Region.getTableDesc(),
this.regionInfo,
wk ,
e,
this.m_Region.getSequenceId(),
false,
null);
#endif
WALSync(tHLog, transactionId, txid);
#ifdef CDH5.7 APACHE1.2
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
#endif
if (LOG.isTraceEnabled()) LOG.trace("commit -"
+ " txId " + transactionId + " with commitId " + commitId
+ " regionTx: " + state.getIsRegionTx()
+ ", Y11 write commit HLOG seq " + txid);
}
catch (IOException exp1) {
LOG.error("TRAF RCOV endpoint CP: commit -" + " txId " + transactionId + " with commitId " + commitId
+ " regionTx: " + state.getIsRegionTx() + ", Writing to HLOG : Threw an exception ", exp1);
throw exp1;
}
#ifdef CDH5.7 APACHE1.2
finally {
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
} // finally
#endif
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:commit -- EXIT"
+ " txId: " + transactionId + " with commitId " + commitId
+ " regionTx: " + state.getIsRegionTx()
+ " HLog seq " + txid);
// if (this.fullEditInCommit) this.fullEditInCommit = false;
} // else -- full edit write in commit record during phase 2
} // write or reinstated
} // size > 0
if (state.hasWrite()) {
state.setStatus(Status.COMMITED);
}
else {
state.setStatus(Status.COMMIT_READONLY);
}
if (state.hasWrite() || state.isReinstated() || state.getNeverReadOnly()) {
synchronized (commitPendingTransactions) {
if (!commitPendingTransactions.remove(state)) {
LOG.fatal("commit -" + " txid: " + transactionId + "with startId " + startId
+ " regionTx: " + state.getIsRegionTx() + ", Commiting a non-query transaction that is not in commitPendingTransactions");
// synchronized statements are cleared for a throw
throw new IOException("commit failure "+ " txid: " + transactionId + "with startId " + startId
+ " regionTx: " + state.getIsRegionTx() + " is not in commitPendingTransactions");
}
}
}
else {
if(LOG.isDebugEnabled()) LOG.debug("Commit: Did not delete ts " + state.getTransactionId() + " commitPendingTransactions " + state.hasWrite() + " size " + state.writeSize()
+ " reinstated " + state.isReinstated() + "Region : " + m_regionDetails);
}
if (LOG.isDebugEnabled()) LOG.debug("commit(tstate) -- EXIT TrxTransactionState: " +
state.toString());
if (state.isReinstated()) {
synchronized(indoubtTransactionsById) {
indoubtTransactionsById.remove(state.getTransactionId());
int tmid = (int) state.getNodeId();
int count = 0;
if (indoubtTransactionsCountByTmid.containsKey(tmid)) {
count = (int) indoubtTransactionsCountByTmid.get(tmid) - 1;
if (count > 0) indoubtTransactionsCountByTmid.put(tmid, count);
}
if (count == 0) {
indoubtTransactionsCountByTmid.remove(tmid);
try {
if (LOG.isTraceEnabled()) LOG.trace("commit -"
+ " txId " + transactionId + " with startId " + startId
+ ", Trafodion Recovery: delete in commit recovery zNode TM " + tmid
+ " region " + m_regionDetails + " for 0 in-doubt transaction");
deleteRecoveryzNode(tmid, regionInfo.getEncodedName());
} catch (IOException e) {
LOG.error("commit -" + " txId " + transactionId + " with startId " + startId
+ ", Trafodion Recovery: delete recovery zNode failed. Caught exception ", e);
}
}
if ((indoubtTransactionsById == null) || (indoubtTransactionsById.size() == 0)) {
if (indoubtTransactionsById == null) {
if (LOG.isTraceEnabled()) LOG.trace("commit -"
+ " txId " + transactionId + " with startId " + startId
+ ", Trafodion Recovery: start region in commit with indoubtTransactionsById null");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("commit -"
+ " txId " + transactionId + " with startId " + startId
+ ", Trafodion Recovery: start region in commit with indoubtTransactionsById size "
+ indoubtTransactionsById.size());
}
startRegionAfterRecovery();
}
}
}
state.setCommitProgress(CommitProgress.COMMITED);
retireTransaction(state, false);
}
/**
* Resolves the transaction from the log
* @param transactionState the transaction state to resolve
* @throws IOException
*/
private void resolveTransactionFromLog(
final TrxTransactionState transactionState) throws IOException {
LOG.error("Global transaction log is not Implemented. (Optimisticly) assuming transaction commit!");
commit(transactionState);
}
/**
* TransactionLeaseListener
*/
private class TransactionLeaseListener implements LeaseListener {
//private final long transactionName;
private final String transactionName;
TransactionLeaseListener(final long n) {
this.transactionName = getTransactionalUniqueId(n);
}
public void leaseExpired() {
long transactionId = 0L;
if (LOG.isInfoEnabled()) LOG.info("leaseExpired Transaction [" + this.transactionName
+ "] expired in region [" + m_regionName + "]");
TrxTransactionState s = null;
synchronized (transactionsById) {
s = transactionsById.remove(transactionName);
if (LOG.isTraceEnabled()) LOG.trace("leaseExpired Removing transaction: " + this.transactionName + " from list");
}
if (s == null) {
LOG.warn("leaseExpired Unknown transaction expired " + this.transactionName);
return;
}
transactionId = s.getTransactionId();
switch (s.getStatus()) {
case PENDING:
if (LOG.isTraceEnabled()) LOG.trace("leaseExpired transaction " + transactionId + " was PENDING, calling retireTransaction");
s.setStatus(Status.ABORTED);
retireTransaction(s, true);
break;
case COMMIT_PENDING:
if (LOG.isTraceEnabled()) LOG.trace("leaseExpired Transaction " + transactionId
+ " expired in COMMIT_PENDING state");
String key = getTransactionalUniqueId(transactionId);
try {
if (s.getCommitPendingWaits() > MAX_COMMIT_PENDING_WAITS) {
if (LOG.isTraceEnabled()) LOG.trace("leaseExpired Checking transaction status in transaction log");
resolveTransactionFromLog(s);
break;
}
if (LOG.isTraceEnabled()) LOG.trace("leaseExpired renewing lease and hoping for commit");
s.incrementCommitPendingWaits();
synchronized (transactionsById) {
transactionsById.put(key, s);
if (LOG.isTraceEnabled()) LOG.trace("leaseExpired Adding transaction: " + transactionId + " to list");
}
try {
transactionLeases.createLease(key, transactionLeaseTimeout, this);
} catch (LeaseStillHeldException e) {
transactionLeases.renewLease(key);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
break;
default:
LOG.warn("leaseExpired Unexpected status on expired lease");
}
}
}
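// Summary of the lease-expiry state machine above (derived from this code,
// not a separate spec): PENDING transactions are aborted and retired;
// COMMIT_PENDING transactions get their lease renewed up to
// MAX_COMMIT_PENDING_WAITS times, after which resolveTransactionFromLog is
// consulted (which currently assumes commit); any other status only logs a
// warning.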
/**
* Processes a delete using a regional transaction
* @param tid the region transaction id
* @param startId the transaction start id
* @param commitId the transaction commit id
* @param delete the delete to process
* @param autoCommit whether to automatically commit the region transaction
* @throws IOException
*/
public void deleteRegionTx(final long tid, final long startId, final long commitId,
final Delete delete, final Boolean autoCommit)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx -- ENTRY tid: " + tid);
checkBlockNonPhase2(tid);
// if(this.checkRowBelongs){
// if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx -- checkRow tid: " + tid);
// checkRow(delete.getRow(), "DeleteRegionTx");
// }
TrxTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a Region TX
state.addDelete(delete, this.useCommitIdInCells);
// Now we must perform conflict checking.
boolean success = commitIfPossible(tid, tid, tid, 1, true, autoCommit);
if (! success) {
LOG.error("deleteRegionTx - tid " + tid + " was not successful. Throwing IOException ");
throw new IOException("deleteRegionTx - tid " + tid + " was not successful.");
}
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx -- EXIT tid: " + tid);
}
/**
* Processes a transactional delete
* @param transactionId the transaction id
* @param startId the transaction start id
* @param delete the delete to process
* @throws IOException
*/
public void delete(final long transactionId, final long startId, final Delete delete)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("delete -- ENTRY txId: " + transactionId);
checkBlockNonPhase2(transactionId);
// if(this.checkRowBelongs)
// checkRow(delete.getRow(), "Delete");
TrxTransactionState state = this.beginTransIfNotExist(transactionId, startId);
if (!state.getStatus().equals(Status.PENDING)) { // Active
throw new IOException("delete late checkin for transaction " + transactionId + " in region " + m_regionDetails);
}
if (this.useCommitIdInCells == true) {
//clone the delete to change the timestamp.
byte[] rowkey = delete.getRow();
Delete newDelete = new Delete( rowkey,startId );
NavigableMap<byte[], List<Cell>> familyCellMap = delete.getFamilyCellMap();
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
newDelete.deleteColumns(family,qualifier,startId);
//NOTE: this API will change in HBase 1.0 ...
//deleteColumns with a timestamp is used here so that all historical versions of this column are deleted
}
}
state.addDelete(newDelete, this.useCommitIdInCells);
}
else {
// Just add the existing delete
state.addDelete(delete, this.useCommitIdInCells);
}
}
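// Background note on the deleteColumns(family, qualifier, ts) call above: in
// the HBase client API, deleteColumns with a timestamp marks *all* versions
// of the column with timestamps <= ts for deletion, whereas deleteColumn
// (singular) removes only the latest version. Cloning the client's Delete
// with startId as the timestamp therefore removes every version the
// transaction can see, e.g. (illustrative only):
//
//   Delete d = new Delete(rowkey, startId);
//   d.deleteColumns(family, qualifier, startId); // all versions <= startId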
/**
* Processes multiple transactional deletes
* @param transactionId the transaction id
* @param startId the transaction start id
* @param deletes the deletes to process
* @throws IOException
*/
public synchronized void delete(final long transactionId, final long startId, Delete[] deletes)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter deletes[], txid: " + transactionId);
TrxTransactionState state = this.beginTransIfNotExist(transactionId, startId);
if (!state.getStatus().equals(Status.PENDING)) { // Active
throw new IOException("delete[] late checkin for transaction " + transactionId + " in region " + m_regionDetails);
}
for (Delete del : deletes) {
// if(this.checkRowBelongs)
// checkRow(del.getRow(), "Delete");
if (this.useCommitIdInCells == true) {
//clone the delete to change the timestamp.
byte[] rowkey = del.getRow();
Delete newDelete = new Delete( rowkey,startId );
NavigableMap<byte[], List<Cell>> familyCellMap = del.getFamilyCellMap();
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
newDelete.deleteColumns(family,qualifier,startId);
//NOTE: this API will change in HBase 1.0 ...
//deleteColumns with a timestamp is used here so that all historical versions of this column are deleted
}
}
state.addDelete(newDelete, this.useCommitIdInCells);
}
else {
// Just add the existing delete
state.addDelete(del, this.useCommitIdInCells);
}
}
}
/**
* Processes a checkAndDelete operation using a region transaction
* @param tid the region transaction id
* @param startId the transaction start id
* @param commitId the transaction commit id
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value, or null/empty to expect no value
* @param delete the delete to apply when the check passes
* @param autoCommit whether to automatically commit the region transaction
* @return true if the delete was applied
* @throws IOException
*/
public boolean checkAndDeleteRegionTx(final long tid, final long startId,
final long commitId,
final byte[] row, byte[] family,
final byte[] qualifier, final byte[] value,
final Delete delete,
final boolean autoCommit) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter checkAndDeleteRegionTx, tid: "
+ tid + ", on HRegion " + this);
TrxTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a Region TX
boolean result = false;
byte[] rsValue = null;
byte[] startKey = null;
byte[] endKey = null;
// if(this.checkRowBelongs)
// checkRow(row, "checkAndDelete");
try {
Get get = new Get(row);
get.addColumn(family, qualifier);
Result rs = this.get(tid, startId, get);
boolean valueIsNull = value == null ||
value.length == 0;
if (rs.isEmpty() && valueIsNull) {
this.deleteRegionTx(tid, startId, commitId, delete, autoCommit);
result = true;
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
this.deleteRegionTx(tid, startId, commitId, delete, autoCommit);
result = true;
}
else
result = false;
} else if ((!rs.isEmpty())
&& !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
this.deleteRegionTx(tid, startId, commitId, delete, autoCommit);
result = true;
} else {
result = false;
}
} catch (Exception e) {
LOG.error("checkAndDeleteRegionTx - tid " + tid + ", Caught internal exception returning false ", e);
throw new IOException("checkAndDeleteRegionTx - " + e.toString());
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndDeleteRegionTx EXIT - returns " + result + ", tid " + tid
+ ", row " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row));
return result;
}
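// Decision table for the check-and-mutate pattern above; the same logic is
// reused by checkAndDelete, checkAndPut and checkAndPutRegionTx below
// (a null or zero-length expected value means "expect no value"):
//
//   stored value        expected value   action
//   ------------        --------------   ------
//   absent              absent           mutate, return true
//   present but empty   absent           mutate, return true
//   present             matches          mutate, return true
//   anything else       --               no mutate, return false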
/**
* Processes a transactional checkAndDelete
* @param transactionId the transaction id
* @param startId the transaction start id
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value, or null/empty to expect no value
* @param delete the delete to apply when the check passes
* @return true if the delete was applied
* @throws IOException
*/
public boolean checkAndDelete(final long transactionId,
final long startId,
byte[] row, byte[] family,
byte[] qualifier, byte[] value, Delete delete)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter checkAndDelete, txid: "
+ transactionId + ", on HRegion " + this);
TrxTransactionState state = this.beginTransIfNotExist(transactionId, startId);
if (!state.getStatus().equals(Status.PENDING)) { // Active
throw new IOException("checkAndDelete late checkin for transaction " + transactionId + " in region " + m_regionDetails);
}
boolean result = false;
byte[] rsValue = null;
byte[] startKey = null;
byte[] endKey = null;
// if(this.checkRowBelongs)
// checkRow(row, "checkAndDelete");
try {
Get get = new Get(row);
get.addColumn(family, qualifier);
// this will call getScanner (and then add a state scan range)
Result rs = this.get(transactionId, startId, get);
boolean valueIsNull = value == null ||
value.length == 0;
if (rs.isEmpty() && valueIsNull) {
this.delete(transactionId, startId, delete);
result = true;
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
this.delete(transactionId, startId, delete);
result = true;
}
else
result = false;
} else if ((!rs.isEmpty())
&& !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
this.delete(transactionId, startId, delete);
result = true;
} else {
result = false;
}
} catch (Exception e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndDelete - txid " + transactionId + ", Caught internal exception ", e);
throw new IOException("checkAndDelete - " + e.toString());
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndDelete EXIT - returns " + result + ", transId " + transactionId + ", row " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row));
return result;
}
/**
* Processes a transactional checkAndPut
* @param transactionId the transaction id
* @param startId the transaction start id
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value, or null/empty to expect no value
* @param put the put to apply when the check passes
* @return true if the put was applied
* @throws IOException
*/
public boolean checkAndPut(final long transactionId, final long startId, byte[] row, byte[] family,
byte[] qualifier, byte[] value, Put put)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter checkAndPut, txid: "
+ transactionId + ", on HRegion " + m_regionDetails);
TrxTransactionState state = this.beginTransIfNotExist(transactionId, startId);
if (!state.getStatus().equals(Status.PENDING)) { // Active
throw new IOException("checkAndPut late checkin for transaction " + transactionId + " in region " + m_regionDetails);
}
boolean result = false;
byte[] rsValue = null;
byte[] startKey = null;
byte[] endKey = null;
// if(this.checkRowBelongs)
// checkRow(row, "checkAndPut");
try {
Get get = new Get(row);
get.addColumn(family, qualifier);
// this will call getScanner (and then add a state scan range); later, put will call addWrite and then addRead,
// which would also add a scan range -- leave it as-is for now and optimize later
Result rs = this.get(transactionId, startId, get);
boolean valueIsNull = value == null ||
value.length == 0;
if (rs.isEmpty() && valueIsNull) {
this.put(transactionId, startId, put);
result = true;
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
this.put(transactionId, startId, put);
result = true;
}
else {
if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - txid " + transactionId + ", row " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) + ", first check setting result to false in region " + m_regionDetails);
result = false;
}
} else if ((!rs.isEmpty()) && !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
this.put(transactionId, startId, put);
result = true;
} else {
if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - txid " + transactionId + ", row " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) + ", second check setting result to false in region " + m_regionDetails);
result = false;
}
} catch (Exception e) {
if (LOG.isWarnEnabled()) LOG.warn("checkAndPut - txid " + transactionId + ", Caught internal exception ", e);
throw new IOException("checkAndPut - " + e.toString());
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndPut EXIT - returns " + result + ", transId " + transactionId + ", row " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) + " in region " + m_regionDetails);
return result;
}
/**
* Processes a checkAndPut using a region transaction
* @param tid the region transaction id
* @param startId the transaction start id
* @param commitId the transaction commit id
* @param row the row to check
* @param family the column family to check
* @param qualifier the column qualifier to check
* @param value the expected value, or null/empty to expect no value
* @param put the put to apply when the check passes
* @param autoCommit whether to automatically commit the region transaction
* @return true if the put was applied
* @throws IOException
*/
public boolean checkAndPutRegionTx(final long tid, final long startId, final long commitId, byte[] row, byte[] family,
byte[] qualifier, byte[] value, Put put, final boolean autoCommit)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter checkAndPutRegionTx, tid: "
+ tid + " autoCommit " + autoCommit + ", on HRegion " + this);
TrxTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a Region TX
boolean result = false;
byte[] rsValue = null;
byte[] startKey = null;
byte[] endKey = null;
// if(this.checkRowBelongs)
// checkRow(row, "checkAndPutRegionTx");
try {
Get get = new Get(row);
get.addColumn(family, qualifier);
Result rs = this.get(tid, startId, get);
boolean valueIsNull = value == null ||
value.length == 0;
if (rs.isEmpty() && valueIsNull) {
this.putRegionTx(tid, startId, commitId, put, autoCommit);
result = true;
} else if (!rs.isEmpty() && valueIsNull) {
rsValue = rs.getValue(family, qualifier);
if (rsValue != null && rsValue.length == 0) {
this.putRegionTx(tid, startId, commitId, put, autoCommit);
result = true;
}
else {
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx - tid " + tid
+ ", row " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row)
+ ", first check setting result to false");
result = false;
}
} else if ((!rs.isEmpty()) && !valueIsNull
&& (Bytes.equals(rs.getValue(family, qualifier), value))) {
this.putRegionTx(tid, startId, commitId, put, autoCommit);
result = true;
} else {
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx - tid " + tid
+ ", row " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row)
+ ", second check setting result to false");
result = false;
}
} catch (Exception e) {
LOG.error("checkAndPutRegionTx - tid " + tid + ", Caught internal exception ", e);
throw new IOException("checkAndPutRegionTx - " + e.toString());
}
if (LOG.isTraceEnabled()) LOG.trace("checkAndPutRegionTx EXIT - returns "
+ result + ", tid " + tid + ", row " + Bytes.toStringBinary(row)
+ ", row in hex " + Hex.encodeHexString(row));
return result;
}
/**
* Obtains a transactional Result for Get
* @param transactionId the transaction id
* @param startId the transaction start id
* @param get the get to process
* @return Result the transactional result
* @throws IOException
*/
public Result get(final long transactionId, final long startId, final Get get)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("get -- ENTRY txId: " + transactionId );
/*
if (LOG.isTraceEnabled()) {
Map<byte[],NavigableSet<byte[]>> lv_fm = get.getFamilyMap();
byte [][] lv_fms = lv_fm.keySet().toArray(new byte[0][0]);
byte[] lv_f = lv_fms[0];
if (LOG.isTraceEnabled()) LOG.trace("family: " + new String(lv_f));
NavigableSet<byte []> lv_set = lv_fm.get(lv_f);
if (LOG.isTraceEnabled()) LOG.trace("lv_set size: " + lv_set.size());
}
*/
checkBlockNonPhase2(transactionId);
Scan scan = new Scan(get);
List<Cell> results = new ArrayList<Cell>();
RegionScanner scanner = null;
try {
scanner = getScanner(transactionId, startId, scan);
if (scanner != null)
scanner.next(results);
} catch(Exception e) {
LOG.warn("get - txId " + transactionId + ", Caught internal exception " + e.getMessage() + " " + stackTraceToString(e));
}
finally {
if (scanner != null) {
scanner.close();
}
}
if (LOG.isTraceEnabled()) LOG.trace("get -- EXIT txId: " + transactionId);
return Result.create(results);
}
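// Implementation note: the transactional get above is serviced as a
// single-row scan. The Scan(Get) constructor carries over the Get's row (as
// both start and stop row), families, filter and time range, so the
// transactional scanner (which merges the transaction's own uncommitted
// writes and deletes) can be reused unchanged for point lookups.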
/**
* Obtain a RegionScanner
* @param transactionId the transaction id
* @param startId the transaction start id
* @param scan the scan to process
* @return RegionScanner the transactional scanner
* @throws IOException
*/
public RegionScanner getScanner(final long transactionId, final long startId, final Scan scan)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("RegionScanner getScanner -- ENTRY txId: " + transactionId
+ " with scan range startKey=" + (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW) ? "null" : Hex.encodeHexString(scan.getStartRow()))
+ " stopRow=" + (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_START_ROW) ? "null" : Hex.encodeHexString(scan.getStopRow()))
+ " in region " + m_regionDetails );
TrxTransactionState state = this.beginTransIfNotExist(transactionId, startId);
if (!state.getStatus().equals(Status.PENDING)) { // Active
LOG.error("getScanner late checkin for transaction " + transactionId + " in region " + m_regionDetails);
throw new IOException("getScanner late checkin for transaction " + transactionId + " in region " + m_regionDetails);
}
state.addScan(scan);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
if (LOG.isTraceEnabled()) LOG.trace("RegionScanner getScanner -- adding scan to scanners, txId: " + transactionId);
scanners.add(state.getScanner(scan));
Scan deleteWrapScan = wrapWithDeleteFilter(scan, state);
if (LOG.isTraceEnabled()) LOG.trace("RegionScanner getScanner -- Calling t_Region.getScanner txId: " + transactionId );
RegionScanner gotScanner = this.t_Region.getScanner(deleteWrapScan, scanners);
if (gotScanner != null) {
if (LOG.isTraceEnabled()) LOG.trace("RegionScanner getScanner -- obtained scanner was not null, txId: " + transactionId );
}
else {
if (LOG.isTraceEnabled()) LOG.trace("RegionScanner getScanner -- obtained scanner was null, txId: " + transactionId );
}
return gotScanner;
}
/**
* Wraps the transactional scan with a delete filter
* @param scan the scan to wrap
* @param state the transaction state supplying the deletes
* @return Scan the wrapped scan
*/
private Scan wrapWithDeleteFilter(final Scan scan,
final TrxTransactionState state) {
if (LOG.isTraceEnabled()) LOG.trace("wrapWithDeleteFilter -- ENTRY");
FilterBase deleteFilter = new FilterBase() {
private boolean rowFiltered = false;
@Override
public void reset() {
rowFiltered = false;
}
@Override
public boolean hasFilterRow() {
return true;
}
@Override
public ReturnCode filterKeyValue(final Cell v){
return ReturnCode.INCLUDE;
}
@Override
public void filterRowCells(final List<Cell> kvs) {
state.applyDeletes(kvs, scan.getTimeRange().getMin(),
scan.getTimeRange().getMax());
rowFiltered = kvs.isEmpty();
}
public boolean filterRow() {
return rowFiltered;
}
};
if (scan.getFilter() == null) {
scan.setFilter(deleteFilter);
if (LOG.isTraceEnabled()) LOG.trace("no previous filter, wrapWithDeleteFilter -- EXIT");
return scan;
}
FilterList wrappedFilter = new FilterList(Arrays.asList(deleteFilter,
scan.getFilter()));
scan.setFilter(wrappedFilter);
if (LOG.isTraceEnabled()) LOG.trace("new filter array, wrapWithDeleteFilter -- EXIT");
return scan;
}
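// Note on the FilterList composition above: the List-based FilterList
// constructor defaults to Operator.MUST_PASS_ALL, so a row must pass both
// the transaction's delete filter and the caller's original filter.
// Equivalent explicit form (illustrative sketch, not used by this code):
//
//   FilterList wrapped = new FilterList(FilterList.Operator.MUST_PASS_ALL,
//       Arrays.asList(deleteFilter, scan.getFilter()));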
public void putRegionTx(RpcController controller,
PutRegionTxRequest request,
RpcCallback<PutRegionTxResponse> done) {
PutRegionTxResponse response = PutRegionTxResponse.getDefaultInstance();
byte [] row = null;
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
Put put = null;
Throwable t = null;
MemoryUsageException mue = null;
long tid = request.getTid();
long commitId = request.getCommitId();
boolean autoCommit = request.getAutoCommit();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
LOG.warn("TrxRegionEndpoint coprocessor: putRegionTx - performing memoryPercentage " + memoryPercentage
+ ", warning memory usage exceeds indicated percentage");
}
else {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: putRegionTx - performing memoryPercentage "
+ memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
mue = new MemoryUsageException("putRegionTx memory usage exceeds " + memoryUsageThreshold
+ " percent, tid is " + tid);
}
}
else{
try {
put = ProtobufUtil.toPut(proto);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("putRegionTx - tid " + tid
+ ", Caught exception ", e);
t = e;
}
if ((mue == null && type == MutationType.PUT && proto.hasRow()) && (put != null)){
// Process in local memory
try {
putRegionTx(tid, tid, commitId, put, autoCommit);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("putRegionTx - tid " + tid
+ ", Caught exception after internal put - ", e);
t = e;
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: putRegionTx - tid " + tid + ", region " + m_regionDetails
+ ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray())
+ ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
}
else{
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: putRegionTx - tid " + tid + ", region " + m_regionDetails
+ "- no valid PUT type or does not contain a row");
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutRegionTxResponse.Builder putRegionTxResponseBuilder = PutRegionTxResponse.newBuilder();
putRegionTxResponseBuilder.setHasException(false);
if (t != null)
{
putRegionTxResponseBuilder.setHasException(true);
putRegionTxResponseBuilder.setException(t.toString());
}
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: putRegionTx - performing memoryPercentage " + memoryPercentage
+ ", posting memory usage exceeds indicated percentage exception");
putRegionTxResponseBuilder.setHasException(true);
putRegionTxResponseBuilder.setException(mue.toString());
}
PutRegionTxResponse presponse = putRegionTxResponseBuilder.build();
done.run(presponse);
}
/**
* Do a transactional put using a region transaction
*
* @param tid the region transaction id
* @param startId the transaction start id
* @param commitId the transaction commit id
* @param put the put to process
* @param autoCommit whether to automatically commit the region transaction
* @throws IOException
*/
public void putRegionTx(final long tid, final long startId, final long commitId, final Put put, final boolean autoCommit)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx Enter, tid: " + tid);
checkBlockNonPhase2(tid);
// if(this.checkRowBelongs)
// checkRow(put.getRow(),"putRegionTx");
TrxTransactionState state = this.beginTransIfNotExist(tid, startId, true); // This is a Region TX
state.addWrite(put, this.useCommitIdInCells);
// Now we must perform conflict checking.
boolean success = commitIfPossible(tid, tid, tid, 1, true, autoCommit);
if (! success) {
LOG.error("putRegionTx - tid " + tid + " was not successful. Throwing IOException ");
throw new IOException("putRegionTx - tid " + tid + " was not successful.");
}
if (LOG.isTraceEnabled()) LOG.trace("putRegionTx - tid " + tid + " EXIT successful.");
}
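// A region transaction, as used above, compresses the full transaction
// lifecycle into one region-local call: beginTransIfNotExist starts the
// transaction if needed, addWrite queues the mutation, and commitIfPossible
// performs conflict checking (and, when autoCommit is set, presumably the
// commit itself) before returning -- there is no separate prepare/commit
// round trip from the transaction manager.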
/**
* Adds a write to the transaction. The write is not applied until the
* commit process.
* @param transactionId the transaction id
* @param startId the transaction start id
* @param put the put to add
* @throws IOException
*/
public void put(final long transactionId, final long startId, final Put put)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter put, txid: "
+ transactionId + " startId " + startId);
checkBlockNonPhase2(transactionId);
// if(this.checkRowBelongs)
// checkRow(put.getRow(),"Put");
TrxTransactionState state = this.beginTransIfNotExist(transactionId, startId);
if (!state.getStatus().equals(Status.PENDING)) { // Active
throw new IOException("put late checkin for transaction " + transactionId + " in region " + m_regionDetails);
}
// If useCommitIdInCells is true, we want to use the startId as the timestamp for the put.
// Later, this timestamp will be replaced by the commitId, so we need to make a copy of the put
// before adding it to the write-ordering list: use getFamilyCellMap to extract all the data
// from the put object and generate a new one.
if (this.useCommitIdInCells == true) {
byte[] rowkey = put.getRow();
for (WriteAction wa : state.getWriteOrdering()) {
if(wa.getPut() != null) {
Put waPut = wa.getPut();
byte[] waRowKey = waPut.getRow();
if (Arrays.equals(rowkey, waRowKey)) {
if (LOG.isTraceEnabled()) LOG.trace("put, found an update for the same row in txid: "
+ transactionId + " rowkey " + rowkey + " in region: " + m_regionDetails);
NavigableMap<byte[], List<Cell>> putFamilyCellMap = put.getFamilyCellMap();
NavigableMap<byte[], List<Cell>> waFamilyCellMap = waPut.getFamilyCellMap();
for (Entry<byte[], List<Cell>> putEntry : putFamilyCellMap.entrySet()) {
for (Iterator<Cell> putIterator = putEntry.getValue().iterator(); putIterator.hasNext();) {
Cell cell = putIterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
// Now look for this Cell in the wa put and if we find it we overwrite the wa put
for (Entry<byte[], List<Cell>> waEntry : waFamilyCellMap.entrySet()) {
for (Iterator<Cell> waIterator = waEntry.getValue().iterator(); waIterator.hasNext();) {
Cell waCell = waIterator.next();
byte[] waFamily = CellUtil.cloneFamily(waCell);
byte[] waQualifier = CellUtil.cloneQualifier(waCell);
byte[] waValue = CellUtil.cloneValue(waCell);
if (Arrays.equals(family, waFamily) && Arrays.equals(qualifier, waQualifier)) {
if (LOG.isTraceEnabled()) LOG.trace("put, "
+ " found the same family and qualifier in txid: "
+ transactionId + " qualifier " + qualifier + " in region: " + m_regionDetails);
CellUtil.setTimestamp(waCell, (startId-1));
if (LOG.isTraceEnabled()) LOG.trace("put, attempting to set cell timestamp to: " + (startId-1)
+ " actual timestamp: " + waCell.getTimestamp());
// A matching cell was found so we can stop the search
break;
}
}
}
}
}
}
}
}
Put newPut = new Put(rowkey, startId);
NavigableMap<byte[], List<Cell>> familyCellMap = put.getFamilyCellMap();
for (Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
for (Iterator<Cell> iterator = entry.getValue().iterator(); iterator.hasNext();) {
Cell cell = iterator.next();
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
newPut.add(family,qualifier,startId,value);
}
}
state.addWrite(newPut, this.useCommitIdInCells);
}
else {
// Just add the existing put
byte[] putRow = put.getRow();
if (LOG.isTraceEnabled()) LOG.trace("put, txid: "
+ transactionId + " startId " + startId + " adding put to writeOrdering" +
", putRow = " + Bytes.toStringBinary(putRow) + ", row in hex " + Hex.encodeHexString(putRow));
state.addWrite(put, this.useCommitIdInCells);
}
}
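// Note on the startId-1 rewrite above: when useCommitIdInCells is true and a
// transaction updates the same row/family/qualifier twice, the earlier
// pending cell has its timestamp demoted to startId-1 while the new copy of
// the put is written at startId. This appears intended to make the latest
// write within the transaction shadow the earlier one once both cells carry
// explicit timestamps.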
public void constructIndoubtTransactions() {
#ifdef CDH5.7 APACHE1.2
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long mvccNum = 0;
#endif
synchronized (recoveryCheckLock) {
if ((indoubtTransactionsById == null) || (indoubtTransactionsById.size() == 0)) {
if (LOG.isInfoEnabled()) LOG.info("TRAF RCOV endpoint CP: Region " + m_regionDetails + " has no in-doubt transaction, set region START ");
regionState = REGION_STATE_START; // region is started for transactional access
reconstructIndoubts = 1;
try {
startRegionAfterRecovery();
} catch (IOException exp1) {
if (LOG.isErrorEnabled()) LOG.error("Trafodion Recovery: flush error during region start ", exp1);
}
return;
}
if ((m_isTrafodionMetadata) ||
(LOG.isTraceEnabled()))
LOG.info("Trafodion Recovery Endpoint Coprocessor: Trafodion Recovery RegionObserver to Endpoint coprocessor "
+ "data exchange test try to access indoubt transaction list with size "
+ indoubtTransactionsById.size());
if (reconstructIndoubts == 0) {
//Retrieve (tid, edits) pairs from the in-doubt transactions and construct/add them to the transaction data list
for (Entry<Long, List<WALEdit>> entry : indoubtTransactionsById.entrySet()) {
long txid = 0;
long transactionId = entry.getKey();
String key = String.valueOf(transactionId);
ArrayList<WALEdit> editList = (ArrayList<WALEdit>) entry.getValue();
//editList = (ArrayList<WALEdit>) indoubtTransactionsById.get(transactionId);
if ((m_isTrafodionMetadata) ||
(LOG.isTraceEnabled()))
LOG.info("Trafodion endpoint CP: reconstruct transaction in Region "
+ m_regionDetails + " process in-doubt transaction " + transactionId);
TrxTransactionState state = new TrxTransactionState(transactionId, /* 1L my_Region.getLog().getSequenceNumber()*/
nextLogSequenceId.getAndIncrement(),
nextLogSequenceId,
regionInfo,
m_Region.getTableDesc(),
tHLog,
#ifdef CDH5.7 APACHE1.2
false, this.t_Region,
#else
false,
#endif
-1); // don't worry about startId here
if ((m_isTrafodionMetadata) ||
(LOG.isTraceEnabled()))
LOG.info("Trafodion endpointCP: reconstruct transaction in Region "
+ m_regionDetails + " create transaction state for " + transactionId);
state.setFullEditInCommit(true);
state.setStartSequenceNumber(nextSequenceId.get());
transactionsById.put(getTransactionalUniqueId(transactionId), state);
// Re-establish write ordering (puts and deletes) for the in-doubt transaction
int num = editList.size();
if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP: reconstruct transaction " + transactionId + ", region " + m_regionName +
" with number of edit list kvs size " + num);
for (int i = 0; i < num; i++){
WALEdit b = editList.get(i);
if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP: reconstruction transaction " + transactionId + ", region " + m_regionName +
" with " + b.size() + " kv in WALEdit " + i);
for (Cell kv : b.getCells()) {
Put put;
Delete del;
synchronized (editReplay) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP:reconstruction transaction " + transactionId + ", region " + m_regionName +
" re-establish write ordering Op Code " + kv.getTypeByte());
if (kv.getTypeByte() == KeyValue.Type.Put.getCode()) {
put = new Put(CellUtil.cloneRow(kv)); // kv.getRow()
put.add(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp(), CellUtil.cloneValue(kv));
state.addWrite(put, this.useCommitIdInCells);
}
else if (CellUtil.isDelete(kv)) {
del = new Delete(CellUtil.cloneRow(kv));
if (CellUtil.isDeleteFamily(kv)) {
del.deleteFamily(CellUtil.cloneFamily(kv));
} else if (CellUtil.isDeleteType(kv)) {
del.deleteColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
}
state.addDelete(del, this.useCommitIdInCells);
} // handle put/delete op code
} // sync editReplay
} // for all kv in edit b
} // for all edits b in editList
state.setReinstated();
state.setStatus(Status.COMMIT_PENDING);
commitPendingTransactions.add(state);
state.setSequenceNumber(nextSequenceId.getAndIncrement());
commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), state);
if ((m_isTrafodionMetadata) ||
(LOG.isTraceEnabled()))
LOG.info("TrxRegion endpoint CP: reconstruct transaction "
+ transactionId + ", region " + m_regionDetails + " complete in prepared state");
// Rewrite the HLOG for the prepared edit (should this method be invoked in the postOpen observer?)
try {
//txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
//state.getEdit(), new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.m_Region.getTableDesc(),
//nextLogSequenceId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
#ifdef CDH5.7 APACHE1.2
WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
WALKey.NO_SEQUENCE_ID,
EnvironmentEdgeManager.currentTime(),
WALKey.EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
this.t_Region.getMVCC());
txid = this.tHLog.append(this.m_Region.getTableDesc(), this.regionInfo, wk, state.getEdit(), false);
writeEntry = wk.getWriteEntry();
mvccNum = writeEntry.getWriteNumber();
#else
final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
EnvironmentEdgeManager.currentTime());
txid = this.tHLog.append(this.m_Region.getTableDesc(),
this.regionInfo,
wk ,
state.getEdit(),
this.m_Region.getSequenceId(),
false,
null);
#endif
if ((m_isTrafodionMetadata) ||
(LOG.isTraceEnabled()))
LOG.info("commitRequest COMMIT_OK -- EXIT txId: "
+ transactionId
+ " HLog seq "
+ txid
);
WALSync(tHLog, transactionId, txid);
#ifdef CDH5.7 APACHE1.2
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
#endif
}
catch (IOException exp) {
LOG.warn("TrxRegion endpoint CP: reconstruct transaction " + transactionId
+ " HLog seq " + txid + " Caught IOException in HLOG appendNoSync ", exp);
//throw exp;
}
#ifdef CDH5.7 APACHE1.2
finally {
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
} // finally
#endif
if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP: reconstruct transaction: rewrite to HLOG CR edit for transaction " + transactionId);
int tmid = (int) state.getNodeId();
if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP " + m_regionDetails + " reconstruct transaction " + transactionId + " for TM " + tmid);
} // for all txns in the in-doubt transaction list
} // in-doubt transactions not yet reconstructed
reconstructIndoubts = 1;
if (this.configuredConflictReinstate) {
regionState = REGION_STATE_START; // set region state START , so new transaction can start with conflict re-established
}
} // synchronized
}
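// Recovery flow summary for constructIndoubtTransactions (as implemented
// above): with no in-doubt transactions the region is simply opened for
// transactional access; otherwise each in-doubt transaction's WALEdits are
// replayed into a fresh TrxTransactionState (puts and deletes re-added under
// editReplay in their original order), the state is marked reinstated and
// COMMIT_PENDING, and the prepared edit is rewritten to the HLOG so the
// transaction survives a further crash until the TLOG resolves it.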
/**
* Begin a transaction
* @param transactionId the transaction id
* @param startId the transaction start id
* @throws IOException
*/
public void beginTransaction(final long transactionId, final long startId)
throws IOException {
beginTransaction(transactionId, startId, false);
}
/**
* Begin a transaction
* @param transactionId the transaction id
* @param startId the transaction start id
* @param regionTransaction true if this is a region transaction
* @throws IOException
*/
public void beginTransaction(final long transactionId, final long startId, final boolean regionTransaction)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: beginTransaction -- ENTRY txId: " + transactionId
+", startId " + startId
+ " regionTransaction " + regionTransaction);
checkBlockNonPhase2(transactionId);
// TBD until integration with recovery
if (reconstructIndoubts == 0) {
if (LOG.isTraceEnabled()) LOG.trace("RECOV beginTransaction -- ENTRY txId: " + transactionId);
constructIndoubtTransactions();
}
if (regionState != REGION_STATE_START) {
//print out all the in-doubt transaction at this moment
if ((indoubtTransactionsById == null) || (indoubtTransactionsById.size() == 0))
regionState = REGION_STATE_START;
else {
LOG.warn("TRAF RCOV coprocessor: RECOVERY WARN beginTransaction while the region is still in recovering state " + regionState + " indoubt tx size " + indoubtTransactionsById.size() );
for (Entry<Long, List<WALEdit>> entry : indoubtTransactionsById.entrySet()) {
long tid = entry.getKey();
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: region " + m_regionDetails + " still has in-doubt transaction " + tid + " when new transaction arrives ");
}
throw new IOException("NewTransactionStartedBeforeRecoveryCompleted");
}
}
TrxTransactionState state;
synchronized (transactionsById) {
// if (transactionsById.get(getTransactionalUniqueId(transactionId)) != null) {
// TrxTransactionState alias = getTransactionState(transactionId);
// LOG.error("beginTransaction - Ignoring - Existing transaction with id ["
// + transactionId + "] in region [" + m_regionName + "]");
// if (LOG.isDebugEnabled()) LOG.debug("beginTransaction -- EXIT txId: " + transactionId);
// return;
// }
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction -- creating new TrxTransactionState without coprocessorHost txId: " + transactionId);
state = new TrxTransactionState(transactionId,
nextLogSequenceId.getAndIncrement(),
nextLogSequenceId,
m_Region.getRegionInfo(),
m_Region.getTableDesc(),
tHLog,
#ifdef CDH5.7 APACHE1.2
configuredEarlyLogging, this.t_Region,
#else
configuredEarlyLogging,
#endif
startId,
regionTransaction);
state.setFullEditInCommit(this.fullEditInCommit);
if(this.useCommitIdInCells == false) {
state.setStartSequenceNumber(nextSequenceId.get());
}
else {
state.setStartSequenceNumber(state.getStartId());
}
}
List<TrxTransactionState> commitPendingCopy =
new ArrayList<TrxTransactionState>(commitPendingTransactions);
for (TrxTransactionState commitPending : commitPendingCopy) {
state.addTransactionToCheck(commitPending);
}
String key = getTransactionalUniqueId(transactionId);
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction -- new TrxTransactionState for txId: "
+ transactionId + " for key: " + key);
synchronized (transactionsById) {
transactionsById.put(key, state);
}
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - Adding transaction: [" + transactionId + "] in region ["
+ m_regionDetails + "]" + " to list");
try {
transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(transactionId));
} catch (LeaseStillHeldException e) {
LOG.error("beginTransaction - Lease still held for [" + transactionId + "] in region ["
+ m_regionDetails + "]");
throw new RuntimeException(e);
}
if (LOG.isDebugEnabled()) LOG.debug("beginTransaction -- EXIT txId: " + transactionId + " transactionsById size: " + transactionsById.size()
+ " region ID: " + this.regionInfo.getRegionId());
}
/**
* Obtains a scanner lease id
* @param scannerId the scanner id
* @return String the scanner lease id
*/
private String getScannerLeaseId(final long scannerId) {
if (LOG.isTraceEnabled()) LOG.trace("getScannerLeaseId -- EXIT txId: "
+ scannerId + " lease string " + m_regionName + scannerId);
return m_regionName + scannerId;
}
/**
* Obtains a transactional lease id
* @param transactionId the transaction id
* @return String the transactional unique id
*/
private String getTransactionalUniqueId(final long transactionId) {
if (LOG.isTraceEnabled()) {
LOG.trace("getTransactionalUniqueId -- EXIT txId: "
+ transactionId + " transactionsById size: "
+ transactionsById.size() + " name " + m_regionName + transactionId);
}
return m_regionName + transactionId;
}
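// The unique id is simply the region name concatenated with the transaction
// id, so a region named "TRAFODION.T1,,1462." and transaction 42 would yield
// the key "TRAFODION.T1,,1462.42" (example values are illustrative). The
// same scheme keys transactionsById and the transaction leases.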
/** Begins the transaction if it does not already exist
* @param transactionId the transaction id
* @param startId the transaction start id
* @return the existing or newly created transaction state
* @throws IOException
*/
private TrxTransactionState beginTransIfNotExist(final long transactionId, final long startId) throws IOException{
return beginTransIfNotExist(transactionId, startId, false);
}
/** Begins the transaction if it does not already exist
* @param transactionId the transaction id
* @param startId the transaction start id
* @param regionTx true if this is a region transaction
* @return the existing or newly created transaction state
* @throws IOException
*/
private TrxTransactionState beginTransIfNotExist(final long transactionId, final long startId, final boolean regionTx) throws IOException{
if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint coprocessor: beginTransIfNotExist, txid: "
+ transactionId + " startId: " + startId + ", regionTx: " + regionTx
+ ", regionName " + m_regionDetails
+ " transactionsById size: " + transactionsById.size());
checkBlockNewTrans(transactionId);
String key = getTransactionalUniqueId(transactionId);
synchronized (transactionsById) {
TrxTransactionState state = transactionsById.get(key);
if (state == null) {
if (LOG.isTraceEnabled()) LOG.trace(": Begin transaction for txid: " + transactionId
+ " in beginTransIfNotExist beginning the transaction internally as state was null for key " + key);
this.beginTransaction(transactionId, startId, regionTx);
state = transactionsById.get(key);
}
else {
if (LOG.isTraceEnabled()) LOG.trace("Exit beginTransIfNotExist with existing transaction for txid: " + transactionId);
}
return state;
}
}
/**
* Determines if this transaction has been committed previously
* @param transactionId the transaction id
* @return the committed transaction state, or null if none is found
*/
TrxTransactionState isTransactionOnCommittedList(final long transactionId)
{
// Unfortunately we do not have the transaction state, so we do
// not have the sequence number. This should not be a common occurrence
// and therefore should not negatively affect performance.
synchronized (commitedTransactionsBySequenceNumber) {
for(Map.Entry<Long, TrxTransactionState> entry :
commitedTransactionsBySequenceNumber.entrySet()) {
if (entry.getValue().getTransactionId() == transactionId)
return entry.getValue();
}
}
return null;
}
/**
* Commits the transaction
* @param transactionId the transaction id
* @param commitId the commit id
* @param participantNum the participant number
* @throws IOException
*/
public void commit(final long transactionId, final long commitId, final int participantNum) throws IOException {
commit(transactionId, commitId, participantNum, false /* IgnoreUnknownTransactionException */);
}
/**
* Commits the transaction
* @param transactionId the transaction id
* @param commitId the commit id
* @param participantNum the participant number
* @param ignoreUnknownTransactionException whether to ignore an unknown transaction
* @throws IOException
*/
public void commit(final long transactionId, final long commitId, final int participantNum, final boolean ignoreUnknownTransactionException) throws IOException {
if (ignoreUnknownTransactionException)
{
LOG.warn("commit(txId) -- ENTRY txId: " + transactionId +
" commitId " + commitId + " ignoreUnknownTransaction: " + ignoreUnknownTransactionException
+ "Region : " + m_regionDetails);
}
CommitProgress commitProgress = CommitProgress.NONE;
TrxTransactionState state;
checkBlockAll(transactionId);
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
if (ignoreUnknownTransactionException == true) {
if (LOG.isDebugEnabled()) LOG.debug("Participant " + participantNum + " ignoring UnknownTransactionException in commit : "
+ transactionId + " in region " + m_regionDetails);
return;
}
state = null;
if ((state = isTransactionOnCommittedList(transactionId)) != null) {
LOG.warn("Participant " + participantNum
+ " commit [DUPLICATE] Asked to commit a committed transaction: " + transactionId
+ " state: " + state + " in region " + m_regionDetails);
}
else {
LOG.fatal("Participant " + participantNum
+ " Asked to commit unknown transaction: " + transactionId
+ " in region " + m_regionDetails);
}
throw new IOException("UnknownTransactionException, Participant "
+ participantNum + " transId: " + transactionId);
}
state.setCommitId(commitId);
if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
if (state.getStatus().equals(Status.COMMITED) || state.getStatus().equals(Status.COMMIT_READONLY)) {
LOG.warn("Participant " + participantNum +
" commit [DUPLICATE]- Asked to commit a committed transaction, transid: " + transactionId +
" state: " + state + " in region " + m_regionDetails);
throw new IOException("Participant " + participantNum + " commit [DUPLICATE] Asked to commit a non-pending transaction, transid: " + transactionId +
" state: " + state + " in region " + m_regionDetails);
}
else {
LOG.fatal("Participant " + participantNum +
" commit - Asked to commit a non pending transaction, transid: " + transactionId +
" state: " + state + " in region " + m_regionDetails);
throw new IOException("NonPendingTransactionException: Participant " + participantNum + " Asked to commit a non-pending transaction, transid: " + transactionId +
" state: " + state + " in region " + m_regionDetails);
}
}
if (LOG.isTraceEnabled()) LOG.trace("commit(txId) -- EXIT txId: " + transactionId);
// manage concurrent duplicate commit requests through TS.xaOperation object
synchronized(state.getXaOperationObject()) {
commitProgress = state.getCommitProgress();
long transId = state.getTransactionId();
//if (LOG.isTraceEnabled()) LOG.trace("commit HHH " + commitStatus);
if (commitProgress.equals(CommitProgress.COMMITED)) {
// already committed, this is likely unnecessary due to Status check above
if (LOG.isWarnEnabled()) LOG.warn("commit - duplicate commit for committed transaction " + transId + " state is " + state);
}
else if (commitProgress.equals(CommitProgress.COMMITTING)) {
if (LOG.isWarnEnabled()) LOG.warn("commit - sleeping 1 second for committing transaction" + transId + " state is " + state);
try {
Thread.sleep(1000); ///1000 milliseconds is one second.
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
CommitProgress tmpProgress = state.getCommitProgress();
if (tmpProgress.equals(CommitProgress.COMMITTING)){
if (LOG.isWarnEnabled()) LOG.warn("commit - transaction " + transId + " is still committing "
+ " state is " + state);
}
else{
if (LOG.isWarnEnabled()) LOG.warn("commit - transaction " + transId + " commitProgress is " + tmpProgress
+ " state is " + state);
}
}
else if (commitProgress.equals(CommitProgress.NONE)) {
if (LOG.isTraceEnabled()) LOG.trace("commit " + commitProgress);
state.setCommitProgress(CommitProgress.COMMITTING);
commit(state);
}
}
if (LOG.isTraceEnabled()) LOG.trace("commit(txId) -- EXIT txId: " + transactionId
+ " commitId " + commitId);
}
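// Duplicate-commit handling above relies on CommitProgress acting as a
// three-state latch (NONE -> COMMITTING -> COMMITED) guarded by the
// per-transaction xaOperation monitor: the first caller moves the state to
// COMMITTING and performs the commit, a concurrent duplicate sleeps one
// second and re-checks the progress, and a late duplicate arriving on a
// COMMITED state only logs a warning.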
/**
* @param transactionId the transaction id
* @param startEpoch the transaction start epoch
* @param participantNum the participant number
* @return TransactionRegionInterface commit code
* @throws IOException
*/
public int commitRequest(final long transactionId, final long startEpoch, final int participantNum)
throws IOException,
CommitConflictException,
UnknownTransactionException {
return commitRequest(transactionId, startEpoch, participantNum, true, false);
}
public int commitRequest(final long transactionId, final long startEpoch, final int participantNum, final boolean dropTableRecorded)
throws IOException,
CommitConflictException,
UnknownTransactionException {
return commitRequest(transactionId, startEpoch, participantNum, true, dropTableRecorded);
}
public int commitRequest(final long transactionId, final long startEpoch, final int participantNum, boolean flushHLOG,
boolean dropTableRecorded)
throws IOException,
CommitConflictException,
UnknownTransactionException {
long txid = 0;
if (LOG.isDebugEnabled()) LOG.debug("commitRequest -- ENTRY txId: "
+ transactionId + " participantNum " + participantNum + ", region " + m_regionDetails
+ " start key: " + Hex.encodeHexString(regionInfo.getStartKey())
+ " end key: " + Hex.encodeHexString(regionInfo.getEndKey()));
checkBlockNonPhase2(transactionId);
TrxTransactionState state;
if (startEpoch < onlineEpoch) {
LOG.error("commitRequest txId: "
+ transactionId + " startEpoch " + startEpoch + " is less than region's onlineEpoch " + onlineEpoch
+ " for regionName " + m_regionDetails
+ " must return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR ");
return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
}
int lv_totalCommits = 0;
int lv_timeIndex = 0;
if (LOG.isInfoEnabled()) {
synchronized (totalCommits){
lv_totalCommits = totalCommits.incrementAndGet();
lv_timeIndex = (timeIndex.getAndIncrement() % 50 );
}
}
boolean returnPending = false;
long commitCheckEndTime = 0;
long hasConflictStartTime = 0;
long hasConflictEndTime = 0;
long putBySequenceStartTime = 0;
long putBySequenceEndTime = 0;
long writeToLogEndTime = 0;
long commitCheckStartTime = System.nanoTime();
#ifdef CDH5.7 APACHE1.2
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long mvccNum = 0;
#endif
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
if (LOG.isWarnEnabled()) LOG.warn("commitRequest Unknown transaction ["
+ transactionId + "] in region [" + m_regionDetails
+ "], participantNum " + participantNum + " ignoring");
state = null;
return COMMIT_OK_READ_ONLY;
}
if (LOG.isInfoEnabled())
hasConflictStartTime = System.nanoTime();
synchronized (commitCheckLock) {
try{
checkConflict(state);
} catch (IOException e) {
if (LOG.isInfoEnabled()) {
hasConflictEndTime = System.nanoTime();
hasConflictTimes[lv_timeIndex] = hasConflictEndTime - hasConflictStartTime;
totalConflictTime += hasConflictTimes[lv_timeIndex];
}
state.setStatus(Status.ABORTED);
retireTransaction(state, true);
if (LOG.isTraceEnabled()) LOG.trace("commitRequest encountered conflict txId: "
+ transactionId + "returning COMMIT_CONFLICT", e);
throw new CommitConflictException(e);
}
if (LOG.isInfoEnabled())
hasConflictEndTime = System.nanoTime();
// No conflicts, we can commit.
if (LOG.isTraceEnabled()) LOG.trace("No conflicts for transaction " + transactionId
+ " regionTx: " + state.getIsRegionTx() + " found in region " + m_regionDetails
+ ". Participant " + participantNum + " votes to commit");
// If there are writes we must keep record of the transaction
putBySequenceStartTime = System.nanoTime();
if (state.hasWrite() || state.getNeverReadOnly()) {
if (LOG.isInfoEnabled()) {
// Only increment this counter if we are logging the statistics
putBySequenceOperations.getAndIncrement();
}
// Order is important
state.setDropTableRecorded(dropTableRecorded);
state.setStatus(Status.COMMIT_PENDING);
state.setCPEpoch(controlPointEpoch.get());
commitPendingTransactions.add(state);
state.setSequenceNumber(nextSequenceId.getAndIncrement());
commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), state);
returnPending = true;
if (LOG.isTraceEnabled()) LOG.trace("Transaction " + transactionId
+ " found in region " + m_regionDetails
+ ". Adding to commitedTransactionsBySequenceNumber for sequence number " + state.getSequenceNumber());
}
commitCheckEndTime = putBySequenceEndTime = System.nanoTime();
} // exit sync block of commitCheckLock
if (state.hasWrite()) {
if (LOG.isTraceEnabled()) LOG.trace("write commitRequest edit to HLOG");
//call HLog by passing tagged WALEdits and associated fields
try {
// Once we append the edit into the HLOG during the DML operation, there is no need to do any HLOG write in phase 1.
// Likely all the edits have already been synced, so just do a sync(state.getLargestFlushTxId()) -- most likely
// it is a no-op. This leverages the time between the last DML and phase 1, so there is no need to do any
// logging (and waiting) in phase 1. There is also no need to write a "prepared" record, since we optimize for
// the normal running mode (99.99% commit). We can append "commit" or "abort" into the HLOG in phase 2 in no-sync
// mode, but even that can be eliminated if necessary. All transactions reinstated during recovery will be
// treated as "in-doubt" and need the TLOG to be resolved, so the TLOG must be kept for a while (likely a few CPs);
// besides, the HRegion chore thread can perform a CP-like action by writing the smallest sequenceId (not logSeqid),
// so we do not need to deal with the commit case (but we do write the abort edit, since the number of aborts is
// < 0.1% -- we can make this a configurable property).
if (!state.getEarlyLogging()) {
//txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
//state.getEdit(), new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.m_Region.getTableDesc(),
//nextLogSequenceId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
#ifdef CDH5.7 APACHE1.2
WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
WALKey.NO_SEQUENCE_ID,
EnvironmentEdgeManager.currentTime(),
WALKey.EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
this.t_Region.getMVCC());
#else
final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
EnvironmentEdgeManager.currentTime());
#endif
if (state.prepareEditSize != 0){
if (LOG.isInfoEnabled()) LOG.info("commitRequest duplicate request for: " + state);
}
else {
state.prepareEditSize = state.getEdit().getCells().size();
}
/*
ArrayList<Cell> wkvs = state.getEdit().getCells();
LOG.info("PH1 number of cells " + wkvs.size() + " for tid " + transactionId);
int wkvi = 0;
for (Cell wkv : state.getEdit().getCells()) {
LOG.info("PH1 cell " + wkvi +
" tag hex dump " + Hex.encodeHexString(wkv.getTagsArray()) + " tag offset " + wkv.getTagsOffset() + " tag len " + wkv.getTagsLength() +
" row hex dump " + Hex.encodeHexString(wkv.getRowArray()) + " row offset " + wkv.getRowOffset() + " row len " + wkv.getRowLength() +
" value hex dump " + Hex.encodeHexString(wkv.getValueArray()) + " value offset " + wkv.getValueOffset() + " value len " + wkv.getValueLength());
wkvi++;
}
*/
#ifdef CDH5.7 APACHE1.2
txid = this.tHLog.append(this.m_Region.getTableDesc(), this.regionInfo, wk, state.getEdit(), false);
writeEntry = wk.getWriteEntry();
mvccNum = writeEntry.getWriteNumber();
if ((m_isTrafodionMetadata) ||
(LOG.isTraceEnabled())) LOG.info("PH1 commitRequest COMMIT_OK -- EXIT "
+ m_regionDetails
+ " isTrafodionMD: " + m_isTrafodionMetadata
+ " txId: " + transactionId
+ " HLog seq: " + txid
+ " regionTx: " + state.getIsRegionTx()
+ " flushHLOG: " + flushHLOG
);
#else
AtomicLong lv_seqid = this.m_Region.getSequenceId();
txid = this.tHLog.append(this.m_Region.getTableDesc(),
this.regionInfo,
wk,
state.getEdit(),
lv_seqid,
false,
null);
if ((m_isTrafodionMetadata) ||
(LOG.isTraceEnabled())) LOG.info("PH1 commitRequest COMMIT_OK -- EXIT "
+ m_regionDetails
+ " isTrafodionMD: " + m_isTrafodionMetadata
+ " txId: " + transactionId
+ " seqId: " + lv_seqid
+ " HLog seq: " + txid
+ " regionTx: " + state.getIsRegionTx()
+ " flushHLOG: " + flushHLOG
);
#endif
if (flushHLOG) WALSync(tHLog, transactionId, txid);
}
else {
if (flushHLOG) WALSync(tHLog, transactionId, state.getFlushTxId());
}
#ifdef CDH5.7 APACHE1.2
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
#endif
if (LOG.isInfoEnabled()) {
writeToLogEndTime = System.nanoTime();
writeToLogTimes[lv_timeIndex] = writeToLogEndTime - commitCheckEndTime;
writeToLogOperations.getAndIncrement();
}
//if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:commitRequest COMMIT_OK -- EXIT txId: "
// + transactionId + " regionTx: " + state.getIsRegionTx() + " HLog seq " + txid);
} catch (IOException exp) {
exp.fillInStackTrace();
LOG.error("commitRequest txId: " + transactionId + " HLog seq " + txid + " Caught IOException in HLOG operations", exp );
throw exp;
}
#ifdef CDH5.7 APACHE1.2
finally {
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
} // finally
#endif
if (LOG.isDebugEnabled()) LOG.debug("commitRequest COMMIT_OK -- EXIT txId: "
+ transactionId + " regionTx " + state.getIsRegionTx());
}
// No write pending
else {
if (LOG.isInfoEnabled())
writeToLogTimes[lv_timeIndex] = 0;
}
if (LOG.isInfoEnabled()) {
commitCheckTimes[lv_timeIndex] = commitCheckEndTime - commitCheckStartTime;
hasConflictTimes[lv_timeIndex] = hasConflictEndTime - hasConflictStartTime;
putBySequenceTimes[lv_timeIndex] = putBySequenceEndTime - putBySequenceStartTime;
totalCommitCheckTime += commitCheckTimes[lv_timeIndex];
totalConflictTime += hasConflictTimes[lv_timeIndex];
totalPutTime += putBySequenceTimes[lv_timeIndex];
totalWriteToLogTime += writeToLogTimes[lv_timeIndex];
if (commitCheckTimes[lv_timeIndex] > maxCommitCheckTime) {
maxCommitCheckTime = commitCheckTimes[lv_timeIndex];
}
if (commitCheckTimes[lv_timeIndex] < minCommitCheckTime) {
minCommitCheckTime = commitCheckTimes[lv_timeIndex];
}
if (hasConflictTimes[lv_timeIndex] > maxConflictTime) {
maxConflictTime = hasConflictTimes[lv_timeIndex];
}
if (hasConflictTimes[lv_timeIndex] < minConflictTime) {
minConflictTime = hasConflictTimes[lv_timeIndex];
}
if (putBySequenceTimes[lv_timeIndex] > maxPutTime) {
maxPutTime = putBySequenceTimes[lv_timeIndex];
}
if (putBySequenceTimes[lv_timeIndex] < minPutTime) {
minPutTime = putBySequenceTimes[lv_timeIndex];
}
if (writeToLogTimes[lv_timeIndex] > maxWriteToLogTime) {
maxWriteToLogTime = writeToLogTimes[lv_timeIndex];
}
if (writeToLogTimes[lv_timeIndex] < minWriteToLogTime) {
minWriteToLogTime = writeToLogTimes[lv_timeIndex];
}
if (lv_timeIndex == 49) {
timeIndex.set(1); // Start over so we do not exceed the array size
}
if (lv_totalCommits == 9999) {
avgCommitCheckTime = (double) totalCommitCheckTime / (double) lv_totalCommits;
avgConflictTime = (double) totalConflictTime / (double) lv_totalCommits;
avgPutTime = (double) totalPutTime / (double) lv_totalCommits;
avgWriteToLogTime = (double) totalWriteToLogTime / (double) lv_totalCommits;
if(LOG.isDebugEnabled()) LOG.debug("commitRequest Report\n" +
" Region: " + m_regionDetails + "\n" +
" Total commits: "
+ lv_totalCommits + "\n" +
" commitCheckLock time:\n" +
" Min: "
+ minCommitCheckTime / 1000 + " microseconds\n" +
" Max: "
+ maxCommitCheckTime / 1000 + " microseconds\n" +
" Avg: "
+ avgCommitCheckTime / 1000 + " microseconds\n" +
" hasConflict time:\n" +
" Min: "
+ minConflictTime / 1000 + " microseconds\n" +
" Max: "
+ maxConflictTime / 1000 + " microseconds\n" +
" Avg: "
+ avgConflictTime / 1000 + " microseconds\n" +
" putBySequence time:\n" +
" Min: "
+ minPutTime / 1000 + " microseconds\n" +
" Max: "
+ maxPutTime / 1000 + " microseconds\n" +
" Avg: "
+ avgPutTime / 1000 + " microseconds\n" +
" Ops: "
+ putBySequenceOperations.get() + "\n" +
" writeToLog time:\n" +
" Min: "
+ minWriteToLogTime / 1000 + " microseconds\n" +
" Max: "
+ maxWriteToLogTime / 1000 + " microseconds\n" +
" Avg: "
+ avgWriteToLogTime / 1000 + " microseconds\n" +
" Ops: "
+ writeToLogOperations.get() + "\n\n");
totalCommits.set(0);
writeToLogOperations.set(0);
putBySequenceOperations.set(0);
totalCommitCheckTime = 0;
totalConflictTime = 0;
totalPutTime = 0;
totalWriteToLogTime = 0;
minCommitCheckTime = 1000000000;
maxCommitCheckTime = 0;
avgCommitCheckTime = 0;
minConflictTime = 1000000000;
maxConflictTime = 0;
avgConflictTime = 0;
minPutTime = 1000000000;
maxPutTime = 0;
avgPutTime = 0;
minWriteToLogTime = 1000000000;
maxWriteToLogTime = 0;
avgWriteToLogTime = 0;
}
} // end of LOG.isInfoEnabled block
if (returnPending) {
return COMMIT_OK;
}
// Otherwise we were read-only and committable, so we can forget it.
state.setStatus(Status.COMMIT_READONLY);
if(state.getSplitRetry())
return COMMIT_RESEND;
retireTransaction(state, true);
if (LOG.isDebugEnabled()) LOG.debug("commitRequest READ ONLY for participant "
+ participantNum + " -- EXIT txId: " + transactionId + ", regionName " + m_regionDetails);
return COMMIT_OK_READ_ONLY;
}
/**
* Determines if the transaction has any conflicts with concurrently committed transactions
* @param TrxTransactionState state
* @throws IOException if a conflict is detected
*/
private void checkConflict(final TrxTransactionState state)
throws IOException {
// Check transactions that were committed while we were running
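// Every transaction that committed with a sequence number at or after this
// transaction's start sequence number overlapped its execution and must be
// checked for write-set conflicts.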
synchronized (commitedTransactionsBySequenceNumber) {
for (long i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++)
{
TrxTransactionState other = commitedTransactionsBySequenceNumber.get(i);
if (other == null) {
continue;
}
if (LOG.isTraceEnabled()) LOG.trace("hasConflict state.getStartSequenceNumber is " + i + ", nextSequenceId.get() is " + nextSequenceId.get() + ", state object is " + state.toString() + ", calling addTransactionToCheck");
state.addTransactionToCheck(other);
}
}
state.checkConflict();
return;
}
public void abortTransaction(final long transactionId) throws IOException, UnknownTransactionException {
abortTransaction(transactionId, false, false);
}
/**
* Abort the transaction.
*
* @param transactionId
* @param dropTableRecorded
* @param ignoreUnknownTransaction
* @throws IOException
* @throws UnknownTransactionException
*/
public void abortTransaction(final long transactionId, final boolean dropTableRecorded, final boolean ignoreUnknownTransaction) throws IOException, UnknownTransactionException {
long txid = 0;
#ifdef CDH5.7 APACHE1.2
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long mvccNum = 0;
#endif
if (LOG.isTraceEnabled()) LOG.trace("abort transactionId: " + transactionId + " " + m_regionDetails);
TrxTransactionState state;
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
if (ignoreUnknownTransaction){
if (LOG.isDebugEnabled()) LOG.debug("ignoring UnknownTransactionException in abortTransaction : " + transactionId
+ " in region " + m_regionDetails);
return;
}
IOException ioe = new IOException("UnknownTransactionException Unknown transaction [" + transactionId
+ "] in region [" + m_regionDetails + "] ");
LOG.error("abortTransaction", ioe);
throw ioe;
}
synchronized(state.getXaOperationObject()) {
if (state.getStatus().equals(Status.ABORTED)) { // already aborted, duplicate abort requested
LOG.warn("abortTransaction - DUPLICATE abort transaction Id: " + transactionId + " " + m_regionDetails);
return;
}
state.setStatus(Status.ABORTED);
}
if(dropTableRecorded)
state.resetDropTableRecorded();
if (state.hasWrite()) {
// TODO log
// this.transactionLog.writeAbortToLog(m_Region.getRegionInfo(),
// state.getTransactionId(),
// m_Region.getTableDesc());
if (LOG.isTraceEnabled()) LOG.trace("abortTransaction - abort write to HLOG");
Tag abortTag = state.formTransactionalContextTag(TS_ABORT, state.getStartId());
List<Tag> tagList = new ArrayList<Tag>();
tagList.add(abortTag);
WALEdit e1 = state.getEdit();
WALEdit e = new WALEdit();
if (state.endEditSize != 0){
if (LOG.isInfoEnabled()) LOG.info("abort duplicate request for: " + state);
}
else {
state.endEditSize = 1;
}
if (e1.getCells().size() > 0) {
// get 1st Cell to associated with the abort record as a workaround through HLOG async append
Cell c = e1.getCells().get(0);
KeyValue kv = new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), tagList);
e.add(kv);
try {
//txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
// e, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.m_Region.getTableDesc(),
// nextLogSequenceId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
#ifdef CDH5.7 APACHE1.2
WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
WALKey.NO_SEQUENCE_ID,
EnvironmentEdgeManager.currentTime(),
WALKey.EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
this.t_Region.getMVCC());
txid = this.tHLog.append(this.m_Region.getTableDesc(), this.regionInfo, wk, e, false);
writeEntry = wk.getWriteEntry();
mvccNum = writeEntry.getWriteNumber();
#else
final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
this.regionInfo.getTable(),
EnvironmentEdgeManager.currentTime());
txid = this.tHLog.append(this.m_Region.getTableDesc(),
this.regionInfo,
wk ,
e,
this.m_Region.getSequenceId(),
false,
null);
#endif
WALSync(tHLog, transactionId, txid);
#ifdef CDH5.7 APACHE1.2
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
#endif
if (LOG.isTraceEnabled()) LOG.trace("write abort HLOG " + transactionId + " HLog seq " + txid);
}
catch (IOException exp1) {
LOG.error("abortTransaction - abort writing to HLOG : Caught an exception ", exp1);
throw exp1;
}
#ifdef CDH5.7 APACHE1.2
finally {
if (writeEntry != null) {
this.t_Region.getMVCC().completeAndWait(writeEntry);
writeEntry = null;
}
} // finally
#endif
if (LOG.isTraceEnabled()) LOG.trace("abortTransaction -- EXIT txId: " + transactionId + " HLog seq " + txid);
}
}
synchronized (commitPendingTransactions) {
commitPendingTransactions.remove(state);
}
if (state.isReinstated()) {
synchronized(indoubtTransactionsById) {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: abort reinstated indoubt transactions " + transactionId);
indoubtTransactionsById.remove(state.getTransactionId());
int clusterid = (int) state.getClusterId();
int tmid = (int) state.getNodeId();
int count = 0;
// indoubtTransactionsCountByTmid protected by
// indoubtTransactionsById synchronization
if (indoubtTransactionsCountByTmid.containsKey(tmid)) {
count = (int) indoubtTransactionsCountByTmid.get(tmid) - 1;
if (count > 0) indoubtTransactionsCountByTmid.put(tmid, count);
}
// if all reinstated txns are resolved from a TM, remove it and delete associated zNode
if (count == 0) {
indoubtTransactionsCountByTmid.remove(tmid);
String lv_encoded = m_Region.getRegionInfo().getEncodedName();
try {
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: delete in abort recovery zNode TM " + tmid + " region encoded name " + lv_encoded + " for 0 in-doubt transaction");
deleteRecoveryzNode(tmid, lv_encoded);
} catch (IOException e) {
LOG.error("Trafodion Recovery: delete recovery zNode failed ", e);
}
}
if ((indoubtTransactionsById == null) ||
(indoubtTransactionsById.size() == 0)) {
// change region state to STARTED, and archive the split-thlog
if (indoubtTransactionsById == null)
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: start region in abort with indoubtTransactionsById null");
else
if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: start region in abort with indoubtTransactionsById size " + indoubtTransactionsById.size());
startRegionAfterRecovery();
}
}
}
retireTransaction(state, true);
if (LOG.isTraceEnabled()) LOG.trace("abortTransaction looking for abort transaction " + transactionId + ", transactionsById " + transactionsById.size() + ", commitedTransactionsBySequenceNumber " + commitedTransactionsBySequenceNumber.size() + ", commitPendingTransactions" + commitPendingTransactions.size());
}
/**
* Determines if the transaction can be committed, and if possible commits the transaction.
* @param long transactionId
* @param long startEpoch
* @param long commitId
* @param int participantNum
* @return boolean
* @throws IOException
*/
public boolean commitIfPossible(final long transactionId, final long startEpoch, final long commitId, final int participantNum)
throws IOException {
return commitIfPossible(transactionId, startEpoch, commitId, participantNum, /* regionTx */ false, /* autoCommit */ true);
}
/**
* Determines if the transaction can be committed, and if possible commits the transaction.
* @param long transactionId
* @param long startEpoch
* @param long commitId
* @param int participantNum
* @param boolean regionTx
* @param boolean autoCommit
* @return boolean
* @throws IOException
*/
public boolean commitIfPossible(final long transactionId, final long startEpoch, final long commitId, final int participantNum,
final boolean regionTx, final boolean autoCommit) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("commitIfPossible -- ENTRY txId: "
+ transactionId);
checkBlockNonPhase2(transactionId);
int status = commitRequest(transactionId, startEpoch, participantNum);
if (! autoCommit) {
boolean result = (status == COMMIT_OK || status == COMMIT_OK_READ_ONLY);
if (LOG.isTraceEnabled()) LOG.trace("commitIfPossible -- autoCommit is false, returning early with result " + result);
return result;
}
if (status == COMMIT_OK) {
// Process local memory
try {
commit(transactionId, commitId, participantNum);
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- EXIT txId: " + transactionId + " COMMIT_OK");
return true;
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("commitIfPossible - txId " + transactionId
+ ", Caught exception ", e);
throw new IOException(e.toString());
}
} else if (status == COMMIT_OK_READ_ONLY) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- EXIT txId: "
+ transactionId + " COMMIT_OK_READ_ONLY");
return true;
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- EXIT txId: "
+ transactionId + " Commit Unsuccessful");
return false;
}
/**
* Logs a cleanup message for a Throwable and returns the Throwable unchanged
* @param Throwable t
* @param String msg
* @return Throwable
*/
private Throwable cleanup(final Throwable t, final String msg) {
if (t instanceof NotServingRegionException) {
if (LOG.isTraceEnabled()) LOG.trace("cleanup - NotServingRegionException; " + t.getMessage());
return t;
}
if (msg == null) {
LOG.error("cleanup - cleanup message was null");
} else {
LOG.error("cleanup - cleanup message was " + msg);
}
return t;
}
private IOException convertThrowableToIOE(final Throwable t) {
return convertThrowableToIOE(t, null);
}
/*
* @param t
*
* @param msg Message to put in new IOE if passed <code>t</code>
* is not an IOE
*
* @return Make <code>t</code> an IOE if it isn't already.
*/
private IOException convertThrowableToIOE(final Throwable t, final String msg) {
return (t instanceof IOException ? (IOException) t : msg == null
|| msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
}
/**
* Checks if the file system is available
* @return boolean
*/
public boolean checkFileSystem() {
if (this.fs != null) {
try {
FSUtils.checkFileSystemAvailable(this.fs);
} catch (IOException e) {
LOG.error("checkFileSystemAvailable - File System not available threw IOException ", e);
return false;
}
}
return true;
}
/**
* Prepares the family keys if the scan has no families defined
* @param Scan scan
* @throws IOException
*/
public void prepareScanner(Scan scan) throws IOException {
if(!scan.hasFamilies()) {
for(byte[] family: this.m_Region.getTableDesc().getFamiliesKeys()){
scan.addFamily(family);
}
}
}
/**
* Checks if the row is within this region's row range
* @param byte[] row
* @param String op
* @throws IOException
*/
public void checkRow(final byte [] row, String op) throws IOException {
#ifdef APACHE1.2 CDH5.7
if(!rowIsInRange(this.regionInfo, row)) {
#else
if(!this.m_Region.rowIsInRange(this.regionInfo, row)) {
#endif
throw new WrongRegionException("Requested row out of range for " +
op + " on HRegion " + this + ", startKey='" +
Bytes.toStringBinary(this.regionInfo.getStartKey()) + "', getEndKey()='" +
Bytes.toStringBinary(this.regionInfo.getEndKey()) + "', row='" +
Bytes.toStringBinary(row) + "'");
}
}
/**
* Returns the scanner associated with the specified ID.
*
* @param long scannerId
* @param long nextCallSeq
* @return a Scanner or throws UnknownScannerException
* @throws NotServingRegionException
* @throws OutOfOrderScannerNextException
* @throws UnknownScannerException
*/
protected synchronized RegionScanner getScanner(long scannerId,
long nextCallSeq)
throws NotServingRegionException,
OutOfOrderScannerNextException,
UnknownScannerException {
RegionScanner scanner = null;
if (LOG.isTraceEnabled()) LOG.trace("getScanner - scanner id " + scannerId + ", count is " + scanners.size());
TransactionalRegionScannerHolder rsh =
scanners.get(scannerId);
if (rsh != null)
{
if (LOG.isTraceEnabled()) LOG.trace(" getScanner - rsh is " + rsh + "rsh.s is " + rsh.s );
}
else
{
if (LOG.isTraceEnabled()) LOG.trace(" getScanner - rsh is null");
throw new UnknownScannerException(
"TrxRegionEndpoint getScanner - scanner id " + scannerId + ", already closed?");
}
scanner = rsh.s;
if (scanner != null) {
HRegionInfo hri = scanner.getRegionInfo();
if (this.m_Region != rsh.r) { // Yes, should be the same instance
throw new NotServingRegionException("Region was re-opened after the scannerId "
+ scannerId + " was created: " + hri.getRegionNameAsString());
}
}
if (nextCallSeq != rsh.nextCallSeq) {
if (LOG.isTraceEnabled()) LOG.trace("getScanner - scanner id " + scannerId + ", calling OutOfOrderScannerNextException, nextCallSeq is " + nextCallSeq + " rsh.nextCallSeq is " + rsh.nextCallSeq);
throw new OutOfOrderScannerNextException(
"TrxRegionEndpoint coprocessor: getScanner - scanner id " + scannerId + ", Expected nextCallSeq: " + rsh.nextCallSeq +
", But the nextCallSeq received from client: " + nextCallSeq);
}
return scanner;
}
/**
* Removes the scanner associated with the specified ID from the internal
* id->scanner TransactionalRegionScannerHolder map
*
* @param long scannerId
* @return a Scanner or throws UnknownScannerException
* @throws UnknownScannerException
*/
protected synchronized RegionScanner removeScanner(long scannerId)
throws UnknownScannerException {
if (LOG.isTraceEnabled()) LOG.trace("removeScanner - scanner id " + scannerId + ", before count is " + scanners.size() + " in region " + m_regionDetails);
TransactionalRegionScannerHolder rsh =
scanners.remove(scannerId);
if (LOG.isTraceEnabled()) LOG.trace("removeScanner - scanner id " + scannerId + ", after count is " + scanners.size());
if (rsh != null)
{
if (LOG.isTraceEnabled()) LOG.trace("removeScanner - scanner id " + scannerId + ", rsh is " + rsh + ", rsh.s is " + rsh.s );
RegionScanner s = rsh.s;
rsh.cleanHolder();
return s;
}
else
{
if (LOG.isTraceEnabled()) LOG.trace("removeScanner - scanner id " + scannerId + ", rsh is null");
throw new UnknownScannerException(
"ScannerId: " + scannerId + ", already closed?");
}
}
/**
* Adds a region scanner to the TransactionalRegionScannerHolder map
* @param long transId
* @param RegionScanner s
* @param HRegion r
* @return long
* @throws LeaseStillHeldException
*/
#ifdef APACHE1.2 CDH5.7
protected synchronized long addScanner(long transId, RegionScanner s, Region r)
#else
protected synchronized long addScanner(long transId, RegionScanner s, HRegion r)
#endif
throws LeaseStillHeldException {
long scannerId = performScannerId.getAndIncrement();
TransactionalRegionScannerHolder rsh =
new TransactionalRegionScannerHolder(transId, scannerId, s, r);
// rsh was just constructed above, so it can never be null here
if (LOG.isTraceEnabled()) LOG.trace("addScanner - scanner id " + scannerId + " in region " + m_regionDetails + ", rsh is " + rsh);
TransactionalRegionScannerHolder existing =
scanners.putIfAbsent(scannerId, rsh);
if (LOG.isTraceEnabled()) LOG.trace("addScanner - scanner id " + scannerId + " transId " + transId
+ ", count is " + scanners.size() + " in region " + m_regionDetails);
/*
scannerLeases.createLease(getScannerLeaseId(scannerId),
this.scannerLeaseTimeoutPeriod,
new TransactionalScannerListener(scannerId));
*/
return scannerId;
}
/**
* Instantiated as a scanner lease. If the lease times out, the scanner is
* closed.
*/
/*
private class TransactionalScannerListener implements LeaseListener {
private final long scannerId;
TransactionalScannerListener(final long id) {
this.scannerId = id;
}
@Override
public void leaseExpired() {
TransactionalRegionScannerHolder rsh = scanners.remove(this.scannerId);
if (rsh != null) {
RegionScanner s = rsh.s;
if (LOG.isTraceEnabled()) LOG.trace("Scanner " + this.scannerId + " lease expired on region "
+ s.getRegionInfo().getRegionNameAsString());
try {
HRegion region = rsh.r;
s.close();
} catch (IOException e) {
LOG.error("Closing scanner for "
+ s.getRegionInfo().getRegionNameAsString(), e);
}
} else {
if (LOG.isTraceEnabled()) LOG.trace("Scanner " + this.scannerId + " lease expired");
}
}
}
*/
/**
* Formats the throwable stacktrace to a string
* @param Throwable e
* @return String
*/
public String stackTraceToString(Throwable e) {
StringBuilder sb = new StringBuilder();
for (StackTraceElement element : e.getStackTrace()) {
sb.append(element.toString());
sb.append("\n");
}
return sb.toString();
}
/**
* Returns the Scanner Leases for this coprocessor
* @return Leases
*/
//synchronized protected Leases getScannerLeases() {
// return this.scannerLeases;
//}
/**
* Returns the Leases for this coprocessor
* @return Leases
*/
synchronized protected Leases getTransactionalLeases() {
return this.transactionLeases;
}
/**
* Removes unneeded committed transactions
*/
synchronized public void removeUnNeededCommitedTransactions() {
Long minStartSeqNumber = getMinStartSequenceNumber();
TrxTransactionState state = null;
int numRemoved = 0;
long key = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Region Cleanup " + m_regionDetails + "Chore removeUnNeededCommittedTransactions: Iteration "
+ choreCount + " size " + commitedTransactionsBySequenceNumber.size());
}
if (minStartSeqNumber == null) {
minStartSeqNumber = Long.MAX_VALUE;
}
synchronized (commitedTransactionsBySequenceNumber) {
choreCount++;
try {
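// Sync the WAL from the cleanup chore; -1 is a dummy transaction id.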
WALSync(tHLog, -1, 0);
} catch (IOException ioe) {
LOG.error("chore removeUnNeededCommitedTransactions Cleanup " + m_regionDetails + " exception: ", ioe);
}
if ((commitedTransactionsBySequenceNumber.size() != 0) && ((choreCount % 20) == 1)) {
LOG.info("Region Cleanup " + m_regionDetails + "Chore removeUnNeededCommittedTransactions: Iteration "
+ choreCount + " size " + commitedTransactionsBySequenceNumber.size());
}
for (Entry<Long, TrxTransactionState> entry : new LinkedList<Entry<Long, TrxTransactionState>>(
commitedTransactionsBySequenceNumber.entrySet())) {
key = entry.getKey();
if (key >= minStartSeqNumber) {
break;
}
state = commitedTransactionsBySequenceNumber.remove(key);
if (state != null) {
state.clearState();
numRemoved++;
//if (LOG.isTraceEnabled()) LOG.trace("removeUnNeededCommitedTransactions: Transaction - entry key " + key + ", " + state.toString());
}
}
}
if (LOG.isTraceEnabled()) {
StringBuilder traceMessage = new StringBuilder();
if (numRemoved > 0) {
traceMessage.append("removeUnNeededCommitedTransactions: Removed [").append(numRemoved).append("] commited transactions");
if (minStartSeqNumber == Long.MAX_VALUE) {
traceMessage.append(" with any sequence number.");
} else {
traceMessage.append(" with sequence lower than [").append(minStartSeqNumber).append("].");
}
if (!commitedTransactionsBySequenceNumber.isEmpty()) {
traceMessage.append(" Still have [").append(commitedTransactionsBySequenceNumber.size())
.append("] left.");
} else {
traceMessage.append(" None left.");
}
LOG.trace(traceMessage.toString());
} else if (commitedTransactionsBySequenceNumber.size() > 0) {
traceMessage.append("Could not remove any transactions, and still have ")
.append(commitedTransactionsBySequenceNumber.size())
.append(" left");
LOG.trace(traceMessage.toString());
}
}
}
/**
* Removes unneeded TransactionalRegionScannerHolder objects
*/
synchronized public void removeUnNeededStaleScanners() {
long scannerId = 0L;
long transId = 0L;
long listSize = 0L;
long scannerSize = 0L;
Long transactionId = 0L;
TransactionalRegionScannerHolder rsh = null;
Iterator<Long> transIter = null;
Iterator<Map.Entry<Long, TransactionalRegionScannerHolder>> scannerIter = null;
synchronized (cleanScannersForTransactions) {
listSize = cleanScannersForTransactions.size();
if (LOG.isTraceEnabled()) LOG.trace("removeUnNeededStaleScanners - listSize is " + listSize + " in region " + m_regionDetails);
if (listSize == 0)
return;
if (scanners == null || scanners.isEmpty()) {
if (LOG.isTraceEnabled()) LOG.trace("removeUnNeededStaleScanners - scanners is empty, clearing cleanScannersForTransactions in region " + m_regionDetails);
cleanScannersForTransactions.clear();
return;
}
scannerSize = scanners.size();
if (LOG.isTraceEnabled()) LOG.trace("removeUnNeededStaleScanners - transactions list count is " + listSize + ", scanners size is " + scannerSize);
for (transIter = cleanScannersForTransactions.iterator(); transIter.hasNext();) {
transactionId = transIter.next();
scannerIter = scanners.entrySet().iterator();
while(scannerIter.hasNext()){
Map.Entry<Long, TransactionalRegionScannerHolder> entry = scannerIter.next();
rsh = entry.getValue();
if (rsh != null) {
transId = rsh.transId;
scannerId = rsh.scannerId;
if (transId == transactionId) {
if (LOG.isTraceEnabled()) LOG.trace("removeUnNeededStaleScanners - txId " + transactionId + ", scannerId " + scannerId
+ ", Removing stale scanner in region " + m_regionDetails);
try {
if (rsh.s != null) {
if (LOG.isTraceEnabled()) LOG.trace("removeUnNeededStaleScanners - txId " + transactionId
+ ", scannerId " + scannerId + ", Scanner was not previously closed in region " + m_regionDetails);
rsh.s.close();
}
rsh.s = null;
rsh.r = null;
scannerIter.remove();
}
catch (Exception e) {
LOG.warn("removeUnNeededStaleScanners - txId " + transactionId + ", scannerId " + scannerId + ", Caught exception ", e);
}
}
}
} // End of while loop
} // End of for loop
cleanScannersForTransactions.clear();
} // End of synchronization
}
/**
* Returns the minimum start sequence number
* @return Long
*/
private Long getMinStartSequenceNumber() {
List<TrxTransactionState> transactionStates;
synchronized (transactionsById) {
transactionStates = new ArrayList<TrxTransactionState>(
transactionsById.values());
}
Long min = null;
for (TrxTransactionState transactionState : transactionStates) {
try {
if (min == null || transactionState.getStartSequenceNumber() < min) {
min = transactionState.getStartSequenceNumber();
}
}
catch(NullPointerException npe){
if (LOG.isTraceEnabled()) LOG.trace("getMinStartSequenceNumber ignoring NullPointerException ");
}
}
return min;
}
/**
* Returns the region name as a string
* @return String
*/
public String getRegionNameAsString() {
return regionInfo.getRegionNameAsString();
}
/**
* Simple helper class that just keeps track of whether or not its stopped.
*/
private static class StoppableImplementation implements Stoppable {
private volatile boolean stop = false;
@Override
public void stop(String why) {
LOG.info("Cleanup Chore thread has stopped: Reason:" + why);
this.stop = true;
}
@Override
public boolean isStopped() {
return this.stop;
}
}
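/**
* Samples heap usage from the memory MXBean and, when the used percentage
* exceeds the configured threshold, optionally runs a GC and re-checks before
* setting the memoryThrottle flag (unless configured as warn-only).
*/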
synchronized public void checkMemoryUsage() {
long memUsed = 0L;
long memMax = 0L;
if (memoryUsageThreshold < DEFAULT_MEMORY_THRESHOLD &&
memoryBean != null) {
memUsed = memoryBean.getHeapMemoryUsage().getUsed();
memMax = memoryBean.getHeapMemoryUsage().getMax();
memoryPercentage = 0L;
if (memMax != 0) {
memoryPercentage = (memUsed * 100) / memMax;
}
memoryThrottle = false;
if (memoryPercentage > memoryUsageThreshold) {
// If configured to perform a garbage collection,
// try to release memory before throttling the queries.
if (memoryUsagePerformGC) {
if (LOG.isTraceEnabled()) LOG.trace("checkMemoryUsage - before GC, memoryPercentage is " + memoryPercentage);
System.gc();
// Calculate the memory usage again before
// setting the throttle value or post a warning.
memUsed = memoryBean.getHeapMemoryUsage().getUsed();
memMax = memoryBean.getHeapMemoryUsage().getMax();
memoryPercentage = 0L;
if (memMax != 0) {
memoryPercentage = (memUsed * 100) / memMax;
}
if (LOG.isTraceEnabled()) LOG.trace("checkMemoryUsage - after GC, memoryPercentage is " + memoryPercentage);
if (memoryPercentage > memoryUsageThreshold) {
if (!memoryUsageWarnOnly)
memoryThrottle = true;
if (LOG.isTraceEnabled()) LOG.trace("checkMemoryUsage - memoryPercentage is " + memoryPercentage + ", memoryThrottle is "+ memoryThrottle);
}
} else {
if (!memoryUsageWarnOnly)
memoryThrottle = true;
if (LOG.isTraceEnabled()) LOG.trace("checkMemoryUsage - memoryPercentage is " + memoryPercentage + ", memoryThrottle is "+ memoryThrottle);
}
}
}
}
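/**
* Handles a pushOnlineEpoch RPC: validates the requested epoch against the
* current onlineEpoch, adopts it when accepted, and returns any error in the
* PushEpochResponse.
*/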
public void pushOnlineEpoch(RpcController controller,
PushEpochRequest request, RpcCallback<PushEpochResponse> done){
long transId = request.getTransactionId();
long tmpEpoch = request.getEpoch();
IOException ioe = null;
if(LOG.isDebugEnabled()) LOG.debug("pushOnlineEpoch -- ENTRY, transId: " + transId
+ " current onlineEpoch " + this.onlineEpoch + " new onlineEpoch " + tmpEpoch
+ " in region: " + m_regionDetails);
if (this.onlineEpoch < tmpEpoch){
if(LOG.isErrorEnabled()) LOG.error("pushOnlineEpoch -- Error: current onlineEpoch " + this.onlineEpoch
+ " is less than new onlineEpoch " + tmpEpoch + ", transId: " + transId
+ " in region: " + m_regionDetails);
ioe = new IOException("pushOnlineEpoch -- Error: current onlineEpoch " + this.onlineEpoch
+ " is less than new onlineEpoch " + tmpEpoch + ", transId: " + transId
+ " in region: " + m_regionDetails);
}
else {
this.onlineEpoch = tmpEpoch;
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse.Builder pushEpochResponseBuilder = PushEpochResponse.newBuilder();
if (ioe != null){
pushEpochResponseBuilder.setHasException(true);
pushEpochResponseBuilder.setException(ioe.toString());
}
else{
pushEpochResponseBuilder.setHasException(false);
}
PushEpochResponse epochResponse = pushEpochResponseBuilder.build();
done.run(epochResponse);
}
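/**
* Persists this region's transactional state to an HFile at flushPath as two
* KeyValues: a delimited stream of TransactionStateMsg records for every
* known transaction, followed by a TransactionPersist summary (committed
* sequence numbers, next sequence id, online epoch). Read back by readTxnInfo.
*/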
public void flushToFS(Path flushPath) throws IOException {
if(LOG.isDebugEnabled()) LOG.debug("flushToFS -- ENTRY, Path: " + flushPath.toString()
+ " onlineEpoch " + this.onlineEpoch + " transactionsById (" + transactionsById.size()
+ "), commitedTransactionsBySequenceNumber (" + commitedTransactionsBySequenceNumber.size() + ")");
TransactionPersist.Builder txnPersistBuilder = TransactionPersist.newBuilder();
fs.delete(flushPath, true);
HFileWriterV2 w =
(HFileWriterV2)
HFile.getWriterFactory(config, new CacheConfig(config))
.withPath(fs, flushPath).withFileContext(context).create();
Map<Long, TrxTransactionState> transactionMap = new HashMap<Long, TrxTransactionState>();
synchronized (transactionsById) {
for(TrxTransactionState ts : transactionsById.values()) {
transactionMap.put(ts.getTransactionId(), ts);
txnPersistBuilder.addTxById(ts.getTransactionId());
}
}
synchronized (commitedTransactionsBySequenceNumber) {
for(Map.Entry<Long, TrxTransactionState> entry :
commitedTransactionsBySequenceNumber.entrySet()) {
transactionMap.put(entry.getValue().getTransactionId(), entry.getValue());
txnPersistBuilder.addSeqNoListSeq(entry.getKey());
txnPersistBuilder.addSeqNoListTxn(entry.getValue().getTransactionId());
}
}
Map<Long, TrxTransactionState> transactionMap1 = new HashMap<Long, TrxTransactionState> (transactionMap);
for(TrxTransactionState ts : transactionMap1.values()) {
for(TrxTransactionState ts2 : ts.getTransactionsToCheck()) {
transactionMap.put(ts2.getTransactionId(), ts2);
}
}
txnPersistBuilder.setNextSeqId(nextSequenceId.get());
txnPersistBuilder.setOnlineEpoch(this.onlineEpoch);
ByteArrayOutputStream output = new ByteArrayOutputStream();
for(TrxTransactionState ts : transactionMap.values()) {
TransactionStateMsg.Builder tsBuilder = TransactionStateMsg.newBuilder();
tsBuilder.setTxId(ts.getTransactionId());
tsBuilder.setStartSeqNum(ts.getStartSequenceNumber());
tsBuilder.setSeqNum(ts.getHLogStartSequenceId());
tsBuilder.setLogSeqId(ts.getLogSeqId());
tsBuilder.setReinstated(ts.isReinstated());
if(ts.getCommitProgress() == null)
tsBuilder.setCommitProgress(-1);
else
tsBuilder.setCommitProgress(ts.getCommitProgress().ordinal());
tsBuilder.setStatus(ts.getStatus().ordinal());
for (WriteAction wa : ts.getWriteOrdering()) {
if(wa.getPut() != null) {
tsBuilder.addPutOrDel(true);
tsBuilder.addPut(ProtobufUtil.toMutation(MutationType.PUT, wa.getPut()));
}
else {
tsBuilder.addPutOrDel(false);
tsBuilder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, wa.getDelete()));
}
}
tsBuilder.build().writeDelimitedTo(output);
}
byte [] serializedTxns = output.toByteArray();
w.append(new KeyValue(Bytes.toBytes(COMMITTED_TXNS_KEY), Bytes.toBytes("cf"), Bytes.toBytes("qual"),
serializedTxns));
byte [] persistByte = txnPersistBuilder.build().toByteArray();
TransactionPersist persistMsg = TransactionPersist.parseFrom(persistByte);
long persistEpoch = persistMsg.getOnlineEpoch();
if (this.onlineEpoch == persistEpoch){
if(LOG.isDebugEnabled()) LOG.debug("flushToFS onlineEpoch read from persistMsg matches our own " + persistEpoch
+ " in region: " + m_regionDetails);
}
else{
if(LOG.isErrorEnabled()) LOG.error("flushToFS onlineEpoch read from persistMsg " + persistEpoch
+ " differs from our own " + this.onlineEpoch + " in region: " + m_regionDetails);
}
w.append(new KeyValue(Bytes.toBytes(TXNS_BY_ID_KEY), Bytes.toBytes("cf"), Bytes.toBytes("qual"),
persistByte));
w.close();
}
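/**
* Reconstructs transactional state previously written by flushToFS: rebuilds
* each TrxTransactionState with its writes and conflict-check list,
* re-registers transactions and their leases, and restores the committed
* transaction map, next sequence id, and online epoch.
*/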
public void readTxnInfo(Path flushPath) throws IOException {
readTxnInfo(flushPath, false);
}
public void readTxnInfo(Path flushPath, boolean setRetry) throws IOException {
if(LOG.isInfoEnabled()) LOG.info("readTxnInfo -- ENTRY, Path: " + flushPath.toString());
try {
HFile.Reader reader = HFile.createReader(fs, flushPath, new CacheConfig(config), config);
HFileScanner scanner = reader.getScanner(true, false);
scanner.seekTo();
//KeyValue firstVal = scanner.getKeyValue();
Cell firstVal = scanner.getKeyValue();
scanner.next();
//KeyValue persistKV = scanner.getKeyValue();
Cell persistKV = scanner.getKeyValue();
if(firstVal == null || persistKV == null) {
throw new IOException("Invalid values read from HFile in readTxnInfo");
}
Map<Long, TrxTransactionState> txnMap = new HashMap<Long, TrxTransactionState>();
Map<Long, List<Long>> txnsToCheckMap = new HashMap<Long, List<Long>>();
ByteArrayInputStream input = new ByteArrayInputStream(CellUtil.cloneValue(firstVal));
TransactionStateMsg tsm = TransactionStateMsg.parseDelimitedFrom(input);
while (tsm != null) {
TrxTransactionState ts = new TrxTransactionState(tsm.getTxId(),
tsm.getSeqNum(),
new AtomicLong(tsm.getLogSeqId()),
m_Region.getRegionInfo(),
m_Region.getTableDesc(),
tHLog,
#ifdef CDH5.7 APACHE1.2
configuredEarlyLogging, this.t_Region,
#else
configuredEarlyLogging,
#endif
-1); // do not worry about startId here
ts.setStartSequenceNumber(tsm.getStartSeqNum());
ts.setNeverReadOnly(true);
List<Boolean> putOrDel = tsm.getPutOrDelList();
List<MutationProto> puts = tsm.getPutList();
List<MutationProto> deletes = tsm.getDeleteList();
int putIndex = 0;
int deleteIndex = 0;
for (Boolean put : putOrDel) {
if(put) {
Put writePut = ProtobufUtil.toPut(puts.get(putIndex++));
#ifdef APACHE1.2 CDH5.7
if(rowIsInRange(regionInfo, writePut.getRow())) {
#else
if(m_Region.rowIsInRange(regionInfo, writePut.getRow())) {
#endif
ts.addWrite(writePut, this.useCommitIdInCells);
}
}
else {
Delete writeDelete = ProtobufUtil.toDelete(deletes.get(deleteIndex++));
#ifdef APACHE1.2 CDH5.7
if(rowIsInRange(regionInfo, writeDelete.getRow())) {
#else
if(m_Region.rowIsInRange(regionInfo, writeDelete.getRow())) {
#endif
ts.addDelete(writeDelete, this.useCommitIdInCells);
}
}
}
txnsToCheckMap.put(tsm.getTxId(), tsm.getTxnsToCheckList());
if(setRetry)
ts.setSplitRetry(true);
txnMap.put(ts.getTransactionId(), ts);
tsm = TransactionStateMsg.parseDelimitedFrom(input);
}
for(TrxTransactionState ts : txnMap.values()) {
for (Long txid : txnsToCheckMap.get(ts.getTransactionId())) {
TrxTransactionState mapTS = txnMap.get(txid);
if(mapTS != null)
ts.addTransactionToCheck(mapTS);
}
}
TransactionPersist txnPersistMsg = TransactionPersist.parseFrom(CellUtil.cloneValue(persistKV));
if(txnPersistMsg == null) {
throw new IOException("Invalid protobuf, message is null.");
}
for (Long txid : txnPersistMsg.getTxByIdList()) {
String key = getTransactionalUniqueId(txid);
TrxTransactionState ts = txnMap.get(txid);
if (ts != null) {
TrxTransactionState existingTs = transactionsById.get(key);
if(existingTs != null) {
for(WriteAction wa : existingTs.getWriteOrdering()) {
if(wa.getPut() != null) {
ts.addWrite(wa.getPut(), this.useCommitIdInCells);
}
else {
ts.addDelete(wa.getDelete(), this.useCommitIdInCells);
}
}
}
transactionsById.put(key, ts);
try {
transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(txid));
} catch (LeaseStillHeldException e) {
transactionLeases.renewLease(key);
}
}
else {
TrxTransactionState tsEntry = new TrxTransactionState(txid,
0,
new AtomicLong(0),
regionInfo,
m_Region.getTableDesc(),
tHLog,
#ifdef CDH5.7 APACHE1.2
configuredEarlyLogging, this.t_Region,
#else
configuredEarlyLogging,
#endif
-1); // do not worry about startId here
transactionsById.putIfAbsent(key, tsEntry);
}
}
for (int i = 0; i < txnPersistMsg.getSeqNoListSeqCount(); i++) {
TrxTransactionState ts = txnMap.get(txnPersistMsg.getSeqNoListTxn(i));
if (ts!=null)
commitedTransactionsBySequenceNumber.put(txnPersistMsg.getSeqNoListSeq(i), ts);
}
this.nextSequenceId = new AtomicLong(txnPersistMsg.getNextSeqId());
this.onlineEpoch = txnPersistMsg.getOnlineEpoch();
LOG.info("Setting onlineEpoch after split to " + this.onlineEpoch + " in region " + m_regionDetails);
} catch(IOException e) {
LOG.error("readTxnInfo exception: ", e);
}
if(LOG.isTraceEnabled()) LOG.trace("readTxnInfo -- EXIT");
}
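// Blocking controls: setBlockAll also sets blockNonPhase2 and blockNewTrans,
// and setBlockNonPhase2 also sets blockNewTrans, so a stronger block level
// always implies the weaker ones.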
public void setBlockAll(boolean value) {
blockAll.set(value);
// for safety
if (value) {
blockNewTrans.set(value);
blockNonPhase2.set(value);
}
}
public void setBlockNonPhase2(boolean value) {
blockNonPhase2.set(value);
// for safety
if (value)
blockNewTrans.set(value);
}
public void setNewTrans(boolean value) {
blockNewTrans.set(value);
}
public void setClosing(boolean value) {
closing.set(value);
}
// The following are methods for the Trafodion SQL coprocessors.
// compares two qualifiers as unsigned, lexicographically ordered byte strings
static private boolean isQualifierLessThanOrEqual(Cell nextKv, Cell currKv) {
int currLength = currKv.getQualifierLength();
int currOffset = currKv.getQualifierOffset();
byte [] currQual = currKv.getQualifierArray();
int nextLength = nextKv.getQualifierLength();
int nextOffset = nextKv.getQualifierOffset();
byte [] nextQual = nextKv.getQualifierArray();
int minLength = Math.min(currLength, nextLength);
for (int i = 0; i < minLength; i++) {
// compare bytes as unsigned values
int nextQualI = nextQual[i + nextOffset] & 0xFF;
int currQualI = currQual[i + currOffset] & 0xFF;
if (nextQualI < currQualI)
return true;
else if (nextQualI > currQualI)
return false;
// else equal, move on to next byte
}
// the first minLength bytes are the same; the shorter array
// is regarded as less
boolean rc = (nextLength <= currLength);
return rc;
}
// debugging function
private static String bytesToHex(byte[] in) {
final StringBuilder builder = new StringBuilder();
for(byte b : in) {
builder.append(String.format("%02x", b));
}
return builder.toString();
}
// Returns data needed to estimate the row count in the table.
// Entry counts and total size in bytes are extracted from HFiles.
// For non-aligned tables (numCols > 1), sampling is done in order
// to estimate how many entries make up a row on average.
@Override
public void trafEstimateRowCount(RpcController controller,
TrafEstimateRowCountRequest request,
RpcCallback<TrafEstimateRowCountResponse> done) {
TrafEstimateRowCountResponse response = null;
Throwable t = null;
int numCols = request.getNumCols();
// To estimate incidence of nulls, read the first 500 rows worth
// of KeyValues.
final int ROWS_TO_SAMPLE = 500;
int putKVsSampled = 0;
int nonPutKVsSampled = 0;
int missingKVsCount = 0;
int sampleRowCount = 0;
long totalEntries = 0; // KeyValues in all HFiles for table
long totalSizeBytes = 0; // Size of all HFiles for table
long estimatedTotalPuts = 0;
boolean more = true;
long estimatedRowCount = 0;
// Access the file system to go directly to the table's HFiles.
// Create a reader for the file to access the KV entry count and
// size in bytes stored in the trailer block.
// For aligned format tables, the number of rows equals the
// number of KeyValue entries. For non-aligned format, it's
// more complicated. There is a KeyValue entry for each
// column value, except the KeyValue may be missing because
// the column has a null value or because the column has a
// default value that has not been materialized.
// For non-aligned format tables, we sample some rows and
// count how many entries there are per row, so our caller
// can estimate the average number of missing values per row.
// Once our caller has that estimate, it can estimate the
// number of rows.
// We only do the sampling for non-aligned tables (numCols > 1),
// and we only do it on the first HFile of the first Region.
// The first Region is detected by having a null start key.
CacheConfig cacheConf = new CacheConfig(config);
byte[] startKey = regionInfo.getStartKey();
// Get the list of store files in column family '#1'. There might
// not be any. For example, a new Trafodion table might be entirely
// in memstore with nothing written out yet. Or we may be accessing
// a native HBase table which lacks the '#1' column family.
List<String> storeFileList = null;
try {
byte[] familyName = "#1".getBytes();
byte[][] familyNames = { familyName };
storeFileList = m_Region.getStoreFileList(familyNames);
}
catch (IllegalArgumentException iae) {
// this gets thrown when the column family doesn't exist;
// we'll just use an empty list instead
storeFileList = new ArrayList<String>();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Trafodion estimate row count sees " + storeFileList.size() + " files.");
for (String sfn : storeFileList) {
LOG.debug("*** " + sfn);
}
if (startKey == null)
LOG.debug("startKey is null.");
else
LOG.debug("startKey.length is " + startKey.length + ", startKey is hex " + bytesToHex(startKey));
}
try {
FileSystem fileSystem = FileSystem.get(config);
for (String storeFileName : storeFileList) {
Path path = new Path(storeFileName);
// Make sure the file name conforms to HFile name pattern.
if (!StoreFileInfo.isHFile(path)) {
if (LOG.isDebugEnabled()) LOG.debug("Trafodion estimate row count file name " + path + " is not an HFile.");
continue;
}
HFile.Reader reader = HFile.createReader(fileSystem, path, cacheConf, config);
try {
totalEntries += reader.getEntries();
totalSizeBytes += reader.length();
if (ROWS_TO_SAMPLE > 0 &&
numCols > 1 && // only need to do this for non-aligned format
(startKey == null || startKey.length == 0) && // first region only
totalEntries == reader.getEntries()) { // first file only
// Trafodion column qualifiers are ordinal numbers, but are represented
// as varying length unsigned little-endian integers in lexicographical
// order. So, for example, in a table with 260 columns, the column
// qualifiers (if present) will be read in this order:
// 1 (x'01'), 257 (x'0101'), 2 (x'02'), 258 (x'0201'), 3 (x'03'),
// 259 (x'0301'), 4 (x'04'), 260 (x'0401'), 5 (x'05'), 6 (x'06'),
// 7 (x'07'), ...
// We have crossed the boundary to the next row if and only if the
// next qualifier read is less than or equal to the previous,
// compared unsigned, lexicographically.
HFileScanner scanner = reader.getScanner(false, false, false);
scanner.seekTo(); //position at beginning of first data block
// the next line should succeed, as we know the HFile is non-empty
Cell currKv = scanner.getKeyValue();
while ((more) && (currKv.getTypeByte() != KeyValue.Type.Put.getCode())) {
nonPutKVsSampled++;
more = scanner.next();
currKv = scanner.getKeyValue();
}
if (more) {
// now we have the first KeyValue in the HFile
int putKVsThisRow = 1;
putKVsSampled++;
sampleRowCount++; // we have at least one row
more = scanner.next();
while ((more) && (sampleRowCount <= ROWS_TO_SAMPLE)) {
Cell nextKv = scanner.getKeyValue();
if (nextKv.getTypeByte() == KeyValue.Type.Put.getCode()) {
if (isQualifierLessThanOrEqual(nextKv,currKv)) {
// we have crossed a row boundary
sampleRowCount++;
missingKVsCount += (numCols - putKVsThisRow);
putKVsThisRow = 1;
} else {
putKVsThisRow++;
}
currKv = nextKv;
putKVsSampled++;
} else {
nonPutKVsSampled++; // don't count these toward the put KV count
}
more = scanner.next();
}
}
//scanner.close(); This API is not available until HBase 2.0
if (sampleRowCount > ROWS_TO_SAMPLE) {
// we read one KeyValue beyond the ROWS_TO_SAMPLE-eth row, so
// adjust counts for that
putKVsSampled--;
sampleRowCount--;
}
} // code for first file
} catch (IOException exp1) {
if (LOG.isErrorEnabled()) LOG.error("Trafodion estimate row count encountered exception ", exp1);
t = exp1;
} finally {
reader.close(false);
}
} // for
} catch (IOException exp2) {
if (LOG.isErrorEnabled()) LOG.error("Trafodion estimate row count encountered exception ", exp2);
if (t == null) // don't bury the root cause if an exception also occurred in the for loop
t = exp2;
}
// don't you just love Java + Maven? it takes 112 characters to specify the type of the next variable :-)
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafEstimateRowCountResponse.Builder
responseBuilder = TrafEstimateRowCountResponse.newBuilder();
responseBuilder.setTotalEntries(totalEntries).
setTotalSizeBytes(totalSizeBytes).
setPutKVsSampled(putKVsSampled).
setNonPutKVsSampled(nonPutKVsSampled).
setMissingKVsCount(missingKVsCount);
if (t != null)
{
responseBuilder.setHasException(true);
responseBuilder.setException(t.toString());
}
response = responseBuilder.build();
done.run(response);
}
}
//1}