BATCHEE-95 stop subJobs only if they are running
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index f98420e..e072a73 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -103,7 +103,13 @@
if (parallelBatchWorkUnits != null) {
for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
try {
- kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+
+ // only try to stop the sub-jobs if they are running
+ if (subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTING ||
+ subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTED) {
+
+ kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+ }
} catch (Exception e) {
// TODO - Is this what we want to know.
// Blow up if it happens to force the issue.
diff --git a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
new file mode 100644
index 0000000..2599b34
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.test.partitioned;
+
+import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
+import org.apache.batchee.util.Batches;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Listeners;
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.partition.PartitionMapper;
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionPlanImpl;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Listeners(ContainerLifecycle.class)
+public class PartitionedBatchTest {
+
+ private static final Logger log = LoggerFactory.getLogger(PartitionedBatchTest.class);
+
+
+ @Test
+ public void testStopPartitionedBatch() throws Exception {
+
+ JobOperator jobOperator = BatchRuntime.getJobOperator();
+ long executionId = jobOperator.start("partition-stop", new Properties());
+
+ do {
+ log.info("Waiting til batch is started");
+ Thread.sleep(50);
+ }
+ while (jobOperator.getJobExecution(executionId).getBatchStatus() != BatchStatus.STARTED);
+
+ Thread.sleep(100);
+
+ jobOperator.stop(executionId);
+
+ BatchStatus status = Batches.waitFor(jobOperator, executionId);
+ Assert.assertEquals(status, BatchStatus.STOPPED);
+ }
+
+
+ public static class StopReader extends AbstractItemReader {
+
+ private static final int MAX_INVOCATIONS = 2;
+
+
+ @Inject
+ @BatchProperty
+ private Integer idx;
+
+ private int invocations;
+
+
+ @Override
+ public Object readItem() throws Exception {
+ if (invocations++ < MAX_INVOCATIONS) {
+
+ Thread.sleep(5);
+ return invocations;
+ }
+
+ log.info("{} invoked {} times", idx, invocations);
+ return null;
+ }
+ }
+
+ public static class StopWriter extends AbstractItemWriter {
+
+ private static final Map<Integer, List<Object>> STORAGE = new HashMap<Integer, List<Object>>(2);
+
+
+ @Inject
+ @BatchProperty
+ private Integer idx;
+
+ @Override
+ public void writeItems(List<Object> items) throws Exception {
+
+ List<Object> objects = STORAGE.get(idx);
+ if (objects == null) {
+ objects = new ArrayList<Object>();
+ STORAGE.put(idx, objects);
+ }
+
+ objects.addAll(items);
+ }
+ }
+
+ public static class StopMapper implements PartitionMapper {
+
+ private static final int NUMBER_OF_PARTITIONS = 50;
+ private static final int NUMBER_OF_THREADS = 5;
+
+ @Override
+ public PartitionPlan mapPartitions() throws Exception {
+
+ Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
+ for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
+ Properties properties = new Properties();
+ properties.setProperty("idx", String.valueOf(i + 1));
+
+ props[i] = properties;
+ }
+
+ PartitionPlanImpl plan = new PartitionPlanImpl();
+ plan.setPartitions(NUMBER_OF_PARTITIONS);
+ plan.setThreads(NUMBER_OF_THREADS);
+ plan.setPartitionProperties(props);
+
+ return plan;
+ }
+ }
+}
diff --git a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
new file mode 100644
index 0000000..341ab8c
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<job id="partition-stop" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
+ <step id="the-step">
+ <chunk item-count="10">
+ <reader ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
+ <properties>
+ <property name="idx" value="#{partitionPlan['idx']}" />
+ </properties>
+ </reader>
+ <writer ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
+ </chunk>
+ <partition>
+ <mapper ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
+ </partition>
+ </step>
+</job>