[tx-control] Actually enlist the XA connection
git-svn-id: https://svn.apache.org/repos/asf/aries/trunk/tx-control@1738968 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java b/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
index 95db41f..58b729c 100644
--- a/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
+++ b/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
@@ -2,6 +2,7 @@
import static java.util.Optional.ofNullable;
import static javax.persistence.spi.PersistenceUnitTransactionType.JTA;
+import static org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION;
import java.io.PrintWriter;
import java.sql.Connection;
@@ -9,13 +10,20 @@
import java.sql.SQLFeatureNotSupportedException;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.logging.Logger;
import javax.persistence.EntityManagerFactory;
import javax.persistence.spi.PersistenceUnitTransactionType;
import javax.sql.DataSource;
+import javax.transaction.xa.XAResource;
+import org.apache.aries.tx.control.jdbc.common.impl.ScopedConnectionWrapper;
+import org.apache.aries.tx.control.jdbc.common.impl.TxConnectionWrapper;
+import org.apache.aries.tx.control.jdbc.xa.connection.impl.XAConnectionWrapper;
import org.osgi.service.jpa.EntityManagerFactoryBuilder;
+import org.osgi.service.transaction.control.TransactionContext;
import org.osgi.service.transaction.control.TransactionControl;
import org.osgi.service.transaction.control.TransactionException;
import org.osgi.service.transaction.control.jpa.JPAEntityManagerProvider;
@@ -26,16 +34,15 @@
@Override
public JPAEntityManagerProvider getProviderFor(EntityManagerFactoryBuilder emfb, Map<String, Object> jpaProperties,
Map<String, Object> resourceProviderProperties) {
- if(checkEnlistment(resourceProviderProperties)) {
- return new DelayedJPAEntityManagerProvider(tx -> {
-
- Map<String, Object> toUse = enlistDataSource(tx, jpaProperties);
-
- return internalBuilderCreate(emfb, toUse);
+ return new DelayedJPAEntityManagerProvider(tx -> {
+ Map<String, Object> toUse;
+ if(checkEnlistment(resourceProviderProperties)) {
+ toUse = enlistDataSource(tx, jpaProperties);
+ } else {
+ toUse = jpaProperties;
+ }
+ return tx.notSupported(() -> internalBuilderCreate(emfb, toUse));
});
- }
-
- return internalBuilderCreate(emfb, jpaProperties);
}
private Map<String, Object> enlistDataSource(TransactionControl tx, Map<String, Object> jpaProperties) {
@@ -47,7 +54,7 @@
toReturn.put("javax.persistence.jtaDataSource", ds);
}
- toReturn.put("javax.persistence.jtaDataSource", new EnlistingDataSource(ds));
+ toReturn.put("javax.persistence.jtaDataSource", new EnlistingDataSource(tx, ds));
return toReturn;
}
@@ -115,9 +122,14 @@
public class EnlistingDataSource implements DataSource {
+ private final TransactionControl txControl;
+
private final DataSource delegate;
- public EnlistingDataSource(DataSource delegate) {
+ private final UUID resourceId = UUID.randomUUID();
+
+ public EnlistingDataSource(TransactionControl txControl, DataSource delegate) {
+ this.txControl = txControl;
this.delegate = delegate;
}
@@ -138,7 +150,7 @@
}
public Connection getConnection() throws SQLException {
- return delegate.getConnection();
+ return enlistedConnection(() -> delegate.getConnection());
}
public void setLoginTimeout(int seconds) throws SQLException {
@@ -146,7 +158,7 @@
}
public Connection getConnection(String username, String password) throws SQLException {
- return delegate.getConnection(username, password);
+ return enlistedConnection(() -> delegate.getConnection(username, password));
}
public int getLoginTimeout() throws SQLException {
@@ -156,5 +168,63 @@
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return delegate.getParentLogger();
}
+
+ private Connection enlistedConnection(Callable<Connection> supplier) {
+ TransactionContext txContext = txControl.getCurrentContext();
+
+ if (txContext == null) {
+ throw new TransactionException("The resource " + resourceId
+ + " cannot be accessed outside of an active Transaction Context");
+ }
+
+ Connection existing = (Connection) txContext.getScopedValue(resourceId);
+
+ if (existing != null) {
+ return existing;
+ }
+
+ Connection toReturn;
+ Connection toClose;
+
+ try {
+ toClose = supplier.call();
+ if (txContext.getTransactionStatus() == NO_TRANSACTION) {
+ toReturn = new ScopedConnectionWrapper(toClose);
+ } else if (txContext.supportsXA()) {
+ toReturn = new TxConnectionWrapper(toClose);
+ txContext.registerXAResource(getXAResource(toClose));
+ } else {
+ throw new TransactionException(
+ "There is a transaction active, but it does not support XA participants");
+ }
+ } catch (Exception sqle) {
+ throw new TransactionException(
+ "There was a problem getting hold of a database connection",
+ sqle);
+ }
+
+
+ txContext.postCompletion(x -> {
+ try {
+ toClose.close();
+ } catch (SQLException sqle) {
+ // TODO log this
+ }
+ });
+
+ txContext.putScopedValue(resourceId, toReturn);
+
+ return toReturn;
+ }
+
+ private XAResource getXAResource(Connection conn) throws SQLException {
+ if(conn instanceof XAConnectionWrapper) {
+ return ((XAConnectionWrapper)conn).getXaResource();
+ } else if(conn.isWrapperFor(XAConnectionWrapper.class)){
+ return conn.unwrap(XAConnectionWrapper.class).getXaResource();
+ } else {
+ throw new IllegalArgumentException("The XAResource for the connection cannot be found");
+ }
+ }
}
}
diff --git a/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java b/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java
deleted file mode 100644
index 5b7c884..0000000
--- a/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.aries.tx.control.jpa.xa.impl;
-
-import static org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.UUID;
-
-import javax.sql.DataSource;
-import javax.transaction.xa.XAResource;
-
-import org.apache.aries.tx.control.jdbc.common.impl.ConnectionWrapper;
-import org.apache.aries.tx.control.jdbc.common.impl.TxConnectionWrapper;
-import org.apache.aries.tx.control.jdbc.xa.connection.impl.XAConnectionWrapper;
-import org.osgi.service.transaction.control.TransactionContext;
-import org.osgi.service.transaction.control.TransactionControl;
-import org.osgi.service.transaction.control.TransactionException;
-
-public class XAEnabledTxContextBindingConnection extends ConnectionWrapper {
-
- private final TransactionControl txControl;
- private final UUID resourceId;
- private final DataSource dataSource;
-
- public XAEnabledTxContextBindingConnection(TransactionControl txControl,
- DataSource dataSource, UUID resourceId, boolean xaEnabled, boolean localEnabled) {
- this.txControl = txControl;
- this.dataSource = dataSource;
- this.resourceId = resourceId;
- }
-
- @Override
- protected final Connection getDelegate() {
-
- TransactionContext txContext = txControl.getCurrentContext();
-
- if (txContext == null) {
- throw new TransactionException("The resource " + dataSource
- + " cannot be accessed outside of an active Transaction Context");
- }
-
- Connection existing = (Connection) txContext.getScopedValue(resourceId);
-
- if (existing != null) {
- return existing;
- }
-
- Connection toReturn;
- Connection toClose;
-
- try {
- if (txContext.getTransactionStatus() == NO_TRANSACTION) {
- throw new TransactionException("The JTA DataSource cannot be used outside a transaction");
- } else if (txContext.supportsXA()) {
- toClose = dataSource.getConnection();
- toReturn = new TxConnectionWrapper(toClose);
- txContext.registerXAResource(getXAResource(toClose));
- } else {
- throw new TransactionException(
- "There is a transaction active, but it does not support XA participants");
- }
- } catch (Exception sqle) {
- throw new TransactionException(
- "There was a problem getting hold of a database connection",
- sqle);
- }
-
-
- txContext.postCompletion(x -> {
- try {
- toClose.close();
- } catch (SQLException sqle) {
- // TODO log this
- }
- });
-
- txContext.putScopedValue(resourceId, toReturn);
-
- return toReturn;
- }
-
-
- private XAResource getXAResource(Connection conn) throws SQLException {
- if(conn instanceof XAConnectionWrapper) {
- return ((XAConnectionWrapper)conn).getXaResource();
- } else if(conn.isWrapperFor(XAConnectionWrapper.class)){
- return conn.unwrap(XAConnectionWrapper.class).getXaResource();
- } else {
- throw new IllegalArgumentException("The XAResource for the connection cannot be found");
- }
- }
-}