| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| |
| import javax.jms.JMSException; |
| import javax.jms.TransactionInProgressException; |
| import javax.jms.TransactionRolledBackException; |
| import javax.transaction.xa.XAException; |
| import javax.transaction.xa.XAResource; |
| import javax.transaction.xa.Xid; |
| |
| import org.apache.activemq.command.ConnectionId; |
| import org.apache.activemq.command.DataArrayResponse; |
| import org.apache.activemq.command.DataStructure; |
| import org.apache.activemq.command.IntegerResponse; |
| import org.apache.activemq.command.LocalTransactionId; |
| import org.apache.activemq.command.TransactionId; |
| import org.apache.activemq.command.TransactionInfo; |
| import org.apache.activemq.command.XATransactionId; |
| import org.apache.activemq.transaction.Synchronization; |
| import org.apache.activemq.util.JMSExceptionSupport; |
| import org.apache.activemq.util.LongSequenceGenerator; |
| import org.apache.activemq.util.XASupport; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A TransactionContext provides the means to control a JMS transaction. It |
| * provides a local transaction interface and also an XAResource interface. <p/> |
| * An application server controls the transactional assignment of an XASession |
| * by obtaining its XAResource. It uses the XAResource to assign the session to |
| * a transaction, prepare and commit work on the transaction, and so on. <p/> An |
| * XAResource provides some fairly sophisticated facilities for interleaving |
| * work on multiple transactions, recovering a list of transactions in progress, |
| * and so on. A JTA aware JMS provider must fully implement this functionality. |
| * This could be done by using the services of a database that supports XA, or a |
| * JMS provider may choose to implement this functionality from scratch. <p/> |
| * |
| * |
| * @see javax.jms.Session |
| * @see javax.jms.QueueSession |
| * @see javax.jms.TopicSession |
| * @see javax.jms.XASession |
| */ |
| public class TransactionContext implements XAResource { |
| |
| public static final String xaErrorCodeMarker = "xaErrorCode:"; |
| private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); |
| |
| // XATransactionId -> ArrayList of TransactionContext objects |
| private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = |
| new HashMap<TransactionId, List<TransactionContext>>(); |
| |
| private ActiveMQConnection connection; |
| private final LongSequenceGenerator localTransactionIdGenerator; |
| private List<Synchronization> synchronizations; |
| |
| // To track XA transactions. |
| private Xid associatedXid; |
| private TransactionId transactionId; |
| private LocalTransactionEventListener localTransactionEventListener; |
| private int beforeEndIndex; |
| private volatile boolean rollbackOnly; |
| |
/**
 * Creates a context without a connection (used for RAR recovery). No local
 * transaction id generator is available on such an instance; a connection
 * can be supplied later through {@code setConnection}.
 */
public TransactionContext() {
    this.localTransactionIdGenerator = null;
}
| |
/**
 * Creates a context bound to the given connection and borrows that
 * connection's local transaction id generator.
 *
 * @param connection connection used for all transaction commands
 */
