BATCHEE-131 keep track of running jobs
and actively stop them on shutdown.
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index 27082f7..af0f54d 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -61,7 +61,7 @@
import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware;
-public class JobOperatorImpl implements JobOperator {
+public class JobOperatorImpl implements JobOperator, AutoCloseable {
private static final Logger LOGGER = Logger.getLogger(JobOperatorImpl.class.getName());
static {
@@ -115,6 +115,10 @@
this(ServicesManager.find());
}
+ public void close() throws Exception {
+
+ }
+
@Override
public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
/*
diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
index f4db719..df8beea 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
@@ -18,12 +18,17 @@
import java.util.Properties;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import org.apache.batchee.container.cdi.BatchCDIInjectionExtension;
+import org.apache.batchee.container.util.BatchWorkUnit;
import org.apache.batchee.spi.BatchThreadPoolService;
/**
@@ -43,10 +48,12 @@
*
*/
public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService {
-
+
+ private static final Logger logger = Logger.getLogger(AsyncEjbBatchThreadPoolService.class.getName());
+
private BeanManager beanManager;
private ThreadExecutorEjb threadExecutorEjb;
-
+
@Override
public void init(Properties batchConfig) {
beanManager = BatchCDIInjectionExtension.getInstance().getBeanManager();
@@ -65,8 +72,19 @@
@Override
public void shutdown() {
- // We cannot force an async EJB to shutdown.
- // This usually works out of the box if the container EJB
- // undeploys or stops the application.
+ Set<BatchWorkUnit> runningBatchWorkUnits = threadExecutorEjb.getRunningBatchWorkUnits();
+ if (!runningBatchWorkUnits.isEmpty()) {
+ JobOperator jobOperator = BatchRuntime.getJobOperator();
+ for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) {
+ try {
+ long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
+ if (executionId >= 0) {
+ jobOperator.stop(executionId);
+ }
+ } catch(Exception e) {
+ logger.log(Level.SEVERE, "Failure while shutting down execution", e);
+ }
+ }
+ }
}
}
diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
index d4923d9..edb732e 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
@@ -16,6 +16,8 @@
*/
package org.apache.batchee.tools.services.thread;
+import org.apache.batchee.container.util.BatchWorkUnit;
+
import javax.annotation.Resource;
import javax.ejb.Asynchronous;
import javax.ejb.Lock;
@@ -24,6 +26,9 @@
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.transaction.UserTransaction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
* Small helper class to allow new threads being created via the
@@ -42,18 +47,31 @@
@Resource
private UserTransaction ut;
+ private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());
+
+
private static ThreadLocal<UserTransaction> userTransactions = new ThreadLocal<UserTransaction>();
@Asynchronous
public void executeTask(Runnable work, Object config) {
try {
userTransactions.set(ut);
+ if (work instanceof BatchWorkUnit) {
+ runningBatchWorkUnits.add((BatchWorkUnit) work);
+ }
+
work.run();
} finally {
+ if (work instanceof BatchWorkUnit) {
+ runningBatchWorkUnits.remove(work);
+ }
userTransactions.remove();
}
}
+ public Set<BatchWorkUnit> getRunningBatchWorkUnits() {
+ return runningBatchWorkUnits;
+ }
public static UserTransaction getUserTransaction() {
return userTransactions.get();