| /* Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.client; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import javax.transaction.xa.XAException; |
| import javax.transaction.xa.XAResource; |
| import javax.transaction.xa.Xid; |
| |
| import org.apache.qpid.dtx.XidImpl; |
| import org.apache.qpid.transport.DtxXaStatus; |
| import org.apache.qpid.transport.ExecutionErrorCode; |
| import org.apache.qpid.transport.Future; |
| import org.apache.qpid.transport.Option; |
| import org.apache.qpid.transport.RecoverResult; |
| import org.apache.qpid.transport.SessionException; |
| import org.apache.qpid.transport.XaResult; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This is an implementation of javax.njms.XAResource. |
| */ |
| public class XAResourceImpl implements AMQXAResource |
| { |
| /** |
| * this XAResourceImpl's logger |
| */ |
| private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class); |
| |
| /** |
| * Reference to the associated XASession |
| */ |
| private XASessionImpl _xaSession = null; |
| |
| /** |
| * The XID of this resource |
| */ |
| private Xid _xid; |
| |
| /** |
| * The time for this resource |
| */ |
| private int _timeout; |
| |
| //--- constructor |
| |
| private List<XAResource> _siblings = new ArrayList<XAResource>(); |
| |
| /** |
| * Create an XAResource associated with a XASession |
| * |
| * @param xaSession The session XAresource |
| */ |
| protected XAResourceImpl(XASessionImpl xaSession) |
| { |
| _xaSession = xaSession; |
| } |
| |
| //--- The XAResource |
| /** |
| * Commits the global transaction specified by xid. |
| * |
| * @param xid A global transaction identifier |
| * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid. |
| * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ, |
| * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO. |
| */ |
| public void commit(Xid xid, boolean b) throws XAException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("commit tx branch with xid: {} ", xid); |
| } |
| Future<XaResult> future = |
| _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE); |
| |
| // now wait on the future for the result |
| XaResult result = null; |
| try |
| { |
| result = future.get(); |
| } |
| catch (SessionException e) |
| { |
| // we need to restore the qpid session that has been closed |
| _xaSession.createSession(); |
| convertExecutionErrorToXAErr(e.getException().getErrorCode()); |
| } |
| finally |
| { |
| _xid = null; |
| } |
| checkStatus(result.getStatus()); |
| } |
| |
| /** |
| * Ends the work performed on behalf of a transaction branch. |
| * The resource manager disassociates the XA resource from the transaction branch specified |
| * and lets the transaction complete. |
| * <ul> |
| * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state. |
| * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified. |
| * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only |
| * <li> If TMSUCCESS is specified, the portion of work has completed successfully. |
| * </ul> |
| * |
| * @param xid A global transaction identifier that is the same as the identifier used previously in the start method |
| * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND. |
| * @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR, |
| * XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*. |
| */ |
| public void end(Xid xid, int flag) throws XAException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("end tx branch with xid: {}", xid); |
| } |
| switch (flag) |
| { |
| case(XAResource.TMSUCCESS): |
| break; |
| case(XAResource.TMFAIL): |
| break; |
| case(XAResource.TMSUSPEND): |
| break; |
| default: |
| throw new XAException(XAException.XAER_INVAL); |
| } |
| _xaSession.flushAcknowledgments(); |
| Future<XaResult> future = _xaSession.getQpidSession() |
| .dtxEnd(convertXid(xid), |
| flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE, |
| flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE); |
| // now wait on the future for the result |
| XaResult result = null; |
| try |
| { |
| result = future.get(); |
| } |
| catch (SessionException e) |
| { |
| // we need to restore the qpid session that has been closed |
| _xaSession.createSession(); |
| convertExecutionErrorToXAErr(e.getException().getErrorCode()); |
| } |
| |
| checkStatus(result.getStatus()); |
| |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings"); |
| } |
| |
| for(XAResource sibling: _siblings) |
| { |
| |
| sibling.end(xid, flag); |
| } |
| |
| _siblings.clear(); |
| } |
| |
| |
| /** |
| * Tells the resource manager to forget about a heuristically completed transaction branch. |
| * |
| * @param xid String(xid.getGlobalTransactionId() A global transaction identifier |
| * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, |
| * XAER_NOTA, XAER_INVAL, or XAER_PROTO. |
| */ |
| public void forget(Xid xid) throws XAException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("forget tx branch with xid: {}", xid); |
| } |
| _xaSession.getQpidSession().dtxForget(convertXid(xid)); |
| try |
| { |
| _xaSession.getQpidSession().sync(); |
| } |
| catch (SessionException e) |
| { |
| // we need to restore the qpid session that has been closed |
| _xaSession.createSession(); |
| convertExecutionErrorToXAErr(e.getException().getErrorCode()); |
| } |
| finally |
| { |
| _xid = null; |
| } |
| } |
| |
| |
| /** |
| * Obtains the current transaction timeout value set for this XAResource instance. |
| * If XAResource.setTransactionTimeout was not used prior to invoking this method, |
| * the return value is the default timeout i.e. 0; |
| * |
| * @return The transaction timeout value in seconds. |
| * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. |
| */ |
| public int getTransactionTimeout() throws XAException |
| { |
| return _timeout; |
| } |
| |
| /** |
| * This method is called to determine if the resource manager instance represented |
| * by the target object is the same as the resouce manager instance represented by |
| * the parameter xaResource. |
| * |
| * @param xaResource An XAResource object whose resource manager instance is to |
| * be compared with the resource manager instance of the target object |
| * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>. |
| * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. |
| */ |
| public boolean isSameRM(XAResource xaResource) throws XAException |
| { |
| if(this == xaResource) |
| { |
| return true; |
| } |
| |
| if(!(xaResource instanceof AMQXAResource)) |
| { |
| return false; |
| } |
| |
| String myUUID = getBrokerUUID(); |
| String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID(); |
| |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID); |
| } |
| |
| boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); |
| |
| if(isSameRm) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. "); |
| } |
| _siblings.add(xaResource); |
| } |
| |
| return isSameRm; |
| |
| } |
| |
| /** |
| * Prepare for a transaction commit of the transaction specified in <code>Xid</code>. |
| * |
| * @param xid A global transaction identifier. |
| * @return A value indicating the resource manager's vote on the outcome of the transaction. |
| * The possible values are: XA_RDONLY or XA_OK. |
| * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA |
| */ |
| public int prepare(Xid xid) throws XAException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("prepare {}", xid); |
| } |
| Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid)); |
| XaResult result = null; |
| try |
| { |
| result = future.get(); |
| } |
| catch (SessionException e) |
| { |
| // we need to restore the qpid session that has been closed |
| _xaSession.createSession(); |
| convertExecutionErrorToXAErr(e.getException().getErrorCode()); |
| } |
| DtxXaStatus status = result.getStatus(); |
| int outcome = XAResource.XA_OK; |
| switch (status) |
| { |
| case XA_OK: |
| break; |
| case XA_RDONLY: |
| outcome = XAResource.XA_RDONLY; |
| break; |
| default: |
| checkStatus(status); |
| } |
| return outcome; |
| } |
| |
| /** |
| * Obtains a list of prepared transaction branches. |
| * <p> |
| * The transaction manager calls this method during recovery to obtain the list of transaction branches |
| * that are currently in prepared or heuristically completed states. |
| * |
| * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. |
| * TMNOFLAGS must be used when no other flags are set in the parameter. |
| * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically |
| * completed state. |
| * @throws XAException An error has occurred. Possible value is XAER_INVAL. |
| */ |
| public Xid[] recover(int flag) throws XAException |
| { |
| // the flag is ignored |
| Future<RecoverResult> future = _xaSession.getQpidSession().dtxRecover(); |
| RecoverResult res = null; |
| try |
| { |
| res = future.get(); |
| } |
| catch (SessionException e) |
| { |
| // we need to restore the qpid session that has been closed |
| _xaSession.createSession(); |
| convertExecutionErrorToXAErr( e.getException().getErrorCode()); |
| } |
| Xid[] result = new Xid[res.getInDoubt() != null ? res.getInDoubt().size() : 0]; |
| if(result.length != 0) |
| { |
| int i = 0; |
| for (Object obj : res.getInDoubt()) |
| { |
| org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj; |
| result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId()); |
| i++; |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Informs the resource manager to roll back work done on behalf of a transaction branch |
| * |
| * @param xid A global transaction identifier. |
| * @throws XAException An error has occurred. |
| */ |
| public void rollback(Xid xid) throws XAException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("rollback tx branch with xid: {}", xid); |
| } |
| |
| Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid)); |
| // now wait on the future for the result |
| XaResult result = null; |
| try |
| { |
| result = future.get(); |
| } |
| catch (SessionException e) |
| { |
| // we need to restore the qpid session that has been closed |
| _xaSession.createSession(); |
| convertExecutionErrorToXAErr( e.getException().getErrorCode()); |
| } |
| finally |
| { |
| _xid = null; |
| } |
| checkStatus(result.getStatus()); |
| } |
| |
| /** |
| * Sets the current transaction timeout value for this XAResource instance. |
| * Once set, this timeout value is effective until setTransactionTimeout is |
| * invoked again with a different value. |
| * To reset the timeout value to the default value used by the resource manager, set the value to zero. |
| * |
| * @param timeout The transaction timeout value in seconds. |
| * @return true if transaction timeout value is set successfully; otherwise false. |
| * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL. |
| */ |
| public boolean setTransactionTimeout(int timeout) throws XAException |
| { |
| _timeout = timeout; |
| if (timeout != _timeout && _xid != null) |
| { |
| setDtxTimeout(_timeout); |
| } |
| return true; |
| } |
| |
| private void setDtxTimeout(int timeout) throws XAException |
| { |
| _xaSession.getQpidSession() |
| .dtxSetTimeout(XidImpl.convert(_xid), timeout); |
| } |
| |
| /** |
| * Starts work on behalf of a transaction branch specified in xid. |
| * <ul> |
| * <li> If TMJOIN is specified, an exception is thrown as it is not supported |
| * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid. |
| * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the |
| * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code. |
| * </ul> |
| * |
| * @param xid A global transaction identifier to be associated with the resource |
| * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME |
| * @throws XAException An error has occurred. Possible exceptions |
| * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO. |
| */ |
| public void start(Xid xid, int flag) throws XAException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("start tx branch with xid: {}", xid); |
| } |
| switch (flag) |
| { |
| case(XAResource.TMNOFLAGS): |
| break; |
| case(XAResource.TMJOIN): |
| break; |
| case(XAResource.TMRESUME): |
| break; |
| default: |
| throw new XAException(XAException.XAER_INVAL); |
| } |
| Future<XaResult> future = _xaSession.getQpidSession() |
| .dtxStart(convertXid(xid), |
| flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE, |
| flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE); |
| // now wait on the future for the result |
| XaResult result = null; |
| try |
| { |
| result = future.get(); |
| } |
| catch (SessionException e) |
| { |
| // we need to restore the qpid session that has been closed |
| _xaSession.createSession(); |
| convertExecutionErrorToXAErr(e.getException().getErrorCode()); |
| // TODO: The amqp spec does not allow to make the difference |
| // between an already known XID and a wrong arguments (join and resume are set) |
| // TODO: make sure amqp addresses that |
| } |
| checkStatus(result.getStatus()); |
| _xid = xid; |
| if (_timeout > 0) |
| { |
| setDtxTimeout(_timeout); |
| } |
| |
| for(XAResource sibling: _siblings) |
| { |
| sibling.start(xid, flag); |
| } |
| } |
| |
| /** |
| * Is this resource currently enlisted in a transaction? |
| * |
| * @return true if the resource is associated with a transaction, false otherwise. |
| */ |
| public boolean isEnlisted() |
| { |
| return (_xid != null) ; |
| } |
| |
| //------------------------------------------------------------------------ |
| // Private methods |
| //------------------------------------------------------------------------ |
| |
| /** |
| * Check xa method outcome and, when required, convert the status into the corresponding xa exception |
| * @param status method status code |
| * @throws XAException corresponding XA Exception when required |
| */ |
| private void checkStatus(DtxXaStatus status) throws XAException |
| { |
| switch (status) |
| { |
| case XA_OK: |
| // Do nothing this ok |
| break; |
| case XA_RBROLLBACK: |
| // The tx has been rolled back for an unspecified reason. |
| throw new XAException(XAException.XA_RBROLLBACK); |
| case XA_RBTIMEOUT: |
| // The transaction branch took too long. |
| throw new XAException(XAException.XA_RBTIMEOUT); |
| case XA_HEURHAZ: |
| // The transaction branch may have been heuristically completed. |
| throw new XAException(XAException.XA_HEURHAZ); |
| case XA_HEURCOM: |
| // The transaction branch has been heuristically committed. |
| throw new XAException(XAException.XA_HEURCOM); |
| case XA_HEURRB: |
| // The transaction branch has been heuristically rolled back. |
| throw new XAException(XAException.XA_HEURRB); |
| case XA_HEURMIX: |
| // The transaction branch has been heuristically committed and rolled back. |
| throw new XAException(XAException.XA_HEURMIX); |
| case XA_RDONLY: |
| // The transaction branch was read-only and has been committed. |
| throw new XAException(XAException.XA_RDONLY); |
| default: |
| // this should not happen |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("got unexpected status value: {}", status); |
| } |
| //A resource manager error has occured in the transaction branch. |
| throw new XAException(XAException.XAER_RMERR); |
| } |
| } |
| |
| /** |
| * Convert execution error to xa exception. |
| * @param error the execution error code |
| * @throws XAException |
| */ |
| private void convertExecutionErrorToXAErr(ExecutionErrorCode error) throws XAException |
| { |
| switch (error) |
| { |
| case NOT_ALLOWED: |
| // The XID already exists. |
| throw new XAException(XAException.XAER_DUPID); |
| case NOT_FOUND: |
| // The XID is not valid. |
| try |
| { |
| throw new XAException(XAException.XAER_NOTA); |
| } |
| catch (XAException e) |
| { |
| _logger.error(e.getMessage(), e); |
| throw e; |
| } |
| case ILLEGAL_STATE: |
| // Routine was invoked in an inproper context. |
| throw new XAException(XAException.XAER_PROTO); |
| case NOT_IMPLEMENTED: |
| // the command is not implemented |
| throw new XAException(XAException.XAER_RMERR); |
| case COMMAND_INVALID: |
| // Invalid call |
| throw new XAException(XAException.XAER_INVAL); |
| default: |
| // this should not happen |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Got unexpected error: " + error); |
| } |
| //A resource manager error has occured in the transaction branch. |
| throw new XAException(XAException.XAER_RMERR); |
| } |
| } |
| |
| /** |
| * convert a generic xid into qpid format |
| * @param xid xid to be converted |
| * @return the qpid formated xid |
| * @throws XAException when xid is null |
| */ |
| private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException |
| { |
| if (xid == null) |
| { |
| // Invalid arguments were given. |
| throw new XAException(XAException.XAER_INVAL); |
| } |
| return XidImpl.convert(xid); |
| } |
| |
| public String getBrokerUUID() |
| { |
| return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); |
| } |
| |
| public List<XAResource> getSiblings() |
| { |
| return Collections.unmodifiableList(_siblings); |
| } |
| } |