blob: 8b4e4fd5dffb207ef4d3f5db0f5ac44dc4cd7b7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tephra;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import javax.annotation.Nullable;
/**
* Utility class that encapsulates the transaction life cycle over a given set of
* transaction-aware datasets. It is not thread-safe for concurrent execution.
*/
public class TransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
private final Collection<TransactionAware> txAwares;
private final TransactionSystemClient txClient;
private Transaction currentTx;
public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) {
this(txClient, ImmutableList.copyOf(txAwares));
}
public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) {
// Use a set to avoid adding the same TransactionAware twice and to make removal faster.
// Use a linked hash set so that insertion order is preserved (same behavior as when it was using a List).
this.txAwares = Sets.newLinkedHashSet(txAwares);
this.txClient = txClient;
}
/**
* Adds a new transaction-aware to participate in the transaction.
* @param txAware the new transaction-aware
*/
public boolean addTransactionAware(TransactionAware txAware) {
// If the txAware is newly added, call startTx as well if there is an active transaction
boolean added = txAwares.add(txAware);
if (added && currentTx != null) {
txAware.startTx(currentTx);
}
return added;
}
/**
* Removes a {@link TransactionAware} and withdraws from participation in the transaction.
* Withdrawal is only allowed if there is no active transaction.
*
* @param txAware the {@link TransactionAware} to be removed
* @return true if the given {@link TransactionAware} is removed; false otherwise.
* @throws IllegalStateException if there is an active transaction going on with this TransactionContext.
*/
public boolean removeTransactionAware(TransactionAware txAware) {
Preconditions.checkState(currentTx == null, "Cannot remove TransactionAware while there is an active transaction.");
return txAwares.remove(txAware);
}
/**
* Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient},
* and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered
* TransactionAware. If an exception is encountered, the transaction will be aborted and a
* {@code TransactionFailureException} wrapping the root cause will be thrown.
*
* @throws TransactionFailureException if an exception occurs starting the transaction with any registered
* TransactionAware
*/
public void start() throws TransactionFailureException {
currentTx = txClient.startShort();
startAllTxAwares();
}
/**
* Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient},
* and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered
* TransactionAware. If an exception is encountered, the transaction will be aborted and a
* {@code TransactionFailureException} wrapping the root cause will be thrown.
*
* @param timeout the transaction timeout for the transaction
*
* @throws TransactionFailureException if an exception occurs starting the transaction with any registered
* TransactionAware
*/
public void start(int timeout) throws TransactionFailureException {
currentTx = txClient.startShort(timeout);
startAllTxAwares();
}
/**
* This is a helper for {@link #start()} and {@link #start(int)}.
*
* Passes the current transaction to {@link TransactionAware#startTx(Transaction)} for each registered
* TransactionAware. If an exception is encountered, the transaction will be aborted and a
* {@code TransactionFailureException} wrapping the root cause will be thrown.
*
* @throws TransactionFailureException if an exception occurs starting the transaction with any registered
* TransactionAware
*/
private void startAllTxAwares() throws TransactionFailureException {
for (TransactionAware txAware : txAwares) {
try {
txAware.startTx(currentTx);
} catch (Throwable e) {
String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
txAware.getTransactionAwareName(), currentTx.getTransactionId());
LOG.warn(message, e);
txClient.abort(currentTx);
currentTx = null;
throw new TransactionFailureException(message, e);
}
}
}
/**
* Commits the current transaction. This will: check for any conflicts, based on the change set aggregated from
* all registered {@link TransactionAware} instances; flush any pending writes from the {@code TransactionAware}s;
* commit the current transaction with the {@link TransactionSystemClient}; and clear the current transaction state.
*
* @throws TransactionConflictException if a conflict is detected with a recently committed transaction
* @throws TransactionFailureException if an error occurs while committing
*/
public void finish() throws TransactionFailureException {
Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started");
// each of these steps will abort and rollback the tx in case if errors, and throw an exception
checkForConflicts();
persist();
commit();
postCommit();
currentTx = null;
}
/**
* Aborts the given transaction, and rolls back all data set changes. If rollback fails,
* the transaction is invalidated. If an exception is caught during rollback, the exception
* is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have
* completed rollback.
*
* @throws TransactionFailureException for any exception that is encountered.
*/
public void abort() throws TransactionFailureException {
abort(null);
}
/**
* Checkpoints the current transaction by flushing any pending writes for the registered {@link TransactionAware}
* instances, and obtaining a new current write pointer for the transaction. By performing a checkpoint,
* the client can ensure that all previous writes were flushed and are visible. By default, the current write
* pointer for the transaction is also visible. The current write pointer can be excluded from read
* operations by calling {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set
* to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created
* by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}.
*
* After the checkpoint operation is performed, the updated
* {@link Transaction} instance will be passed to {@link TransactionAware#startTx(Transaction)} for each
* registered {@code TransactionAware} instance.
*
* @throws TransactionFailureException if an error occurs while performing the checkpoint
*/
public void checkpoint() throws TransactionFailureException {
Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has not been started");
persist();
try {
currentTx = txClient.checkpoint(currentTx);
// update the current transaction with all TransactionAwares
for (TransactionAware txAware : txAwares) {
txAware.updateTx(currentTx);
}
} catch (TransactionNotInProgressException e) {
String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
LOG.warn(message, e);
abort(new TransactionFailureException(message, e));
// abort will throw that exception
} catch (Throwable e) {
String message = String.format("Exception from checkpoint for transaction %d.", currentTx.getTransactionId());
LOG.warn(message, e);
abort(new TransactionFailureException(message, e));
// abort will throw that exception
}
}
/**
* Returns the current transaction or null if no transaction is currently in progress.
*/
@Nullable
public Transaction getCurrentTransaction() {
return currentTx;
}
// CHECKSTYLE IGNORE "@throws" FOR 11 LINES
/**
* Aborts the given transaction, and rolls back all data set changes. If rollback fails,
* the transaction is invalidated. If an exception is caught during rollback, the exception
* is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have
* completed rollback. If an existing exception is passed in, that exception is thrown in either
* case, whether the rollback is successful or not. In other words, this method always throws the
* first exception that it encounters.
* @param cause the original exception that caused the abort
* @throws TransactionFailureException for any exception that is encountered.
*/
public void abort(TransactionFailureException cause) throws TransactionFailureException {
if (currentTx == null) {
// might be called by some generic exception handler even though already aborted/finished - we allow that
return;
}
try {
boolean success = true;
for (TransactionAware txAware : txAwares) {
try {
if (!txAware.rollbackTx()) {
success = false;
}
} catch (Throwable e) {
String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
txAware.getTransactionAwareName(), currentTx.getTransactionId());
LOG.warn(message, e);
if (cause == null) {
cause = new TransactionFailureException(message, e);
}
success = false;
}
}
if (success) {
txClient.abort(currentTx);
} else {
txClient.invalidate(currentTx.getTransactionId());
}
if (cause != null) {
throw cause;
}
} finally {
currentTx = null;
}
}
private void checkForConflicts() throws TransactionFailureException {
Collection<byte[]> changes = Lists.newArrayList();
for (TransactionAware txAware : txAwares) {
try {
changes.addAll(txAware.getTxChanges());
} catch (Throwable e) {
String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
txAware.getTransactionAwareName(), currentTx.getTransactionId());
LOG.warn(message, e);
abort(new TransactionFailureException(message, e));
// abort will throw that exception
}
}
boolean canCommit = false;
try {
canCommit = txClient.canCommitOrThrow(currentTx, changes);
} catch (TransactionNotInProgressException | TransactionSizeException e) {
throw e;
// abort will throw that exception
} catch (Throwable e) {
String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
abort(new TransactionFailureException(message, e));
// abort will throw that exception
}
if (!canCommit) {
String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
abort(new TransactionConflictException(message));
// abort will throw
}
}
private void persist() throws TransactionFailureException {
for (TransactionAware txAware : txAwares) {
boolean success;
Throwable cause = null;
try {
success = txAware.commitTx();
} catch (Throwable e) {
success = false;
cause = e;
}
if (!success) {
String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
txAware.getTransactionAwareName(), currentTx.getTransactionId());
if (cause == null) {
LOG.warn(message);
} else {
LOG.warn(message, cause);
}
abort(new TransactionFailureException(message, cause));
// abort will throw that exception
}
}
}
private void commit() throws TransactionFailureException {
boolean commitSuccess = false;
try {
commitSuccess = txClient.commit(currentTx);
} catch (TransactionNotInProgressException e) {
String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
LOG.warn(message, e);
abort(new TransactionFailureException(message, e));
// abort will throw that exception
} catch (Throwable e) {
String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId());
LOG.warn(message, e);
abort(new TransactionFailureException(message, e));
// abort will throw that exception
}
if (!commitSuccess) {
String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
abort(new TransactionConflictException(message));
// abort will throw
}
}
private void postCommit() throws TransactionFailureException {
TransactionFailureException cause = null;
for (TransactionAware txAware : txAwares) {
try {
txAware.postTxCommit();
} catch (Throwable e) {
String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
txAware.getTransactionAwareName(), currentTx.getTransactionId());
LOG.warn(message, e);
cause = new TransactionFailureException(message, e);
}
}
if (cause != null) {
throw cause;
}
}
}