blob: 9815a387db38f26cc75c91f5237c726cdb4b3d56 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.TransactionInProgressException;
import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.XASupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A TransactionContext provides the means to control a JMS transaction. It
* provides a local transaction interface and also an XAResource interface. <p/>
* An application server controls the transactional assignment of an XASession
* by obtaining its XAResource. It uses the XAResource to assign the session to
* a transaction, prepare and commit work on the transaction, and so on. <p/> An
* XAResource provides some fairly sophisticated facilities for interleaving
* work on multiple transactions, recovering a list of transactions in progress,
* and so on. A JTA aware JMS provider must fully implement this functionality.
* This could be done by using the services of a database that supports XA, or a
* JMS provider may choose to implement this functionality from scratch. <p/>
*
*
* @see javax.jms.Session
* @see javax.jms.QueueSession
* @see javax.jms.TopicSession
* @see javax.jms.XASession
*/
public class TransactionContext implements XAResource {
public static final String xaErrorCodeMarker = "xaErrorCode:";
private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
// XATransactionId -> ArrayList of TransactionContext objects
private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
new HashMap<TransactionId, List<TransactionContext>>();
private ActiveMQConnection connection;
private final LongSequenceGenerator localTransactionIdGenerator;
private List<Synchronization> synchronizations;
// To track XA transactions.
private Xid associatedXid;
private TransactionId transactionId;
private LocalTransactionEventListener localTransactionEventListener;
private int beforeEndIndex;
private volatile boolean rollbackOnly;
// for RAR recovery
public TransactionContext() {
localTransactionIdGenerator = null;
}
public TransactionContext(ActiveMQConnection connection) {
this.connection = connection;
this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
}
public boolean isInXATransaction() {
if (transactionId != null && transactionId.isXATransaction()) {
return true;
} else {
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
if (transactions.contains(this)) {
return true;
}
}
}
}
return false;
}
public void setRollbackOnly(boolean val) {
rollbackOnly = val;
}
public boolean isRollbackOnly() {
return rollbackOnly;
}
public boolean isInLocalTransaction() {
return transactionId != null && transactionId.isLocalTransaction();
}
public boolean isInTransaction() {
return transactionId != null;
}
/**
* @return Returns the localTransactionEventListener.
*/
public LocalTransactionEventListener getLocalTransactionEventListener() {
return localTransactionEventListener;
}
/**
* Used by the resource adapter to listen to transaction events.
*
* @param localTransactionEventListener The localTransactionEventListener to
* set.
*/
public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
this.localTransactionEventListener = localTransactionEventListener;
}
// ///////////////////////////////////////////////////////////
//
// Methods that work with the Synchronization objects registered with
// the transaction.
//
// ///////////////////////////////////////////////////////////
public void addSynchronization(Synchronization s) {
if (synchronizations == null) {
synchronizations = new ArrayList<Synchronization>(10);
}
synchronizations.add(s);
}
private void afterRollback() throws JMSException {
if (synchronizations == null) {
return;
}
Throwable firstException = null;
int size = synchronizations.size();
for (int i = 0; i < size; i++) {
try {
synchronizations.get(i).afterRollback();
} catch (Throwable t) {
LOG.debug("Exception from afterRollback on {}", synchronizations.get(i), t);
if (firstException == null) {
firstException = t;
}
}
}
synchronizations = null;
if (firstException != null) {
throw JMSExceptionSupport.create(firstException);
}
}
private void afterCommit() throws JMSException {
if (synchronizations == null) {
return;
}
Throwable firstException = null;
int size = synchronizations.size();
for (int i = 0; i < size; i++) {
try {
synchronizations.get(i).afterCommit();
} catch (Throwable t) {
LOG.debug("Exception from afterCommit on {}", synchronizations.get(i), t);
if (firstException == null) {
firstException = t;
}
}
}
synchronizations = null;
if (firstException != null) {
throw JMSExceptionSupport.create(firstException);
}
}
private void beforeEnd() throws JMSException {
if (synchronizations == null) {
return;
}
int size = synchronizations.size();
try {
for (;beforeEndIndex < size;) {
synchronizations.get(beforeEndIndex++).beforeEnd();
}
} catch (JMSException e) {
throw e;
} catch (Throwable e) {
throw JMSExceptionSupport.create(e);
}
}
public TransactionId getTransactionId() {
return transactionId;
}
// ///////////////////////////////////////////////////////////
//
// Local transaction interface.
//
// ///////////////////////////////////////////////////////////
/**
* Start a local transaction.
* @throws javax.jms.JMSException on internal error
*/
public void begin() throws JMSException {
if (isInXATransaction()) {
throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress.");
}
if (transactionId == null) {
synchronizations = null;
beforeEndIndex = 0;
setRollbackOnly(false);
this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
this.connection.ensureConnectionInfoSent();
this.connection.asyncSendPacket(info);
// Notify the listener that the tx was started.
if (localTransactionEventListener != null) {
localTransactionEventListener.beginEvent();
}
LOG.debug("Begin:{}", transactionId);
}
}
/**
* Rolls back any work done in this transaction and releases any locks
* currently held.
*
* @throws JMSException if the JMS provider fails to roll back the
* transaction due to some internal error.
* @throws javax.jms.IllegalStateException if the method is not called by a
* transacted session.
*/
public void rollback() throws JMSException {
if (isInXATransaction()) {
throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
}
try {
beforeEnd();
} catch (TransactionRolledBackException canOcurrOnFailover) {
LOG.warn("rollback processing error", canOcurrOnFailover);
}
if (transactionId != null) {
LOG.debug("Rollback: {} syncCount: {}",
transactionId, (synchronizations != null ? synchronizations.size() : 0));
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
this.transactionId = null;
//make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
// Notify the listener that the tx was rolled back
if (localTransactionEventListener != null) {
localTransactionEventListener.rollbackEvent();
}
}
afterRollback();
}
/**
* Commits all work done in this transaction and releases any locks
* currently held.
*
* @throws JMSException if the JMS provider fails to commit the transaction
* due to some internal error.
* @throws javax.jms.IllegalStateException if the method is not called by a
* transacted session.
*/
public void commit() throws JMSException {
if (isInXATransaction()) {
throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
}
try {
beforeEnd();
} catch (JMSException e) {
rollback();
throw e;
}
if (transactionId != null && rollbackOnly) {
final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks";
try {
rollback();
} finally {
LOG.warn(message);
throw new TransactionRolledBackException(message);
}
}
// Only send commit if the transaction was started.
if (transactionId != null) {
LOG.debug("Commit: {} syncCount: {}",
transactionId, (synchronizations != null ? synchronizations.size() : 0));
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
this.transactionId = null;
// Notify the listener that the tx was committed back
try {
this.connection.syncSendPacket(info);
if (localTransactionEventListener != null) {
localTransactionEventListener.commitEvent();
}
afterCommit();
} catch (JMSException cause) {
LOG.info("commit failed for transaction {}", info.getTransactionId(), cause);
if (localTransactionEventListener != null) {
localTransactionEventListener.rollbackEvent();
}
afterRollback();
throw cause;
}
}
}
// ///////////////////////////////////////////////////////////
//
// XAResource Implementation
//
// ///////////////////////////////////////////////////////////
/**
* Associates a transaction with the resource.
*/
@Override
public void start(Xid xid, int flags) throws XAException {
LOG.debug("Start: {}, flags: {}", xid, XASupport.toString(flags));
if (isInLocalTransaction()) {
throw new XAException(XAException.XAER_PROTO);
}
// Are we already associated?
if (associatedXid != null) {
throw new XAException(XAException.XAER_PROTO);
}
// if ((flags & TMJOIN) == TMJOIN) {
// TODO: verify that the server has seen the xid
// // }
// if ((flags & TMRESUME) == TMRESUME) {
// // TODO: verify that the xid was suspended.
// }
// associate
synchronizations = null;
beforeEndIndex = 0;
setRollbackOnly(false);
setXid(xid);
}
/**
* @return connectionId for connection
*/
private ConnectionId getConnectionId() {
return connection.getConnectionInfo().getConnectionId();
}
@Override
public void end(Xid xid, int flags) throws XAException {
LOG.debug("End: {}, flags: {}", xid, XASupport.toString(flags));
if (isInLocalTransaction()) {
throw new XAException(XAException.XAER_PROTO);
}
if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
// You can only suspend the associated xid.
if (!equals(associatedXid, xid)) {
throw new XAException(XAException.XAER_PROTO);
}
invokeBeforeEnd();
} else if ((flags & TMSUCCESS) == TMSUCCESS) {
// set to null if this is the current xid.
// otherwise this could be an asynchronous success call
if (equals(associatedXid, xid)) {
invokeBeforeEnd();
}
} else {
throw new XAException(XAException.XAER_INVAL);
}
}
private void invokeBeforeEnd() throws XAException {
boolean throwingException = false;
try {
beforeEnd();
} catch (JMSException e) {
throwingException = true;
throw toXAException(e);
} finally {
try {
setXid(null);
} catch (XAException ignoreIfWillMask){
if (!throwingException) {
throw ignoreIfWillMask;
}
}
}
}
private boolean equals(Xid xid1, Xid xid2) {
if (xid1 == xid2) {
return true;
}
if (xid1 == null ^ xid2 == null) {
return false;
}
return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
&& Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
}
@Override
public int prepare(Xid xid) throws XAException {
LOG.debug("Prepare: {}", xid);
// We allow interleaving multiple transactions, so
// we don't limit prepare to the associated xid.
XATransactionId x;
// THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
// called first
if (xid == null || (equals(associatedXid, xid))) {
throw new XAException(XAException.XAER_PROTO);
} else {
// TODO: cache the known xids so we don't keep recreating this one??
x = new XATransactionId(xid);
}
if (rollbackOnly) {
LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
throw new XAException(XAException.XA_RBINTEGRITY);
}
try {
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
// Find out if the server wants to commit or rollback.
IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
if (XAResource.XA_RDONLY == response.getResult()) {
// transaction stops now, may be syncs that need a callback
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
}
// After commit may be expensive and can deadlock, do it outside global synch block
// No risk for concurrent updates as we own the list now
if (l != null) {
if(! l.isEmpty()) {
LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
for (TransactionContext ctx : l) {
ctx.afterCommit();
}
}
}
}
return response.getResult();
} catch (JMSException e) {
LOG.warn("prepare of: " + x + " failed with: " + e, e);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
}
// After rollback may be expensive and can deadlock, do it outside global synch block
// No risk for concurrent updates as we own the list now
if (l != null) {
for (TransactionContext ctx : l) {
try {
ctx.afterRollback();
} catch (Throwable ignored) {
LOG.debug("failed to firing afterRollback callbacks on prepare " +
"failure, txid: {}, context: {}", x, ctx, ignored);
}
}
}
throw toXAException(e);
}
}
@Override
public void rollback(Xid xid) throws XAException {
if (LOG.isDebugEnabled()) {
LOG.debug("Rollback: " + xid);
}
// We allow interleaving multiple transactions, so
// we don't limit rollback to the associated xid.
XATransactionId x;
if (xid == null) {
throw new XAException(XAException.XAER_PROTO);
}
if (equals(associatedXid, xid)) {
// I think this can happen even without an end(xid) call. Need to
// check spec.
x = (XATransactionId)transactionId;
} else {
x = new XATransactionId(xid);
}
try {
this.connection.checkClosedOrFailed();
this.connection.ensureConnectionInfoSent();
// Let the server know that the tx is rollback.
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
this.connection.syncSendPacket(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
}
// After rollback may be expensive and can deadlock, do it outside global synch block
// No risk for concurrent updates as we own the list now
if (l != null) {
for (TransactionContext ctx : l) {
try {
ctx.afterRollback();
} catch (Exception ignored) {
LOG.debug("ignoring exception from after rollback on ended transaction: {}", ignored, ignored);
}
}
}
} catch (JMSException e) {
throw toXAException(e);
}
}
// XAResource interface
@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
LOG.debug("Commit: {}, onePhase={}", xid, onePhase);
// We allow interleaving multiple transactions, so
// we don't limit commit to the associated xid.
XATransactionId x;
if (xid == null || (equals(associatedXid, xid))) {
// should never happen, end(xid,TMSUCCESS) must have been previously
// called
throw new XAException(XAException.XAER_PROTO);
} else {
x = new XATransactionId(xid);
}
if (rollbackOnly) {
LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
throw new XAException(XAException.XA_RBINTEGRITY);
}
try {
this.connection.checkClosedOrFailed();
this.connection.ensureConnectionInfoSent();
// Notify the server that the tx was committed back
TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
this.connection.syncSendPacket(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
}
// After commit may be expensive and can deadlock, do it outside global synch block
// No risk for concurrent updates as we own the list now
if (l != null) {
for (TransactionContext ctx : l) {
try {
ctx.afterCommit();
} catch (Exception ignored) {
LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
}
}
}
} catch (JMSException e) {
LOG.warn("commit of: " + x + " failed with: " + e, e);
if (onePhase) {
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
}
// After rollback may be expensive and can deadlock, do it outside global synch block
// No risk for concurrent updates as we own the list now
if (l != null) {
for (TransactionContext ctx : l) {
try {
ctx.afterRollback();
} catch (Throwable ignored) {
LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
}
}
}
}
throw toXAException(e);
}
}
@Override
public void forget(Xid xid) throws XAException {
LOG.debug("Forget: {}", xid);
// We allow interleaving multiple transactions, so
// we don't limit forget to the associated xid.
XATransactionId x;
if (xid == null) {
throw new XAException(XAException.XAER_PROTO);
}
if (equals(associatedXid, xid)) {
// TODO determine if this can happen... I think not.
x = (XATransactionId)transactionId;
} else {
x = new XATransactionId(xid);
}
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
try {
// Tell the server to forget the transaction.
this.connection.syncSendPacket(info);
} catch (JMSException e) {
throw toXAException(e);
}
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
}
}
@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
if (xaResource == null) {
return false;
}
if (!(xaResource instanceof TransactionContext)) {
return false;
}
TransactionContext xar = (TransactionContext)xaResource;
try {
return getResourceManagerId().equals(xar.getResourceManagerId());
} catch (Throwable e) {
throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
}
}
@Override
public Xid[] recover(int flag) throws XAException {
LOG.debug("recover({})", flag);
XATransactionId[] answer;
if (XAResource.TMNOFLAGS == flag) {
// signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
// allows looping scan to complete
answer = new XATransactionId[0];
} else {
TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
try {
this.connection.checkClosedOrFailed();
this.connection.ensureConnectionInfoSent();
DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
DataStructure[] data = receipt.getData();
if (data instanceof XATransactionId[]) {
answer = (XATransactionId[]) data;
} else {
answer = new XATransactionId[data.length];
System.arraycopy(data, 0, answer, 0, data.length);
}
} catch (JMSException e) {
throw toXAException(e);
}
}
LOG.debug("recover({})={}", flag, answer);
return answer;
}
@Override
public int getTransactionTimeout() throws XAException {
return 0;
}
@Override
public boolean setTransactionTimeout(int seconds) throws XAException {
return false;
}
// ///////////////////////////////////////////////////////////
//
// Helper methods.
//
// ///////////////////////////////////////////////////////////
protected String getResourceManagerId() throws JMSException {
return this.connection.getResourceManagerId();
}
private void setXid(Xid xid) throws XAException {
try {
this.connection.checkClosedOrFailed();
this.connection.ensureConnectionInfoSent();
} catch (JMSException e) {
disassociate();
throw toXAException(e);
}
if (xid != null) {
// associate
associatedXid = xid;
transactionId = new XATransactionId(xid);
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
try {
this.connection.asyncSendPacket(info);
LOG.debug("{} started XA transaction {}", this, transactionId);
} catch (JMSException e) {
disassociate();
throw toXAException(e);
}
} else {
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
try {
this.connection.syncSendPacket(info);
LOG.debug("{} ended XA transaction {}", this, transactionId);
} catch (JMSException e) {
disassociate();
throw toXAException(e);
}
// Add our self to the list of contexts that are interested in
// post commit/rollback events.
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
if (l == null) {
l = new ArrayList<TransactionContext>(3);
ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
}
if (!l.contains(this)) {
l.add(this);
}
}
}
disassociate();
}
}
private void disassociate() {
// dis-associate
associatedXid = null;
transactionId = null;
}
/**
* Converts a JMSException from the server to an XAException. if the
* JMSException contained a linked XAException that is returned instead.
*
* @param e JMSException to convert
* @return XAException wrapping original exception or its message
*/
public static XAException toXAException(JMSException e) {
if (e.getCause() != null && e.getCause() instanceof XAException) {
XAException original = (XAException)e.getCause();
XAException xae = new XAException(original.getMessage());
if (original != null) {
xae.errorCode = original.errorCode;
}
if (original != null && xae != null && xae.errorCode == XA_OK) {
// detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
}
if (original != null) {
xae.initCause(original);
}
return xae;
}
XAException xae = new XAException(e.getMessage());
xae.errorCode = XAException.XAER_RMFAIL;
xae.initCause(e);
return xae;
}
private static int parseFromMessageOr(String message, int fallbackCode) {
final String marker = "xaErrorCode:";
final int index = message.lastIndexOf(marker);
if (index > -1) {
try {
return Integer.parseInt(message.substring(index + marker.length()));
} catch (Exception ignored) {}
}
return fallbackCode;
}
public ActiveMQConnection getConnection() {
return connection;
}
// for RAR xa recovery where xaresource connection is per request
public ActiveMQConnection setConnection(ActiveMQConnection connection) {
ActiveMQConnection existing = this.connection;
this.connection = connection;
return existing;
}
public void cleanup() {
associatedXid = null;
transactionId = null;
}
@Override
public String toString() {
return "TransactionContext{" +
"transactionId=" + transactionId +
",connection=" + connection +
'}';
}
}