BATCHEE-112 persistence service doesnt support multiple execution for getRunningExecution case of JobOperator when set to memory (default)
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index d1f5a3a..27082f7 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -255,7 +255,7 @@
// get the jobexecution ids associated with this job name
final Set<Long> executionIds = persistenceManagerService.jobOperatorGetRunningExecutions(jobName);
- if (executionIds.size() <= 0) {
+ if (executionIds.isEmpty()) {
throw new NoSuchJobException("Job Name " + jobName + " not found");
}
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java b/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java
index 1416894..faf0f60 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java
@@ -57,7 +57,7 @@
// check before blocking
final InternalJobExecution finalCheckExec = ServicesManager.find().service(BatchKernelService.class).getJobExecution(id);
- if (Batches.isDone(finalCheckExec.getBatchStatus())) {
+ if (finalCheckExec != null && Batches.isDone(finalCheckExec.getBatchStatus())) {
waiters.remove(id);
return;
}
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
index ff988f4..7ca4aef 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java
@@ -232,7 +232,7 @@
public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
final List<InternalJobExecution> list = new LinkedList<InternalJobExecution>();
final Structures.JobInstanceData jobInstanceData = data.jobInstanceData.get(jobInstanceId);
- if (jobInstanceData == null || jobInstanceData.executions == null) {
+ if (jobInstanceData == null) {
return list;
}
synchronized (jobInstanceData.executions) {
@@ -245,27 +245,20 @@
@Override
public Set<Long> jobOperatorGetRunningExecutions(final String jobName) {
- Structures.JobInstanceData jobInstanceData = null;
- for (final Structures.JobInstanceData instanceData : data.jobInstanceData.values()) {
- if (instanceData.instance.getJobName().equals(jobName)) {
- jobInstanceData = instanceData;
- break;
- }
- }
-
- if (jobInstanceData == null) {
- return Collections.emptySet();
- }
-
final Set<Long> set = new HashSet<Long>();
- synchronized (jobInstanceData.executions) {
- for (final Structures.ExecutionInstanceData executionInstanceData : jobInstanceData.executions) {
- if (RUNNING_STATUSES.contains(executionInstanceData.execution.getBatchStatus())) {
- set.add(executionInstanceData.execution.getExecutionId());
+ for (final Structures.JobInstanceData instanceData : data.jobInstanceData.values()) {
+ if (instanceData.instance.getJobName().equals(jobName)) {
+ synchronized (instanceData.executions) {
+ for (final Structures.ExecutionInstanceData executionInstanceData : instanceData.executions) {
+ if (RUNNING_STATUSES.contains(executionInstanceData.execution.getBatchStatus())) {
+ set.add(executionInstanceData.execution.getExecutionId());
+ }
+ }
}
}
}
+
return set;
}
diff --git a/jbatch/src/test/java/org/apache/batchee/component/SimpleBatchlet.java b/jbatch/src/test/java/org/apache/batchee/component/SimpleBatchlet.java
new file mode 100644
index 0000000..3127063
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/component/SimpleBatchlet.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.component;
+
+import javax.batch.api.AbstractBatchlet;
+import javax.batch.api.BatchProperty;
+import javax.inject.Inject;
+
+import static java.lang.Thread.sleep;
+
+public class SimpleBatchlet extends AbstractBatchlet {
+ @Inject
+ @BatchProperty
+ private Long duration;
+
+ @Override
+ public String process() throws Exception {
+ if (duration != null && duration > 0) {
+ sleep(duration);
+ }
+ return "ok";
+ }
+}
diff --git a/jbatch/src/test/java/org/apache/batchee/container/impl/JobOperatorImplTest.java b/jbatch/src/test/java/org/apache/batchee/container/impl/JobOperatorImplTest.java
new file mode 100644
index 0000000..4042cf5
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/container/impl/JobOperatorImplTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl;
+
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.junit.Test;
+
+import javax.batch.operations.JobOperator;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.batchee.util.Batches.waitForEnd;
+import static org.junit.Assert.assertEquals;
+
+public class JobOperatorImplTest {
+ @Test
+ public void runningExecutionMemory_BATCHEE112() {
+ final JobOperator operator = new JobOperatorImpl(new ServicesManager() {{
+ init(new Properties() {{
+ setProperty(PersistenceManagerService.class.getSimpleName(), MemoryPersistenceManagerService.class.getName());
+ }});
+ }});
+ for (int i = 0; i < 10; i++) {
+ final long id = operator.start("simple", new Properties() {{
+ setProperty("duration", "3000");
+ }});
+ final List<Long> running = operator.getRunningExecutions("simple");
+ assertEquals("Iteration: " + i, singletonList(id), running);
+ waitForEnd(operator, id);
+ }
+ }
+}
diff --git a/jbatch/src/test/resources/META-INF/batch-jobs/simple.xml b/jbatch/src/test/resources/META-INF/batch-jobs/simple.xml
new file mode 100644
index 0000000..9b73787
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/simple.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+<job id="simple" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
+ <step id="exec">
+ <batchlet ref="org.apache.batchee.component.SimpleBatchlet" />
+ </step>
+</job>