BATCHEE-131 keep track of running threads

And shut them down if the container stops
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java b/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
index 1f92e2f..03c38bc 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
@@ -1,52 +1,107 @@
-/*

- * Copyright 2013 International Business Machines Corp.

- * 

- * See the NOTICE file distributed with this work for additional information

- * regarding copyright ownership. Licensed under the Apache License, 

- * Version 2.0 (the "License"); you may not use this file except in compliance

- * with the License. You may obtain a copy of the License at

- * 

- *   http://www.apache.org/licenses/LICENSE-2.0

- * 

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

-*/

-package org.apache.batchee.container.services.executor;

-

-import org.apache.batchee.container.exception.BatchContainerServiceException;

-import org.apache.batchee.spi.BatchThreadPoolService;

-

-import java.util.Properties;

-import java.util.concurrent.ExecutorService;

-

-import static org.apache.batchee.container.util.ClassLoaderAwareHandler.runnableLoaderAware;

-

-public abstract class AbstractThreadPoolService implements BatchThreadPoolService {

-    protected ExecutorService executorService;

-

-    protected abstract ExecutorService newExecutorService(Properties batchConfig);

-

-    @Override

-    public void init(final Properties batchConfig) throws BatchContainerServiceException {

-        executorService = newExecutorService(batchConfig);

-    }

-

-    @Override

-    public void shutdown() throws BatchContainerServiceException {

-        executorService.shutdownNow();

-        executorService = null;

-    }

-

-    @Override

-    public void executeTask(final Runnable work, final Object config) {

-        executorService.execute(runnableLoaderAware(work));

-    }

-

-    @Override

-    public String toString() {

-        return getClass().getName();

-    }

-}

+/*
+ * Copyright 2013 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.services.executor;
+
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.util.BatchWorkUnit;
+import org.apache.batchee.spi.BatchThreadPoolService;
+
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.batchee.container.util.ClassLoaderAwareHandler.runnableLoaderAware;
+
+public abstract class AbstractThreadPoolService implements BatchThreadPoolService {
+    private final static Logger LOGGER = Logger.getLogger(AbstractThreadPoolService.class.getName());
+
+    protected ExecutorService executorService;
+
+    volatile boolean shutdown = false;
+
+    private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());
+
+    protected abstract ExecutorService newExecutorService(Properties batchConfig);
+
+    @Override
+    public void init(final Properties batchConfig) throws BatchContainerServiceException {
+        executorService = newExecutorService(batchConfig);
+    }
+
+    @Override
+    public void shutdown() throws BatchContainerServiceException {
+        this.shutdown = true;
+        if (!runningBatchWorkUnits.isEmpty()) {
+            JobOperator jobOperator = BatchRuntime.getJobOperator();
+            for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) {
+                try {
+                    long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
+                    if (executionId >= 0) {
+                        jobOperator.stop(executionId);
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Failure while shutting down execution", e);
+                }
+            }
+        }
+
+        executorService.shutdownNow();
+        executorService = null;
+    }
+
+    @Override
+    public void executeTask(final Runnable work, final Object config) {
+        if (shutdown) {
+            throw new IllegalStateException("Refuse to start Batch Task due to shutdown being in progress!");
+        }
+        executorService.execute(runnableLoaderAware(new ActiveWorkTracker(work)));
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getName();
+    }
+
+    class ActiveWorkTracker implements Runnable {
+        private final Runnable work;
+
+        ActiveWorkTracker(Runnable work) {
+            this.work = work;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (work instanceof BatchWorkUnit) {
+                    runningBatchWorkUnits.add((BatchWorkUnit) work);
+                }
+                work.run();
+            } finally {
+                if (work instanceof BatchWorkUnit) {
+                    runningBatchWorkUnits.remove(work);
+                }
+            }
+        }
+    }
+
+}