blob: 990eb2c2e8e5796745340b06629396c7c4ec9148 [file] [log] [blame]
package org.apache.aries.tx.control.service.xa.impl;
import static java.util.Optional.ofNullable;
import static javax.transaction.xa.XAException.XA_HEURMIX;
import static javax.transaction.xa.XAException.XA_RBOTHER;
import static javax.transaction.xa.XAException.XA_RBPROTO;
import static org.osgi.service.transaction.control.TransactionStatus.ACTIVE;
import static org.osgi.service.transaction.control.TransactionStatus.COMMITTED;
import static org.osgi.service.transaction.control.TransactionStatus.COMMITTING;
import static org.osgi.service.transaction.control.TransactionStatus.MARKED_ROLLBACK;
import static org.osgi.service.transaction.control.TransactionStatus.PREPARED;
import static org.osgi.service.transaction.control.TransactionStatus.PREPARING;
import static org.osgi.service.transaction.control.TransactionStatus.ROLLED_BACK;
import static org.osgi.service.transaction.control.TransactionStatus.ROLLING_BACK;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.aries.tx.control.service.common.impl.AbstractTransactionContextImpl;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.osgi.service.coordinator.Coordination;
import org.osgi.service.transaction.control.LocalResource;
import org.osgi.service.transaction.control.TransactionContext;
import org.osgi.service.transaction.control.TransactionException;
import org.osgi.service.transaction.control.TransactionStatus;
public class TransactionContextImpl extends AbstractTransactionContextImpl implements TransactionContext {
final List<LocalResource> resources = new ArrayList<>();
private final Transaction oldTran;
private final Transaction currentTransaction;
private final AtomicReference<TransactionStatus> completionState = new AtomicReference<>();
private final GeronimoTransactionManager transactionManager;
private final Object key;
private final boolean readOnly;
public TransactionContextImpl(GeronimoTransactionManager transactionManager, Coordination coordination,
boolean readOnly) {
super(coordination);
this.transactionManager = transactionManager;
this.readOnly = readOnly;
Transaction tmp = null;
try {
tmp = transactionManager.suspend();
transactionManager.begin();
} catch (Exception e) {
if(tmp != null) {
try {
transactionManager.resume(tmp);
} catch (Exception e1) {
e.addSuppressed(e1);
}
}
throw new TransactionException("There was a serious error creating a transaction");
}
oldTran = tmp;
currentTransaction = transactionManager.getTransaction();
key = transactionManager.getTransactionKey();
}
@Override
public Object getTransactionKey() {
return key;
}
@Override
public boolean getRollbackOnly() throws IllegalStateException {
switch (getTransactionStatus()) {
case MARKED_ROLLBACK:
case ROLLING_BACK:
case ROLLED_BACK:
return true;
default:
return false;
}
}
@Override
public void setRollbackOnly() throws IllegalStateException {
TransactionStatus status = getTransactionStatus();
switch (status) {
case ACTIVE:
case MARKED_ROLLBACK:
try {
currentTransaction.setRollbackOnly();
} catch (Exception e) {
throw new TransactionException("Unable to set rollback for the transaction", e);
}
break;
case COMMITTING:
// TODO something here? If it's the first resource then it might
// be ok to roll back?
throw new IllegalStateException("The transaction is already being committed");
case COMMITTED:
throw new IllegalStateException("The transaction is already committed");
case ROLLING_BACK:
case ROLLED_BACK:
// A no op
break;
default:
throw new IllegalStateException("The transaction is in an unkown state");
}
}
@Override
protected void safeSetRollbackOnly() {
TransactionStatus status = getTransactionStatus();
switch (status) {
case ACTIVE:
case MARKED_ROLLBACK:
try {
currentTransaction.setRollbackOnly();
} catch (Exception e) {
throw new TransactionException("Unable to set rollback for the transaction", e);
}
break;
default:
break;
}
}
@Override
public TransactionStatus getTransactionStatus() {
return ofNullable(completionState.get())
.orElseGet(this::getStatusFromTransaction);
}
private TransactionStatus getStatusFromTransaction() {
int status;
try {
status = currentTransaction.getStatus();
} catch (SystemException e) {
throw new TransactionException("Unable to determine the state of the transaction.", e);
}
switch (status) {
case Status.STATUS_ACTIVE:
return ACTIVE;
case Status.STATUS_MARKED_ROLLBACK:
return MARKED_ROLLBACK;
case Status.STATUS_PREPARING:
return PREPARING;
case Status.STATUS_PREPARED:
return PREPARED;
case Status.STATUS_COMMITTING:
return COMMITTING;
case Status.STATUS_COMMITTED:
return COMMITTED;
case Status.STATUS_ROLLING_BACK:
return ROLLING_BACK;
case Status.STATUS_ROLLEDBACK:
return ROLLED_BACK;
default:
throw new TransactionException("Unable to determine the state of the transaction: " + status);
}
}
@Override
public void preCompletion(Runnable job) throws IllegalStateException {
TransactionStatus status = getTransactionStatus();
if (status.compareTo(MARKED_ROLLBACK) > 0) {
throw new IllegalStateException("The current transaction is in state " + status);
}
preCompletion.add(job);
}
@Override
public void postCompletion(Consumer<TransactionStatus> job) throws IllegalStateException {
TransactionStatus status = getTransactionStatus();
if (status == COMMITTED || status == ROLLED_BACK) {
throw new IllegalStateException("The current transaction is in state " + status);
}
postCompletion.add(job);
}
@Override
public void registerXAResource(XAResource resource) {
TransactionStatus status = getTransactionStatus();
if (status.compareTo(MARKED_ROLLBACK) > 0) {
throw new IllegalStateException("The current transaction is in state " + status);
}
try {
currentTransaction.enlistResource(resource);
} catch (Exception e) {
throw new TransactionException("The transaction was unable to enlist a resource", e);
}
}
@Override
public void registerLocalResource(LocalResource resource) {
TransactionStatus status = getTransactionStatus();
if (status.compareTo(MARKED_ROLLBACK) > 0) {
throw new IllegalStateException("The current transaction is in state " + status);
}
resources.add(resource);
}
@Override
public boolean supportsXA() {
return true;
}
@Override
public boolean supportsLocal() {
return true;
}
@Override
public boolean isReadOnly() {
return readOnly;
}
@Override
protected boolean isAlive() {
TransactionStatus status = getTransactionStatus();
return status != COMMITTED && status != ROLLED_BACK;
}
@Override
public void finish() {
if(!resources.isEmpty()) {
XAResource localResource = new LocalXAResourceImpl();
try {
currentTransaction.enlistResource(localResource);
} catch (Exception e) {
safeSetRollbackOnly();
recordFailure(e);
try {
localResource.rollback(null);
} catch (XAException e1) {
recordFailure(e1);
}
}
}
TxListener listener;
boolean manualCallListener;
if(!preCompletion.isEmpty() || !postCompletion.isEmpty()) {
listener = new TxListener();
try {
transactionManager.registerInterposedSynchronization(listener);
manualCallListener = false;
} catch (Exception e) {
manualCallListener = true;
recordFailure(e);
safeSetRollbackOnly();
}
} else {
listener = null;
manualCallListener = false;
}
try {
int status;
try {
if (getRollbackOnly()) {
// GERONIMO-4449 says that we get no beforeCompletion
// callback for rollback :(
if(listener != null) {
listener.beforeCompletion();
}
transactionManager.rollback();
status = Status.STATUS_ROLLEDBACK;
completionState.set(ROLLED_BACK);
} else {
if(manualCallListener) {
listener.beforeCompletion();
}
transactionManager.commit();
status = Status.STATUS_COMMITTED;
completionState.set(COMMITTED);
}
} catch (Exception e) {
recordFailure(e);
status = Status.STATUS_ROLLEDBACK;
completionState.set(ROLLED_BACK);
}
if(manualCallListener) {
listener.afterCompletion(status);
}
} finally {
try {
transactionManager.resume(oldTran);
} catch (Exception e) {
recordFailure(e);
}
}
}
private class LocalXAResourceImpl implements XAResource {
private final AtomicBoolean finished = new AtomicBoolean();
@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
if(!finished.compareAndSet(false, true)) {
return;
}
doCommit();
}
private void doCommit() throws XAException {
AtomicBoolean commit = new AtomicBoolean(true);
List<LocalResource> committed = new ArrayList<>(resources.size());
List<LocalResource> rolledback = new ArrayList<>(0);
resources.stream().forEach(lr -> {
try {
if (commit.get()) {
lr.commit();
committed.add(lr);
} else {
lr.rollback();
rolledback.add(lr);
}
} catch (Exception e) {
recordFailure(e);
if (committed.isEmpty()) {
commit.set(false);
// This is needed to override the status from the
// Transaction, which thinks that we're committing
// until we throw an XAException from this commit.
completionState.set(ROLLING_BACK);
}
rolledback.add(lr);
}
});
if(!rolledback.isEmpty()) {
if(committed.isEmpty()) {
throw (XAException) new XAException(XA_RBOTHER)
.initCause(firstUnexpectedException.get());
} else {
throw (XAException) new XAException(XA_HEURMIX)
.initCause(firstUnexpectedException.get());
}
}
}
@Override
public void end(Xid xid, int flags) throws XAException {
//Nothing to do here
}
@Override
public void forget(Xid xid) throws XAException {
//Nothing to do here
}
@Override
public int getTransactionTimeout() throws XAException {
return 3600;
}
@Override
public boolean isSameRM(XAResource xares) throws XAException {
return this == xares;
}
@Override
public int prepare(Xid xid) throws XAException {
if(!finished.compareAndSet(false, true)) {
switch(getTransactionStatus()) {
case COMMITTING:
return XA_OK;
case ROLLING_BACK:
throw new XAException(XA_RBOTHER);
default:
throw new XAException(XA_RBPROTO);
}
}
completionState.set(COMMITTING);
doCommit();
return XA_OK;
}
@Override
public Xid[] recover(int flag) throws XAException {
return new Xid[0];
}
@Override
public void rollback(Xid xid) throws XAException {
if(!finished.compareAndSet(false, true)) {
return;
}
resources.stream().forEach(lr -> {
try {
lr.rollback();
} catch (Exception e) {
// TODO log this
recordFailure(e);
}
});
}
@Override
public boolean setTransactionTimeout(int seconds) throws XAException {
return false;
}
@Override
public void start(Xid xid, int flags) throws XAException {
// Nothing to do here
}
}
private class TxListener implements Synchronization {
@Override
public void beforeCompletion() {
TransactionContextImpl.this.beforeCompletion(() -> safeSetRollbackOnly());
}
@Override
public void afterCompletion(int status) {
TransactionContextImpl.this.afterCompletion(status == Status.STATUS_COMMITTED ? COMMITTED : ROLLED_BACK);
}
}
}