ARIES-1783 EM joins transaction when needed

EMSupplier makes a cached EM join a transaction if there is currently
an active one.
diff --git a/itests/jpa-container-blueprint-testbundle/src/main/java/org/apache/aries/jpa/container/itest/bundle/blueprint/impl/CarServiceWithRequiresNew.java b/itests/jpa-container-blueprint-testbundle/src/main/java/org/apache/aries/jpa/container/itest/bundle/blueprint/impl/CarServiceWithRequiresNew.java
index d1b041c..754bdd8 100644
--- a/itests/jpa-container-blueprint-testbundle/src/main/java/org/apache/aries/jpa/container/itest/bundle/blueprint/impl/CarServiceWithRequiresNew.java
+++ b/itests/jpa-container-blueprint-testbundle/src/main/java/org/apache/aries/jpa/container/itest/bundle/blueprint/impl/CarServiceWithRequiresNew.java
@@ -17,6 +17,7 @@
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 import javax.transaction.Transactional;
 import javax.transaction.Transactional.TxType;
@@ -53,7 +54,9 @@
             ref = getService();
             CarService carService = bundleContext.getService(ref);
             carService.addCar(c);
-            return Arrays.asList(this.getCar("TR123"));
+            final List<Car> returnVal = Arrays.asList(this.getCar("TR123"));
+            carService.deleteCar("TR123");
+            return returnVal;
         } finally {
             if (ref != null) {
                 bundleContext.ungetService(ref);
@@ -75,7 +78,9 @@
     }
 
     @Override
+    @Transactional
     public void deleteCar(String id) {
+        em.remove(this.getCar(id));
     }
 
     public void setBundleContext(BundleContext bundleContext) {
diff --git a/itests/jpa-container-itest/src/test/java/org/apache/aries/jpa/blueprint/aries/itest/BlueprintTest.java b/itests/jpa-container-itest/src/test/java/org/apache/aries/jpa/blueprint/aries/itest/BlueprintTest.java
index 507ff3e..ab516c4 100644
--- a/itests/jpa-container-itest/src/test/java/org/apache/aries/jpa/blueprint/aries/itest/BlueprintTest.java
+++ b/itests/jpa-container-itest/src/test/java/org/apache/aries/jpa/blueprint/aries/itest/BlueprintTest.java
@@ -43,10 +43,10 @@
 public class BlueprintTest extends AbstractCarJPAITest {
     @Inject
     Coordinator coordinator;
-    
+
     @Inject
     UserTransaction ut;
-    
+
     @Test
     public void testCoordination() {
         assertNoCoordination();
@@ -66,7 +66,7 @@
             Assert.assertEquals(0, carService.getCars().size());
         }
     }
-    
+
     @Test
     public void testInjectToMethod() throws Exception {
         carLifecycle(getCarService("method"));
@@ -86,7 +86,7 @@
     public void testEm() throws Exception {
         carLifecycle(getCarService("em"));
     }
-    
+
     @Test
     public void testEmJtaAnn() throws Exception {
         carLifecycle(getCarService("emJtaAnn"));
@@ -96,17 +96,17 @@
     public void testSupplier() throws Exception {
         carLifecycle(getCarService("supplier"));
     }
-    
+
     @Test
     public void testRealTransactional() throws Exception {
         carRealTransactionalLifecycle(getCarService("emJtaAnn"));
     }
-    
+
     @Test
     public void testInlined() throws Exception {
         carRealTransactionalLifecycle(getCarService("emJtaAnnInlined"));
     }
-    
+
     @Test
     public void testCoordinationLifecycle() throws InterruptedException, ExecutionException {
         CarService carService = getCarService("em");
@@ -127,7 +127,6 @@
     }
 
     @Test
-    @Ignore
     public void testCarWithRequiresNewAnnotation() throws Exception {
         CarService cs = getCarService("rn");
         cs.getCars();
@@ -146,7 +145,7 @@
         assertBlueCar(carService.getCar(BLUE_PLATE));
         carService.deleteCar(BLUE_PLATE);
     }
-    
+
     private void carRealTransactionalLifecycle(CarService carService) throws IllegalStateException, SystemException, NotSupportedException {
         assertNoCoordination();
         if (carService.getCar(BLACK_CAR_PLATE) != null) {
@@ -162,7 +161,7 @@
         Coordination coord = coordinator.peek();
         Assert.assertNull("There should not be a coordination on this thread", coord);
     }
-    
+
     private void assertNoCars(CarService carService) {
         Assert.assertEquals("Invalid number of cars", 0, carService.getCars().size());
     }
diff --git a/jpa-support/src/main/java/org/apache/aries/jpa/support/impl/EMSupplierImpl.java b/jpa-support/src/main/java/org/apache/aries/jpa/support/impl/EMSupplierImpl.java
index 14577e6..1956f01 100644
--- a/jpa-support/src/main/java/org/apache/aries/jpa/support/impl/EMSupplierImpl.java
+++ b/jpa-support/src/main/java/org/apache/aries/jpa/support/impl/EMSupplierImpl.java
@@ -29,8 +29,16 @@
 
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
 
 import org.apache.aries.jpa.supplier.EmSupplier;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
 import org.osgi.service.coordinator.Coordination;
 import org.osgi.service.coordinator.Coordinator;
 import org.osgi.service.coordinator.Participant;
@@ -39,7 +47,7 @@
 
 /**
  * Thread safe way to use an EntityManager.
- * 
+ *
  * Before the EMF is closed the close() method has to be called to make
  * sure all EMs are closed.
  */
@@ -88,9 +96,26 @@
             setEm(coordination, em);
             coordination.addParticipant(new EmShutDownParticipant());
         }
+        else {
+            final Bundle bundle = FrameworkUtil.getBundle(this.getClass());
+            if (bundle != null) {
+                final BundleContext bundleContext = bundle.getBundleContext();
+                ServiceReference<TransactionManager> tmRef = bundleContext.getServiceReference(TransactionManager.class);
+                TransactionManager tm = bundleContext.getService(tmRef);
+                try {
+                    final Transaction transaction = tm.getTransaction();
+                    if (transaction != null && transaction.getStatus() == Status.STATUS_ACTIVE) {
+                        em.joinTransaction();
+                    }
+                }
+                catch( SystemException se ) {
+                    throw new IllegalStateException("Unable to check transaction status and join the transaction", se);
+                }
+            }
+        }
         return em;
     }
-    
+
     Coordination getTopCoordination() {
         Coordination coordination = coordinator.peek();
         while (coordination != null && coordination.getEnclosingCoordination() != null) {
@@ -98,7 +123,7 @@
         }
         return coordination;
     }
-    
+
     private void setEm(Coordination coordination, EntityManager em) {
         Map<Class<?>, Object> vars = coordination.getVariables();
         synchronized (vars) {
@@ -126,7 +151,7 @@
         }
     }
 
-    
+
     @SuppressWarnings("unchecked")
     private Map<String, EntityManager> getEmMap(Coordination coordination) {
         Map<String, EntityManager> emMap = (Map<String, EntityManager>)coordination.getVariables().get(EntityManager.class);
@@ -167,7 +192,7 @@
     }
 
     private synchronized boolean shutdownRemaining() {
-        boolean clean = emSet.isEmpty(); 
+        boolean clean = emSet.isEmpty();
         if  (!clean) {
             LOG.warn("{} EntityManagers still open after timeout. Shutting them down now", emSet.size());
         }
@@ -206,7 +231,7 @@
             EntityManager em = removeEm(coordination);
             emSet.remove(em);
             em.close();
-            
+
             if (shutdown.get()) {
                 emsToShutDown.countDown();
             }