public TransactionContext(ActiveMQConnection connection) {
    this.connection = connection;
    // Local transaction ids are drawn from the connection's own sequence.
    this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
}
| |
| public boolean isInXATransaction() { |
| if (transactionId != null && transactionId.isXATransaction()) { |
| return true; |
| } else { |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { |
| if (transactions.contains(this)) { |
| return true; |
| } |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| public void setRollbackOnly(boolean val) { |
| rollbackOnly = val; |
| } |
| |
| public boolean isRollbackOnly() { |
| return rollbackOnly; |
| } |
| |
| public boolean isInLocalTransaction() { |
| return transactionId != null && transactionId.isLocalTransaction(); |
| } |
| |
| public boolean isInTransaction() { |
| return transactionId != null; |
| } |
| |
| /** |
| * @return Returns the localTransactionEventListener. |
| */ |
| public LocalTransactionEventListener getLocalTransactionEventListener() { |
| return localTransactionEventListener; |
| } |
| |
| /** |
| * Used by the resource adapter to listen to transaction events. |
| * |
| * @param localTransactionEventListener The localTransactionEventListener to |
| * set. |
| */ |
| public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { |
| this.localTransactionEventListener = localTransactionEventListener; |
| } |
| |
| // /////////////////////////////////////////////////////////// |
| // |
| // Methods that work with the Synchronization objects registered with |
| // the transaction. |
| // |
| // /////////////////////////////////////////////////////////// |
| |
| public void addSynchronization(Synchronization s) { |
| if (synchronizations == null) { |
| synchronizations = new ArrayList<Synchronization>(10); |
| } |
| synchronizations.add(s); |
| } |
| |
| private void afterRollback() throws JMSException { |
| if (synchronizations == null) { |
| return; |
| } |
| |
| Throwable firstException = null; |
| int size = synchronizations.size(); |
| for (int i = 0; i < size; i++) { |
| try { |
| synchronizations.get(i).afterRollback(); |
| } catch (Throwable t) { |
| LOG.debug("Exception from afterRollback on {}", synchronizations.get(i), t); |
| if (firstException == null) { |
| firstException = t; |
| } |
| } |
| } |
| synchronizations = null; |
| if (firstException != null) { |
| throw JMSExceptionSupport.create(firstException); |
| } |
| } |
| |
| private void afterCommit() throws JMSException { |
| if (synchronizations == null) { |
| return; |
| } |
| |
| Throwable firstException = null; |
| int size = synchronizations.size(); |
| for (int i = 0; i < size; i++) { |
| try { |
| synchronizations.get(i).afterCommit(); |
| } catch (Throwable t) { |
| LOG.debug("Exception from afterCommit on {}", synchronizations.get(i), t); |
| if (firstException == null) { |
| firstException = t; |
| } |
| } |
| } |
| synchronizations = null; |
| if (firstException != null) { |
| throw JMSExceptionSupport.create(firstException); |
| } |
| } |
| |
| private void beforeEnd() throws JMSException { |
| if (synchronizations == null) { |
| return; |
| } |
| |
| int size = synchronizations.size(); |
| try { |
| for (;beforeEndIndex < size;) { |
| synchronizations.get(beforeEndIndex++).beforeEnd(); |
| } |
| } catch (JMSException e) { |
| throw e; |
| } catch (Throwable e) { |
| throw JMSExceptionSupport.create(e); |
| } |
| } |
| |
| public TransactionId getTransactionId() { |
| return transactionId; |
| } |
| |
| // /////////////////////////////////////////////////////////// |
| // |
| // Local transaction interface. |
| // |
| // /////////////////////////////////////////////////////////// |
| |
| /** |
| * Start a local transaction. |
| * @throws javax.jms.JMSException on internal error |
| */ |
| public void begin() throws JMSException { |
| |
| if (isInXATransaction()) { |
| throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); |
| } |
| |
| if (transactionId == null) { |
| synchronizations = null; |
| beforeEndIndex = 0; |
| setRollbackOnly(false); |
| this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId()); |
| TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); |
| this.connection.ensureConnectionInfoSent(); |
| this.connection.asyncSendPacket(info); |
| |
| // Notify the listener that the tx was started. |
| if (localTransactionEventListener != null) { |
| localTransactionEventListener.beginEvent(); |
| } |
| |
| LOG.debug("Begin:{}", transactionId); |
| } |
| } |
| |
| /** |
| * Rolls back any work done in this transaction and releases any locks |
| * currently held. |
| * |
| * @throws JMSException if the JMS provider fails to roll back the |
| * transaction due to some internal error. |
| * @throws javax.jms.IllegalStateException if the method is not called by a |
| * transacted session. |
| */ |
| public void rollback() throws JMSException { |
| if (isInXATransaction()) { |
| throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); |
| } |
| |
| try { |
| beforeEnd(); |
| } catch (TransactionRolledBackException canOcurrOnFailover) { |
| LOG.warn("rollback processing error", canOcurrOnFailover); |
| } |
| if (transactionId != null) { |
| LOG.debug("Rollback: {} syncCount: {}", |
| transactionId, (synchronizations != null ? synchronizations.size() : 0)); |
| |
| TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); |
| this.transactionId = null; |
| //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 |
| this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0); |
| // Notify the listener that the tx was rolled back |
| if (localTransactionEventListener != null) { |
| localTransactionEventListener.rollbackEvent(); |
| } |
| } |
| |
| afterRollback(); |
| } |
| |
| /** |
| * Commits all work done in this transaction and releases any locks |
| * currently held. |
| * |
| * @throws JMSException if the JMS provider fails to commit the transaction |
| * due to some internal error. |
| * @throws javax.jms.IllegalStateException if the method is not called by a |
| * transacted session. |
| */ |
| public void commit() throws JMSException { |
| if (isInXATransaction()) { |
| throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); |
| } |
| |
| try { |
| beforeEnd(); |
| } catch (JMSException e) { |
| rollback(); |
| throw e; |
| } |
| |
| if (transactionId != null && rollbackOnly) { |
| final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks"; |
| try { |
| rollback(); |
| } finally { |
| LOG.warn(message); |
| throw new TransactionRolledBackException(message); |
| } |
| } |
| |
| // Only send commit if the transaction was started. |
| if (transactionId != null) { |
| LOG.debug("Commit: {} syncCount: {}", |
| transactionId, (synchronizations != null ? synchronizations.size() : 0)); |
| |
| TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); |
| this.transactionId = null; |
| // Notify the listener that the tx was committed back |
| try { |
| this.connection.syncSendPacket(info); |
| if (localTransactionEventListener != null) { |
| localTransactionEventListener.commitEvent(); |
| } |
| afterCommit(); |
| } catch (JMSException cause) { |
| LOG.info("commit failed for transaction {}", info.getTransactionId(), cause); |
| if (localTransactionEventListener != null) { |
| localTransactionEventListener.rollbackEvent(); |
| } |
| afterRollback(); |
| throw cause; |
| } |
| |
| } |
| } |
| |
| // /////////////////////////////////////////////////////////// |
| // |
| // XAResource Implementation |
| // |
| // /////////////////////////////////////////////////////////// |
| /** |
| * Associates a transaction with the resource. |
| */ |
| @Override |
| public void start(Xid xid, int flags) throws XAException { |
| |
| LOG.debug("Start: {}, flags: {}", xid, XASupport.toString(flags)); |
| |
| if (isInLocalTransaction()) { |
| throw new XAException(XAException.XAER_PROTO); |
| } |
| // Are we already associated? |
| if (associatedXid != null) { |
| throw new XAException(XAException.XAER_PROTO); |
| } |
| |
| // if ((flags & TMJOIN) == TMJOIN) { |
| // TODO: verify that the server has seen the xid |
| // // } |
| // if ((flags & TMRESUME) == TMRESUME) { |
| // // TODO: verify that the xid was suspended. |
| // } |
| |
| // associate |
| synchronizations = null; |
| beforeEndIndex = 0; |
| setRollbackOnly(false); |
| setXid(xid); |
| } |
| |
| /** |
| * @return connectionId for connection |
| */ |
| private ConnectionId getConnectionId() { |
| return connection.getConnectionInfo().getConnectionId(); |
| } |
| |
| @Override |
| public void end(Xid xid, int flags) throws XAException { |
| |
| LOG.debug("End: {}, flags: {}", xid, XASupport.toString(flags)); |
| |
| if (isInLocalTransaction()) { |
| throw new XAException(XAException.XAER_PROTO); |
| } |
| |
| if ((flags & (TMSUSPEND | TMFAIL)) != 0) { |
| // You can only suspend the associated xid. |
| if (!equals(associatedXid, xid)) { |
| throw new XAException(XAException.XAER_PROTO); |
| } |
| invokeBeforeEnd(); |
| } else if ((flags & TMSUCCESS) == TMSUCCESS) { |
| // set to null if this is the current xid. |
| // otherwise this could be an asynchronous success call |
| if (equals(associatedXid, xid)) { |
| invokeBeforeEnd(); |
| } |
| } else { |
| throw new XAException(XAException.XAER_INVAL); |
| } |
| } |
| |
| private void invokeBeforeEnd() throws XAException { |
| boolean throwingException = false; |
| try { |
| beforeEnd(); |
| } catch (JMSException e) { |
| throwingException = true; |
| throw toXAException(e); |
| } finally { |
| try { |
| setXid(null); |
| } catch (XAException ignoreIfWillMask){ |
| if (!throwingException) { |
| throw ignoreIfWillMask; |
| } |
| } |
| } |
| } |
| |
| private boolean equals(Xid xid1, Xid xid2) { |
| if (xid1 == xid2) { |
| return true; |
| } |
| if (xid1 == null ^ xid2 == null) { |
| return false; |
| } |
| return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) |
| && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); |
| } |
| |
| @Override |
| public int prepare(Xid xid) throws XAException { |
| LOG.debug("Prepare: {}", xid); |
| |
| // We allow interleaving multiple transactions, so |
| // we don't limit prepare to the associated xid. |
| XATransactionId x; |
| // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been |
| // called first |
| if (xid == null || (equals(associatedXid, xid))) { |
| throw new XAException(XAException.XAER_PROTO); |
| } else { |
| // TODO: cache the known xids so we don't keep recreating this one?? |
| x = new XATransactionId(xid); |
| } |
| |
| if (rollbackOnly) { |
| LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); |
| throw new XAException(XAException.XA_RBINTEGRITY); |
| } |
| |
| try { |
| TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); |
| |
| // Find out if the server wants to commit or rollback. |
| IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); |
| if (XAResource.XA_RDONLY == response.getResult()) { |
| // transaction stops now, may be syncs that need a callback |
| List<TransactionContext> l; |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); |
| } |
| // After commit may be expensive and can deadlock, do it outside global synch block |
| // No risk for concurrent updates as we own the list now |
| if (l != null) { |
| if(! l.isEmpty()) { |
| LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid); |
| for (TransactionContext ctx : l) { |
| ctx.afterCommit(); |
| } |
| } |
| } |
| } |
| return response.getResult(); |
| |
| } catch (JMSException e) { |
| LOG.warn("prepare of: " + x + " failed with: " + e, e); |
| List<TransactionContext> l; |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); |
| } |
| // After rollback may be expensive and can deadlock, do it outside global synch block |
| // No risk for concurrent updates as we own the list now |
| if (l != null) { |
| for (TransactionContext ctx : l) { |
| try { |
| ctx.afterRollback(); |
| } catch (Throwable ignored) { |
| LOG.debug("failed to firing afterRollback callbacks on prepare " + |
| "failure, txid: {}, context: {}", x, ctx, ignored); |
| } |
| } |
| } |
| throw toXAException(e); |
| } |
| } |
| |
| @Override |
| public void rollback(Xid xid) throws XAException { |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Rollback: " + xid); |
| } |
| |
| // We allow interleaving multiple transactions, so |
| // we don't limit rollback to the associated xid. |
| XATransactionId x; |
| if (xid == null) { |
| throw new XAException(XAException.XAER_PROTO); |
| } |
| if (equals(associatedXid, xid)) { |
| // I think this can happen even without an end(xid) call. Need to |
| // check spec. |
| x = (XATransactionId)transactionId; |
| } else { |
| x = new XATransactionId(xid); |
| } |
| |
| try { |
| this.connection.checkClosedOrFailed(); |
| this.connection.ensureConnectionInfoSent(); |
| |
| // Let the server know that the tx is rollback. |
| TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); |
| this.connection.syncSendPacket(info); |
| |
| List<TransactionContext> l; |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); |
| } |
| // After rollback may be expensive and can deadlock, do it outside global synch block |
| // No risk for concurrent updates as we own the list now |
| if (l != null) { |
| for (TransactionContext ctx : l) { |
| try { |
| ctx.afterRollback(); |
| } catch (Exception ignored) { |
| LOG.debug("ignoring exception from after rollback on ended transaction: {}", ignored, ignored); |
| } |
| } |
| } |
| } catch (JMSException e) { |
| throw toXAException(e); |
| } |
| } |
| |
| // XAResource interface |
| @Override |
| public void commit(Xid xid, boolean onePhase) throws XAException { |
| |
| LOG.debug("Commit: {}, onePhase={}", xid, onePhase); |
| |
| // We allow interleaving multiple transactions, so |
| // we don't limit commit to the associated xid. |
| XATransactionId x; |
| if (xid == null || (equals(associatedXid, xid))) { |
| // should never happen, end(xid,TMSUCCESS) must have been previously |
| // called |
| throw new XAException(XAException.XAER_PROTO); |
| } else { |
| x = new XATransactionId(xid); |
| } |
| |
| if (rollbackOnly) { |
| LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); |
| throw new XAException(XAException.XA_RBINTEGRITY); |
| } |
| |
| try { |
| this.connection.checkClosedOrFailed(); |
| this.connection.ensureConnectionInfoSent(); |
| |
| // Notify the server that the tx was committed back |
| TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); |
| |
| this.connection.syncSendPacket(info); |
| |
| List<TransactionContext> l; |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); |
| } |
| // After commit may be expensive and can deadlock, do it outside global synch block |
| // No risk for concurrent updates as we own the list now |
| if (l != null) { |
| for (TransactionContext ctx : l) { |
| try { |
| ctx.afterCommit(); |
| } catch (Exception ignored) { |
| LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored); |
| } |
| } |
| } |
| |
| } catch (JMSException e) { |
| LOG.warn("commit of: " + x + " failed with: " + e, e); |
| if (onePhase) { |
| List<TransactionContext> l; |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); |
| } |
| // After rollback may be expensive and can deadlock, do it outside global synch block |
| // No risk for concurrent updates as we own the list now |
| if (l != null) { |
| for (TransactionContext ctx : l) { |
| try { |
| ctx.afterRollback(); |
| } catch (Throwable ignored) { |
| LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored); |
| } |
| } |
| } |
| } |
| throw toXAException(e); |
| } |
| } |
| |
| @Override |
| public void forget(Xid xid) throws XAException { |
| LOG.debug("Forget: {}", xid); |
| |
| // We allow interleaving multiple transactions, so |
| // we don't limit forget to the associated xid. |
| XATransactionId x; |
| if (xid == null) { |
| throw new XAException(XAException.XAER_PROTO); |
| } |
| if (equals(associatedXid, xid)) { |
| // TODO determine if this can happen... I think not. |
| x = (XATransactionId)transactionId; |
| } else { |
| x = new XATransactionId(xid); |
| } |
| |
| TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); |
| |
| try { |
| // Tell the server to forget the transaction. |
| this.connection.syncSendPacket(info); |
| } catch (JMSException e) { |
| throw toXAException(e); |
| } |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| ENDED_XA_TRANSACTION_CONTEXTS.remove(x); |
| } |
| } |
| |
| @Override |
| public boolean isSameRM(XAResource xaResource) throws XAException { |
| if (xaResource == null) { |
| return false; |
| } |
| if (!(xaResource instanceof TransactionContext)) { |
| return false; |
| } |
| TransactionContext xar = (TransactionContext)xaResource; |
| try { |
| return getResourceManagerId().equals(xar.getResourceManagerId()); |
| } catch (Throwable e) { |
| throw (XAException)new XAException("Could not get resource manager id.").initCause(e); |
| } |
| } |
| |
| @Override |
| public Xid[] recover(int flag) throws XAException { |
| LOG.debug("recover({})", flag); |
| XATransactionId[] answer; |
| |
| if (XAResource.TMNOFLAGS == flag) { |
| // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state |
| // allows looping scan to complete |
| answer = new XATransactionId[0]; |
| } else { |
| TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); |
| try { |
| this.connection.checkClosedOrFailed(); |
| this.connection.ensureConnectionInfoSent(); |
| |
| DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info); |
| DataStructure[] data = receipt.getData(); |
| if (data instanceof XATransactionId[]) { |
| answer = (XATransactionId[]) data; |
| } else { |
| answer = new XATransactionId[data.length]; |
| System.arraycopy(data, 0, answer, 0, data.length); |
| } |
| } catch (JMSException e) { |
| throw toXAException(e); |
| } |
| } |
| LOG.debug("recover({})={}", flag, answer); |
| return answer; |
| } |
| |
| @Override |
| public int getTransactionTimeout() throws XAException { |
| return 0; |
| } |
| |
| @Override |
| public boolean setTransactionTimeout(int seconds) throws XAException { |
| return false; |
| } |
| |
| // /////////////////////////////////////////////////////////// |
| // |
| // Helper methods. |
| // |
| // /////////////////////////////////////////////////////////// |
| protected String getResourceManagerId() throws JMSException { |
| return this.connection.getResourceManagerId(); |
| } |
| |
| private void setXid(Xid xid) throws XAException { |
| |
| try { |
| this.connection.checkClosedOrFailed(); |
| this.connection.ensureConnectionInfoSent(); |
| } catch (JMSException e) { |
| disassociate(); |
| throw toXAException(e); |
| } |
| |
| if (xid != null) { |
| // associate |
| associatedXid = xid; |
| transactionId = new XATransactionId(xid); |
| |
| TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); |
| try { |
| this.connection.asyncSendPacket(info); |
| LOG.debug("{} started XA transaction {}", this, transactionId); |
| } catch (JMSException e) { |
| disassociate(); |
| throw toXAException(e); |
| } |
| |
| } else { |
| |
| if (transactionId != null) { |
| TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); |
| try { |
| this.connection.syncSendPacket(info); |
| LOG.debug("{} ended XA transaction {}", this, transactionId); |
| } catch (JMSException e) { |
| disassociate(); |
| throw toXAException(e); |
| } |
| |
| // Add our self to the list of contexts that are interested in |
| // post commit/rollback events. |
| List<TransactionContext> l; |
| synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { |
| l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); |
| if (l == null) { |
| l = new ArrayList<TransactionContext>(3); |
| ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); |
| } |
| if (!l.contains(this)) { |
| l.add(this); |
| } |
| } |
| } |
| |
| disassociate(); |
| } |
| } |
| |
| private void disassociate() { |
| // dis-associate |
| associatedXid = null; |
| transactionId = null; |
| } |
| |
| /** |
| * Converts a JMSException from the server to an XAException. if the |
| * JMSException contained a linked XAException that is returned instead. |
| * |
| * @param e JMSException to convert |
| * @return XAException wrapping original exception or its message |
| */ |
| public static XAException toXAException(JMSException e) { |
| if (e.getCause() != null && e.getCause() instanceof XAException) { |
| XAException original = (XAException)e.getCause(); |
| XAException xae = new XAException(original.getMessage()); |
| if (original != null) { |
| xae.errorCode = original.errorCode; |
| } |
| if (original != null && xae != null && xae.errorCode == XA_OK) { |
| // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable |
| xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR); |
| } |
| if (original != null) { |
| xae.initCause(original); |
| } |
| return xae; |
| } |
| |
| XAException xae = new XAException(e.getMessage()); |
| xae.errorCode = XAException.XAER_RMFAIL; |
| xae.initCause(e); |
| return xae; |
| } |
| |
| private static int parseFromMessageOr(String message, int fallbackCode) { |
| final String marker = "xaErrorCode:"; |
| final int index = message.lastIndexOf(marker); |
| if (index > -1) { |
| try { |
| return Integer.parseInt(message.substring(index + marker.length())); |
| } catch (Exception ignored) {} |
| } |
| return fallbackCode; |
| } |
| |
| public ActiveMQConnection getConnection() { |
| return connection; |
| } |
| |
| // for RAR xa recovery where xaresource connection is per request |
| public ActiveMQConnection setConnection(ActiveMQConnection connection) { |
| ActiveMQConnection existing = this.connection; |
| this.connection = connection; |
| return existing; |
| } |
| |
| public void cleanup() { |
| associatedXid = null; |
| transactionId = null; |
| } |
| |
| @Override |
| public String toString() { |
| return "TransactionContext{" + |
| "transactionId=" + transactionId + |
| ",connection=" + connection + |
| '}'; |
| } |
| } |