/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.streaming;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.txn.TxnErrorMsg;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Streaming transaction implementation used in most cases. It queries the
 * metastore to open a batch of transaction ids, allocates the corresponding
 * write ids, and then commits or aborts them.
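 *
 * <p>A rough sketch of typical usage. This class is not instantiated directly by
 * clients; it is created and driven by {@link HiveStreamingConnection}. The
 * database, table, delimiter and record values below are illustrative only, and
 * {@code conf} stands for a pre-configured HiveConf:
 * <pre>{@code
 * StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
 *     .withFieldDelimiter(',')
 *     .build();
 * HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
 *     .withDatabase("mydb")
 *     .withTable("alerts")
 *     .withAgentInfo("example-agent")
 *     .withTransactionBatchSize(10)
 *     .withRecordWriter(writer)
 *     .withHiveConf(conf)
 *     .connect();
 * connection.beginTransaction();
 * connection.write("1,hello".getBytes());
 * connection.commitTransaction();
 * connection.close();
 * }</pre>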
*/
public class TransactionBatch extends AbstractStreamingTransaction {
private static final Logger LOG = LoggerFactory.getLogger(
TransactionBatch.class.getName());
private static final int DEFAULT_HEARTBEAT_INTERVAL = 60 * 1000;
protected Set<String> createdPartitions = null;
private String username;
private HiveStreamingConnection conn;
private ScheduledExecutorService scheduledExecutorService;
private String partNameForLock = null;
private LockRequest lockRequest = null;
  // Heartbeats can only be sent for open transactions, and there is a race between
  // committing/aborting a transaction and the heartbeat.
  // Example: if a heartbeat is sent for an already committed txn, an exception is thrown.
  // Conversely, if we don't send a heartbeat, the metastore server might abort a txn
  // for a missed heartbeat right before the commit txn call.
  // This lock is used to mutually exclude the commit/abort and heartbeat calls.
private final ReentrantLock transactionLock = new ReentrantLock();
  // The min txn id is incremented linearly within a transaction batch.
  // minTxnId is kept atomic as it is shared with the heartbeat thread.
private final AtomicLong minTxnId;
// max txn id does not change for a transaction batch
private final long maxTxnId;
private String agentInfo;
private int numTxns;
/**
* Id of the table from the streaming connection.
*/
private final long tableId;
/**
* Tracks the state of each transaction.
*/
private HiveStreamingConnection.TxnState[] txnStatus;
/**
* ID of the last txn used by {@link #beginNextTransactionImpl()}.
*/
private long lastTxnUsed;
  /**
   * Creates a batch of transactions acquired from the MetaStore: opens the txns,
   * allocates a write id for each of them, initializes the record writer and
   * starts the heartbeat thread.
   *
   * @param conn - hive streaming connection
   * @throws StreamingException if the transaction batch could not be set up or the
   *                            record writer could not be initialized
   */
public TransactionBatch(HiveStreamingConnection conn) throws StreamingException {
boolean success = false;
try {
if (conn.isPartitionedTable() && !conn.isDynamicPartitioning()) {
List<FieldSchema> partKeys = conn.getTable().getPartitionKeys();
partNameForLock = Warehouse.makePartName(partKeys, conn.getStaticPartitionValues());
}
this.conn = conn;
this.username = conn.getUsername();
this.recordWriter = conn.getRecordWriter();
this.agentInfo = conn.getAgentInfo();
this.numTxns = conn.getTransactionBatchSize();
this.tableId = conn.getTable().getTTable().getId();
List<Long> txnIds = openTxnImpl(username, numTxns);
txnToWriteIds = allocateWriteIdsImpl(txnIds);
assert (txnToWriteIds.size() == numTxns);
txnStatus = new HiveStreamingConnection.TxnState[numTxns];
for (int i = 0; i < txnStatus.length; i++) {
assert (txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
txnStatus[i] = HiveStreamingConnection.TxnState.OPEN; //Open matches Metastore state
}
this.state = HiveStreamingConnection.TxnState.INACTIVE;
// initialize record writer with connection and write id info
recordWriter.init(conn, txnToWriteIds.get(0).getWriteId(),
txnToWriteIds.get(numTxns - 1).getWriteId(), conn.getStatementId());
this.minTxnId = new AtomicLong(txnIds.get(0));
this.maxTxnId = txnIds.get(txnIds.size() - 1);
setupHeartBeatThread();
success = true;
} catch (TException e) {
throw new StreamingException(conn.toString(), e);
} finally {
//clean up if above throws
markDead(success);
}
}
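  /**
   * Periodically heartbeats the currently open range of transactions
   * [minTxnId, maxTxnId] so that the metastore does not time them out.
   * If the metastore reports aborted or unknown txns, the batch is marked closed.
   */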
private static class HeartbeatRunnable implements Runnable {
private final HiveStreamingConnection conn;
private final AtomicLong minTxnId;
private final long maxTxnId;
private final ReentrantLock transactionLock;
private final AtomicBoolean isTxnClosed;
HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long maxTxnId,
final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
this.conn = conn;
this.minTxnId = minTxnId;
this.maxTxnId = maxTxnId;
this.transactionLock = transactionLock;
this.isTxnClosed = isTxnClosed;
}
@Override
public void run() {
transactionLock.lock();
try {
if (minTxnId.get() > 0) {
HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId);
if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
LOG.error("Heartbeat failure: {}", resp.toString());
isTxnClosed.set(true);
} else {
LOG.info("Heartbeat sent for range: [{}-{}]", minTxnId.get(), maxTxnId);
}
}
} catch (TException e) {
LOG.warn("Failure to heartbeat for transaction range: [" + minTxnId.get() + "-" + maxTxnId + "]", e);
} finally {
transactionLock.unlock();
}
}
}
private void setupHeartBeatThread() {
// start heartbeat thread
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("HiveStreamingConnection-Heartbeat-Thread")
.build();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
long heartBeatInterval;
long initialDelay;
try {
// if HIVE_TXN_TIMEOUT is defined, heartbeat interval will be HIVE_TXN_TIMEOUT/2
heartBeatInterval = DbTxnManager.getHeartbeatInterval(conn.getConf());
} catch (LockException e) {
heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
}
// to introduce some randomness and to avoid hammering the metastore at the same time (same logic as DbTxnManager)
initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
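    // For example, assuming the HIVE_TXN_TIMEOUT/2 rule above and hive.txn.timeout = 300s:
    // heartBeatInterval = 150000 ms and initialDelay is drawn uniformly from [0, 112500) ms.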
LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}",
heartBeatInterval, initialDelay, conn.getAgentInfo());
Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed);
this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit
.MILLISECONDS);
}
private List<Long> openTxnImpl(final String user, final int numTxns) throws TException {
return conn.getMSC().openTxns(user, numTxns).getTxn_ids();
}
private List<TxnToWriteId> allocateWriteIdsImpl(final List<Long> txnIds) throws TException {
return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.getDatabase(),
conn.getTable().getTableName());
}
@Override
public String toString() {
if (txnToWriteIds == null || txnToWriteIds.isEmpty()) {
return "{}";
}
StringBuilder sb = new StringBuilder(" TxnStatus[");
for (HiveStreamingConnection.TxnState state : txnStatus) {
//'state' should not be null - future proofing
sb.append(state == null ? "N" : state);
}
sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+ "/" + txnToWriteIds.get(0).getWriteId()
+ "..."
+ txnToWriteIds.get(txnToWriteIds.size() - 1).getTxnId()
+ "/" + txnToWriteIds.get(txnToWriteIds.size() - 1).getWriteId()
+ "] on connection = " + conn + "; " + sb;
}
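  /**
   * Switches to the next transaction in the batch and acquires a shared-read lock
   * on the table (and on the static partition, if any).
   *
   * @throws StreamingException if the batch is closed or exhausted, or the lock
   *                            cannot be acquired
   */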
public void beginNextTransaction() throws StreamingException {
checkIsClosed();
beginNextTransactionImpl();
}
private void beginNextTransactionImpl() throws StreamingException {
beginNextTransactionImpl("No more transactions available in" +
" next batch for connection: " + conn + " user: " + username);
lastTxnUsed = getCurrentTxnId();
lockRequest = createLockRequest(conn, partNameForLock, username, getCurrentTxnId(), agentInfo);
createdPartitions = Sets.newHashSet();
try {
LockResponse res = conn.getMSC().lock(lockRequest);
if (res.getState() != LockState.ACQUIRED) {
throw new TransactionError("Unable to acquire lock on " + conn);
}
} catch (TException e) {
throw new TransactionError("Unable to acquire lock on " + conn, e);
}
}
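  /**
   * Commits the current transaction: flushes the record writer, registers any
   * dynamic/extra partitions with the metastore and commits the txn, passing the
   * optional key/value pair to the metastore (commitTxnWithKeyValue). The record
   * writer is closed after the last transaction of the batch.
   *
   * @param partitions additional partitions to register besides the ones
   *                   discovered by the record writer (may be null)
   * @param key   commit marker key; must be set if and only if value is set
   * @param value commit marker value
   * @throws StreamingException if the batch is closed or the commit fails
   */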
public void commit(Set<String> partitions, String key, String value)
throws StreamingException {
checkIsClosed();
boolean success = false;
try {
commitImpl(partitions, key, value);
success = true;
} finally {
markDead(success);
}
}
private void commitImpl(Set<String> partitions, String key, String value)
throws StreamingException {
try {
if ((key == null && value != null) || (key != null && value == null)) {
throw new StreamingException(String.format(
"If key is set, the value should be as well and vice versa,"
+ " key, value = %s, %s", key, value));
}
recordWriter.flush();
TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
if (conn.isDynamicPartitioning()) {
List<String> partNames = new ArrayList<>(recordWriter.getPartitions());
createdPartitions.addAll(partNames);
if (partitions != null) {
partNames.addAll(partitions);
}
if (!partNames.isEmpty()) {
conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(),
txnToWriteId.getWriteId(), conn.getDatabase(),
conn.getTable().getTableName(), partNames,
DataOperationType.INSERT);
}
}
      // If this is the last transaction in the batch, close the files and add write events.
      // We need to close the writer because the file checksum can't be obtained on an open file.
if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
        // Replication doesn't work if the txn batch size > 1 and the last txn is aborted, as write events
        // are ignored by the abort txn event, causing data loss. However, if the last txn in the batch is
        // committed, then the data gets replicated, making it eventually consistent, but point-in-time
        // consistency is not guaranteed.
recordWriter.close();
// Add write notification events if it is enabled.
conn.addWriteNotificationEvents();
}
transactionLock.lock();
try {
if (key != null) {
conn.getMSC().commitTxnWithKeyValue(txnToWriteId.getTxnId(),
tableId, key, value);
} else {
conn.getMSC().commitTxn(txnToWriteId.getTxnId());
}
        // Increment the min txn id so that the heartbeat thread heartbeats only from the next open transaction.
        // The current transaction is going to be committed or fail, so it no longer needs a heartbeat.
if (currentTxnIndex + 1 < txnToWriteIds.size()) {
minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
} else {
// exhausted the batch, no longer have to heartbeat for current txn batch
minTxnId.set(-1);
}
} finally {
transactionLock.unlock();
}
state = HiveStreamingConnection.TxnState.COMMITTED;
txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.COMMITTED;
} catch (NoSuchTxnException e) {
throw new TransactionError("Invalid transaction id : "
+ getCurrentTxnId(), e);
} catch (TxnAbortedException e) {
throw new TransactionError("Aborted transaction "
+ "cannot be committed", e);
} catch (TException e) {
throw new TransactionError("Unable to commitTransaction transaction"
+ getCurrentTxnId(), e);
}
}
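  /**
   * Aborts the current transaction. This is a no-op if the batch is already
   * closed, since {@link #markDead(boolean)} aborts all remaining txns on close.
   *
   * @throws StreamingException if the abort fails
   */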
public void abort() throws StreamingException {
if (isTxnClosed.get()) {
      /*
       * isTxnClosed is only set internally by this class. {@link #markDead(boolean)} will abort all
       * remaining txns, so make this a no-op to ensure that a well-behaved client that calls
       * abortTransaction() doesn't get misleading errors.
       */
return;
}
abort(false);
}
private void abort(final boolean abortAllRemaining) throws StreamingException {
abortImpl(abortAllRemaining);
}
private void abortImpl(boolean abortAllRemaining) throws StreamingException {
if (minTxnId == null) {
return;
}
transactionLock.lock();
try {
if (abortAllRemaining) {
// we are aborting all txns in the current batch, so no need to heartbeat
minTxnId.set(-1);
        //when the last txn finished (abortTransaction/commitTransaction), currentTxnIndex points at that txn,
        //so we need to start from the next one, if any. Also, if the batch was created but
        //fetchTransactionBatch() was never called, we want to start with the first txn
int minOpenTxnIndex = Math.max(currentTxnIndex +
(state == HiveStreamingConnection.TxnState.ABORTED
|| state == HiveStreamingConnection.TxnState.COMMITTED
? 1 : 0), 0);
for (currentTxnIndex = minOpenTxnIndex;
currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
AbortTxnRequest abortTxnRequest = new AbortTxnRequest(txnToWriteIds.get(currentTxnIndex).getTxnId());
abortTxnRequest.setErrorCode(TxnErrorMsg.ABORT_ROLLBACK.getErrorCode());
conn.getMSC().rollbackTxn(abortTxnRequest);
txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
}
currentTxnIndex--; //since the loop left it == txnToWriteIds.size()
} else {
// we are aborting only the current transaction, so move the min range for heartbeat or disable heartbeat
// if the current txn is last in the batch.
if (currentTxnIndex + 1 < txnToWriteIds.size()) {
minTxnId.set(txnToWriteIds.get(currentTxnIndex + 1).getTxnId());
} else {
// exhausted the batch, no longer have to heartbeat
minTxnId.set(-1);
}
long currTxnId = getCurrentTxnId();
if (currTxnId > 0) {
AbortTxnRequest abortTxnRequest = new AbortTxnRequest(currTxnId);
abortTxnRequest.setErrorCode(TxnErrorMsg.ABORT_ROLLBACK.getErrorCode());
conn.getMSC().rollbackTxn(abortTxnRequest);
txnStatus[currentTxnIndex] = HiveStreamingConnection.TxnState.ABORTED;
}
}
state = HiveStreamingConnection.TxnState.ABORTED;
} catch (NoSuchTxnException e) {
throw new TransactionError("Unable to abort invalid transaction id : "
+ getCurrentTxnId(), e);
} catch (TException e) {
throw new TransactionError("Unable to abort transaction id : "
+ getCurrentTxnId(), e);
} finally {
transactionLock.unlock();
}
}
/**
* Close the TransactionBatch. This will abort any still open txns in this batch.
*
* @throws StreamingException - failure when closing transaction batch
*/
public void close() throws StreamingException {
if (isClosed()) {
return;
}
    isTxnClosed.set(true); //also ensures that heartbeat() is a no-op since the client is likely doing it async
try {
abort(true); //abort all remaining txns
} catch (Exception ex) {
LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
throw new StreamingException("Unable to abort", ex);
}
try {
closeImpl();
} catch (Exception ex) {
LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
throw new StreamingException("Unable to close", ex);
}
}
private void closeImpl() throws StreamingException {
state = HiveStreamingConnection.TxnState.INACTIVE;
recordWriter.close();
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
}
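  /**
   * Builds a shared-read, INSERT-type lock request for the connection's table
   * (scoped to the given partition name, if one is provided).
   */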
private static LockRequest createLockRequest(final HiveStreamingConnection connection,
String partNameForLock, String user, long txnId, String agentInfo) {
LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
requestBuilder.setUser(user);
requestBuilder.setTransactionId(txnId);
LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
.setDbName(connection.getDatabase())
.setTableName(connection.getTable().getTableName())
.setSharedRead()
.setOperationType(DataOperationType.INSERT);
if (connection.isDynamicPartitioning()) {
lockCompBuilder.setIsDynamicPartitionWrite(true);
}
if (partNameForLock != null && !partNameForLock.isEmpty()) {
lockCompBuilder.setPartitionName(partNameForLock);
}
requestBuilder.addLockComponent(lockCompBuilder.build());
return requestBuilder.build();
}
/**
   * @return the set of created partitions.
*/
@Override
public Set<String> getPartitions() {
return createdPartitions;
}
}