BATCHEE-95 stop subJobs only if they are running
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index f98420e..e072a73 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -103,7 +103,13 @@
             if (parallelBatchWorkUnits != null) {

                 for (BatchWorkUnit subJob : parallelBatchWorkUnits) {

                     try {

-                        kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());

+

+                        // only try to stop the sub-jobs if they are running

+                        if (subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTING ||

+                            subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTED) {

+

+                            kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());

+                        }

                     } catch (Exception e) {

                         // TODO - Is this what we want to know.

                         // Blow up if it happens to force the issue.

diff --git a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
new file mode 100644
index 0000000..2599b34
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.test.partitioned;
+
+import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
+import org.apache.batchee.util.Batches;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Listeners;
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.partition.PartitionMapper;
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionPlanImpl;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Listeners(ContainerLifecycle.class)
+public class PartitionedBatchTest {
+
+    private static final Logger log = LoggerFactory.getLogger(PartitionedBatchTest.class);
+
+
+    @Test
+    public void testStopPartitionedBatch() throws Exception {
+
+        JobOperator jobOperator = BatchRuntime.getJobOperator();
+        long executionId = jobOperator.start("partition-stop", new Properties());
+
+        do {
+            log.info("Waiting til batch is started");
+            Thread.sleep(50);
+        }
+        while (jobOperator.getJobExecution(executionId).getBatchStatus() != BatchStatus.STARTED);
+
+        Thread.sleep(100);
+
+        jobOperator.stop(executionId);
+
+        BatchStatus status = Batches.waitFor(jobOperator, executionId);
+        Assert.assertEquals(status, BatchStatus.STOPPED);
+    }
+
+
+    public static class StopReader extends AbstractItemReader {
+
+        private static final int MAX_INVOCATIONS = 2;
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        private int invocations;
+
+
+        @Override
+        public Object readItem() throws Exception {
+            if (invocations++ < MAX_INVOCATIONS) {
+
+                Thread.sleep(5);
+                return invocations;
+            }
+
+            log.info("{} invoked {} times", idx, invocations);
+            return null;
+        }
+    }
+
+    public static class StopWriter extends AbstractItemWriter {
+
+        private static final Map<Integer, List<Object>> STORAGE = new HashMap<Integer, List<Object>>(2);
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        @Override
+        public void writeItems(List<Object> items) throws Exception {
+
+            List<Object> objects = STORAGE.get(idx);
+            if (objects == null) {
+                objects = new ArrayList<Object>();
+                STORAGE.put(idx, objects);
+            }
+
+            objects.addAll(items);
+        }
+    }
+
+    public static class StopMapper implements PartitionMapper {
+
+        private static final int NUMBER_OF_PARTITIONS = 50;
+        private static final int NUMBER_OF_THREADS = 5;
+
+        @Override
+        public PartitionPlan mapPartitions() throws Exception {
+
+            Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
+            for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
+                Properties properties = new Properties();
+                properties.setProperty("idx", String.valueOf(i + 1));
+
+                props[i] = properties;
+            }
+
+            PartitionPlanImpl plan = new PartitionPlanImpl();
+            plan.setPartitions(NUMBER_OF_PARTITIONS);
+            plan.setThreads(NUMBER_OF_THREADS);
+            plan.setPartitionProperties(props);
+
+            return plan;
+        }
+    }
+}
diff --git a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
new file mode 100644
index 0000000..341ab8c
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. Licensed under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<job id="partition-stop" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
+    <step id="the-step">
+        <chunk item-count="10">
+            <reader ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
+                <properties>
+                    <property name="idx" value="#{partitionPlan['idx']}" />
+                </properties>
+            </reader>
+            <writer ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
+        </chunk>
+        <partition>
+            <mapper ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
+        </partition>
+    </step>
+</job>