| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.core.transaction.impl; |
| |
| import javax.transaction.xa.Xid; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.LinkedList; |
| import java.util.List; |
| |
| import org.apache.activemq.artemis.api.core.ActiveMQException; |
| import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; |
| import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; |
| import org.apache.activemq.artemis.api.core.ActiveMQTransactionTimeoutException; |
| import org.apache.activemq.artemis.core.io.IOCallback; |
| import org.apache.activemq.artemis.core.persistence.StorageManager; |
| import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; |
| import org.apache.activemq.artemis.core.server.Queue; |
| import org.apache.activemq.artemis.core.server.impl.AckReason; |
| import org.apache.activemq.artemis.core.server.impl.RefsOperation; |
| import org.apache.activemq.artemis.core.transaction.Transaction; |
| import org.apache.activemq.artemis.core.transaction.TransactionOperation; |
| import org.apache.activemq.artemis.utils.ArtemisCloseable; |
| import org.jboss.logging.Logger; |
| |
| public class TransactionImpl implements Transaction { |
| |
| private static final Logger logger = Logger.getLogger(TransactionImpl.class); |
| |
| private List<TransactionOperation> operations; |
| |
| private List<TransactionOperation> storeOperations; |
| |
| private static final int INITIAL_NUM_PROPERTIES = 10; |
| |
| private Object[] properties = null; |
| |
| protected final StorageManager storageManager; |
| |
| private final Xid xid; |
| |
| private final long id; |
| |
| private volatile State state = State.ACTIVE; |
| |
| private ActiveMQException exception; |
| |
| private final Object timeoutLock = new Object(); |
| |
| private final long createTime; |
| |
| private volatile boolean containsPersistent; |
| |
| private int timeoutSeconds = -1; |
| |
| private Object protocolData; |
| |
| private void ensurePropertiesCapacity(int capacity) { |
| if (properties != null && properties.length >= capacity) { |
| return; |
| } |
| createOrEnlargeProperties(capacity); |
| } |
| |
| private void createOrEnlargeProperties(int capacity) { |
| if (properties == null) { |
| properties = new Object[Math.min(TransactionImpl.INITIAL_NUM_PROPERTIES, capacity)]; |
| } else { |
| assert properties.length < capacity; |
| properties = Arrays.copyOf(properties, capacity); |
| } |
| } |
| |
| @Override |
| public Object getProtocolData() { |
| return protocolData; |
| } |
| |
| @Override |
| public void setProtocolData(Object protocolData) { |
| this.protocolData = protocolData; |
| } |
| |
| public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) { |
| this(storageManager.generateID(), null, storageManager, timeoutSeconds); |
| } |
| |
| public TransactionImpl(final StorageManager storageManager) { |
| this(storageManager.generateID(), null, storageManager,-1); |
| } |
| |
| |
| public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) { |
| this(storageManager.generateID(), xid, storageManager, timeoutSeconds); |
| } |
| |
| public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) { |
| this(id, xid, storageManager, -1); |
| } |
| |
| private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds) { |
| this.storageManager = storageManager; |
| |
| this.xid = xid; |
| |
| this.id = id; |
| |
| this.createTime = System.currentTimeMillis(); |
| |
| this.timeoutSeconds = timeoutSeconds; |
| } |
| |
| // Transaction implementation |
| // ----------------------------------------------------------- |
| |
| @Override |
| public boolean isEffective() { |
| return state == State.PREPARED || state == State.COMMITTED || state == State.ROLLEDBACK; |
| } |
| |
| @Override |
| public void setContainsPersistent() { |
| containsPersistent = true; |
| } |
| |
| @Override |
| public boolean isContainsPersistent() { |
| return containsPersistent; |
| } |
| |
| @Override |
| public void setTimeout(final int timeout) { |
| this.timeoutSeconds = timeout; |
| } |
| |
| @Override |
| public RefsOperation createRefsOperation(Queue queue, AckReason reason) { |
| return new RefsOperation(queue, reason, storageManager); |
| } |
| |
| @Override |
| public long getID() { |
| return id; |
| } |
| |
| @Override |
| public long getCreateTime() { |
| return createTime; |
| } |
| |
| @Override |
| public boolean hasTimedOut(final long currentTime, final int defaultTimeout) { |
| synchronized (timeoutLock) { |
| boolean timedout; |
| if (timeoutSeconds == -1) { |
| timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + (long) defaultTimeout * 1000; |
| } else { |
| timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + (long) timeoutSeconds * 1000; |
| } |
| |
| if (timedout) { |
| markAsRollbackOnly(new ActiveMQTransactionTimeoutException()); |
| } |
| |
| return timedout; |
| } |
| } |
| |
| @Override |
| public boolean hasTimedOut() { |
| return state == State.ROLLBACK_ONLY && exception != null && exception.getType() == ActiveMQExceptionType.TRANSACTION_TIMEOUT; |
| } |
| |
| @Override |
| public void prepare() throws Exception { |
| if (logger.isTraceEnabled()) { |
| logger.trace("TransactionImpl::prepare::" + this); |
| } |
| try (ArtemisCloseable lock = storageManager.closeableReadLock()) { |
| synchronized (timeoutLock) { |
| if (isEffective()) { |
| logger.debug("TransactionImpl::prepare::" + this + " is being ignored"); |
| return; |
| } |
| if (state == State.ROLLBACK_ONLY) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this); |
| } |
| |
| internalRollback(); |
| |
| if (exception != null) { |
| throw exception; |
| } else { |
| // Do nothing |
| return; |
| } |
| } else if (state != State.ACTIVE) { |
| throw new IllegalStateException("Transaction is in invalid state " + state); |
| } |
| |
| if (xid == null) { |
| throw new IllegalStateException("Cannot prepare non XA transaction"); |
| } |
| |
| beforePrepare(); |
| |
| storageManager.prepare(id, xid); |
| |
| state = State.PREPARED; |
| // We use the Callback even for non persistence |
| // If we are using non-persistence with replication, the replication manager will have |
| // to execute this runnable in the correct order |
| storageManager.afterCompleteOperations(new IOCallback() { |
| |
| @Override |
| public void onError(final int errorCode, final String errorMessage) { |
| ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); |
| } |
| |
| @Override |
| public void done() { |
| afterPrepare(); |
| } |
| }); |
| } |
| } |
| } |
| |
| @Override |
| public void commit() throws Exception { |
| commit(true); |
| } |
| |
| @Override |
| public void commit(final boolean onePhase) throws Exception { |
| if (logger.isTraceEnabled()) { |
| logger.trace("TransactionImpl::commit::" + this); |
| } |
| synchronized (timeoutLock) { |
| if (state == State.COMMITTED) { |
| // I don't think this could happen, but just in case |
| logger.debug("TransactionImpl::commit::" + this + " is being ignored"); |
| return; |
| } |
| if (state == State.ROLLBACK_ONLY) { |
| internalRollback(); |
| |
| if (exception != null) { |
| throw exception; |
| } else { |
| // Do nothing |
| return; |
| } |
| } |
| |
| if (xid != null) { |
| if (onePhase && state != State.ACTIVE || !onePhase && state != State.PREPARED) { |
| throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); |
| } |
| } else { |
| if (state != State.ACTIVE) { |
| throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); |
| } |
| } |
| |
| beforeCommit(); |
| |
| doCommit(); |
| |
| // We want to make sure that nothing else gets done after the commit is issued |
| // this will eliminate any possibility or races |
| final List<TransactionOperation> operationsToComplete = this.operations; |
| this.operations = null; |
| |
| // We use the Callback even for non persistence |
| // If we are using non-persistence with replication, the replication manager will have |
| // to execute this runnable in the correct order |
| // This also will only use a different thread if there are any IO pending. |
| // If the IO finished early by the time we got here, we won't need an executor |
| storageManager.afterCompleteOperations(new IOCallback() { |
| |
| @Override |
| public void onError(final int errorCode, final String errorMessage) { |
| ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); |
| } |
| |
| @Override |
| public void done() { |
| afterCommit(operationsToComplete); |
| } |
| }); |
| |
| final List<TransactionOperation> storeOperationsToComplete = this.storeOperations; |
| this.storeOperations = null; |
| |
| if (storeOperationsToComplete != null) { |
| storageManager.afterStoreOperations(new IOCallback() { |
| |
| @Override |
| public void onError(final int errorCode, final String errorMessage) { |
| ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); |
| } |
| |
| @Override |
| public void done() { |
| afterCommit(storeOperationsToComplete); |
| } |
| }); |
| } |
| |
| } |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| protected void doCommit() throws Exception { |
| if (containsPersistent || xid != null && state == State.PREPARED) { |
| // ^^ These are the scenarios where we require a storage.commit |
| // for anything else we won't use the journal |
| storageManager.commit(id); |
| } |
| |
| state = State.COMMITTED; |
| } |
| |
| @Override |
| public boolean tryRollback() { |
| synchronized (timeoutLock) { |
| if (state == State.ROLLEDBACK) { |
| // I don't think this could happen, but just in case |
| logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is being ignored"); |
| return true; |
| } |
| if (state != State.PREPARED) { |
| try { |
| internalRollback(); |
| return true; |
| } catch (Exception e) { |
| // nothing we can do beyond logging |
| // no need to special handler here as this was not even supposed to happen at this point |
| // even if it happenes this would be the exception of the exception, so we just log here |
| logger.warn(e.getMessage(), e); |
| } |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void rollback() throws Exception { |
| if (logger.isTraceEnabled()) { |
| logger.trace("TransactionImpl::rollback::" + this); |
| } |
| |
| synchronized (timeoutLock) { |
| if (state == State.ROLLEDBACK) { |
| // I don't think this could happen, but just in case |
| logger.debug("TransactionImpl::rollback::" + this + " is being ignored"); |
| return; |
| } |
| if (xid != null) { |
| if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) { |
| throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); |
| } |
| } else { |
| if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) { |
| throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state); |
| } |
| } |
| |
| internalRollback(); |
| } |
| } |
| |
| private void internalRollback() throws Exception { |
| if (logger.isTraceEnabled()) { |
| logger.trace("TransactionImpl::internalRollback " + this); |
| } |
| |
| beforeRollback(); |
| |
| try { |
| doRollback(); |
| state = State.ROLLEDBACK; |
| } catch (IllegalStateException e) { |
| // Something happened before and the TX didn't make to the Journal / Storage |
| // We will like to execute afterRollback and clear anything pending |
| ActiveMQServerLogger.LOGGER.failedToPerformRollback(e); |
| } |
| // We want to make sure that nothing else gets done after the commit is issued |
| // this will eliminate any possibility or races |
| final List<TransactionOperation> operationsToComplete = this.operations; |
| this.operations = null; |
| |
| final List<TransactionOperation> storeOperationsToComplete = this.storeOperations; |
| this.storeOperations = null; |
| |
| // We use the Callback even for non persistence |
| // If we are using non-persistence with replication, the replication manager will have |
| // to execute this runnable in the correct order |
| storageManager.afterCompleteOperations(new IOCallback() { |
| |
| @Override |
| public void onError(final int errorCode, final String errorMessage) { |
| ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); |
| } |
| |
| @Override |
| public void done() { |
| afterRollback(operationsToComplete); |
| } |
| }); |
| |
| if (storeOperationsToComplete != null) { |
| storageManager.afterStoreOperations(new IOCallback() { |
| |
| @Override |
| public void onError(final int errorCode, final String errorMessage) { |
| ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); |
| } |
| |
| @Override |
| public void done() { |
| afterRollback(storeOperationsToComplete); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void suspend() { |
| synchronized (timeoutLock) { |
| if (state != State.ACTIVE) { |
| throw new IllegalStateException("Can only suspend active transaction"); |
| } |
| state = State.SUSPENDED; |
| } |
| } |
| |
| @Override |
| public void resume() { |
| synchronized (timeoutLock) { |
| if (state != State.SUSPENDED) { |
| throw new IllegalStateException("Can only resume a suspended transaction"); |
| } |
| state = State.ACTIVE; |
| } |
| } |
| |
| @Override |
| public Transaction.State getState() { |
| return state; |
| } |
| |
| @Override |
| public void setState(final State state) { |
| this.state = state; |
| } |
| |
| @Override |
| public Xid getXid() { |
| return xid; |
| } |
| |
| @Override |
| public void markAsRollbackOnly(final ActiveMQException exception) { |
| synchronized (timeoutLock) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("TransactionImpl::" + this + " marking rollbackOnly for " + exception.toString() + ", msg=" + exception.getMessage()); |
| } |
| |
| if (isEffective()) { |
| logger.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared, committed or rolledback!)"); |
| return; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Marking Transaction " + this.id + " as rollback only"); |
| } |
| state = State.ROLLBACK_ONLY; |
| |
| this.exception = exception; |
| } |
| } |
| |
| @Override |
| public synchronized void addOperation(final TransactionOperation operation) { |
| checkCreateOperations(); |
| |
| operations.add(operation); |
| } |
| |
| @Override |
| public synchronized void afterStore(TransactionOperation sync) { |
| if (storeOperations == null) { |
| storeOperations = new LinkedList<>(); |
| } |
| storeOperations.add(sync); |
| } |
| |
| private int getOperationsCount() { |
| checkCreateOperations(); |
| |
| return operations.size(); |
| } |
| |
| @Override |
| public synchronized List<TransactionOperation> getAllOperations() { |
| |
| if (operations != null) { |
| return new ArrayList<>(operations); |
| } else { |
| return new ArrayList<>(); |
| } |
| } |
| |
| @Override |
| public void putProperty(final int index, final Object property) { |
| ensurePropertiesCapacity(index + 1); |
| |
| properties[index] = property; |
| } |
| |
| @Override |
| public Object getProperty(final int index) { |
| return properties == null ? null : (index < properties.length ? properties[index] : null); |
| } |
| |
| // Private |
| // ------------------------------------------------------------------- |
| |
| protected void doRollback() throws Exception { |
| if (containsPersistent || xid != null && state == State.PREPARED) { |
| storageManager.rollback(id); |
| } |
| } |
| |
| private void checkCreateOperations() { |
| if (operations == null) { |
| operations = new LinkedList<>(); |
| } |
| } |
| |
| protected synchronized void afterCommit(List<TransactionOperation> operationsToComplete) { |
| if (operationsToComplete != null) { |
| for (TransactionOperation operation : operationsToComplete) { |
| operation.afterCommit(this); |
| } |
| // Help out GC here |
| operationsToComplete.clear(); |
| } |
| } |
| |
| private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) { |
| if (operationsToComplete != null) { |
| for (TransactionOperation operation : operationsToComplete) { |
| operation.afterRollback(this); |
| } |
| // Help out GC here |
| operationsToComplete.clear(); |
| } |
| } |
| |
| private synchronized void beforeCommit() throws Exception { |
| if (operations != null) { |
| for (TransactionOperation operation : operations) { |
| operation.beforeCommit(this); |
| } |
| } |
| if (storeOperations != null) { |
| for (TransactionOperation operation : storeOperations) { |
| operation.beforeCommit(this); |
| } |
| } |
| } |
| |
| private synchronized void beforePrepare() throws Exception { |
| if (operations != null) { |
| for (TransactionOperation operation : operations) { |
| operation.beforePrepare(this); |
| } |
| } |
| if (storeOperations != null) { |
| for (TransactionOperation operation : storeOperations) { |
| operation.beforePrepare(this); |
| } |
| } |
| } |
| |
| private synchronized void beforeRollback() throws Exception { |
| if (operations != null) { |
| for (TransactionOperation operation : operations) { |
| operation.beforeRollback(this); |
| } |
| } |
| if (storeOperations != null) { |
| for (TransactionOperation operation : storeOperations) { |
| operation.beforeRollback(this); |
| } |
| } |
| } |
| |
| private synchronized void afterPrepare() { |
| if (operations != null) { |
| for (TransactionOperation operation : operations) { |
| operation.afterPrepare(this); |
| } |
| } |
| if (storeOperations != null) { |
| for (TransactionOperation operation : storeOperations) { |
| operation.afterPrepare(this); |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| Date dt = new Date(this.createTime); |
| return "TransactionImpl [xid=" + xid + |
| ", txID=" + |
| id + |
| ", xid=" + xid + |
| ", state=" + |
| state + |
| ", createTime=" + |
| createTime + "(" + dt + ")" + |
| ", timeoutSeconds=" + |
| timeoutSeconds + |
| ", nr operations = " + getOperationsCount() + |
| "]@" + |
| Integer.toHexString(hashCode()); |
| } |
| } |