BATCHEE-131 keep track of running jobs

and actively stop them on shutdown.
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index 27082f7..af0f54d 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -61,7 +61,7 @@
 import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware;

 

 

-public class JobOperatorImpl implements JobOperator {

+public class JobOperatorImpl implements JobOperator, AutoCloseable {

     private static final Logger LOGGER = Logger.getLogger(JobOperatorImpl.class.getName());

 

     static {

@@ -115,6 +115,10 @@
         this(ServicesManager.find());

     }

 

+    public void close() throws Exception {

+

+    }

+

     @Override

     public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {

         /*

diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
index f4db719..df8beea 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
@@ -18,12 +18,17 @@
 
 import java.util.Properties;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
 import javax.enterprise.context.spi.CreationalContext;
 import javax.enterprise.inject.spi.Bean;
 import javax.enterprise.inject.spi.BeanManager;
 
 import org.apache.batchee.container.cdi.BatchCDIInjectionExtension;
+import org.apache.batchee.container.util.BatchWorkUnit;
 import org.apache.batchee.spi.BatchThreadPoolService;
 
 /**
@@ -43,10 +48,12 @@
  *
  */
 public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService {
-    
+
+    private static final Logger logger = Logger.getLogger(AsyncEjbBatchThreadPoolService.class.getName());
+
     private BeanManager beanManager;
     private ThreadExecutorEjb threadExecutorEjb;
-    
+
     @Override
     public void init(Properties batchConfig) {
         beanManager = BatchCDIInjectionExtension.getInstance().getBeanManager();
@@ -65,8 +72,19 @@
     
     @Override
     public void shutdown() {
-        // We cannot force an async EJB to shutdown.
-        // This usually works out of the box if the container EJB
-        // undeploys or stops the application.
+        Set<BatchWorkUnit> runningBatchWorkUnits = threadExecutorEjb.getRunningBatchWorkUnits();
+        if (!runningBatchWorkUnits.isEmpty()) {
+            JobOperator jobOperator = BatchRuntime.getJobOperator();
+            for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) {
+                try {
+                    long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
+                    if (executionId >= 0) {
+                        jobOperator.stop(executionId);
+                    }
+                } catch(Exception e) {
+                    logger.log(Level.SEVERE, "Failure while shutting down execution", e);
+                }
+            }
+        }
     }
 }
diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
index d4923d9..edb732e 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
@@ -16,6 +16,8 @@
  */
 package org.apache.batchee.tools.services.thread;
 
+import org.apache.batchee.container.util.BatchWorkUnit;
+
 import javax.annotation.Resource;
 import javax.ejb.Asynchronous;
 import javax.ejb.Lock;
@@ -24,6 +26,9 @@
 import javax.ejb.TransactionManagement;
 import javax.ejb.TransactionManagementType;
 import javax.transaction.UserTransaction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Small helper class to allow new threads being created via the
@@ -42,18 +47,31 @@
     @Resource
     private UserTransaction ut;
 
+    private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());
+
+
     private static ThreadLocal<UserTransaction> userTransactions = new ThreadLocal<UserTransaction>();
 
     @Asynchronous
     public void executeTask(Runnable work, Object config) {
         try {
             userTransactions.set(ut);
+            if (work instanceof BatchWorkUnit) {
+                runningBatchWorkUnits.add((BatchWorkUnit) work);
+            }
+
             work.run();
         } finally {
+            if (work instanceof BatchWorkUnit) {
+                runningBatchWorkUnits.remove(work);
+            }
             userTransactions.remove();
         }
     }
 
+    public Set<BatchWorkUnit> getRunningBatchWorkUnits() {
+        return runningBatchWorkUnits;
+    }
 
     public static UserTransaction getUserTransaction() {
         return userTransactions.get();