BATCHEE-138 ensure we don't fail when a chunk is stopped with JobOperator, thanks a lot Christian Berger for his analyzis+report of this issue
diff --git a/jbatch/pom.xml b/jbatch/pom.xml
index 52d59f4..c2e5535 100644
--- a/jbatch/pom.xml
+++ b/jbatch/pom.xml
@@ -251,6 +251,8 @@
and it avoids to spread it over all our executions
-->
<systemProperties>
+ <openejb.additional.exclude>com.ibm</openejb.additional.exclude>
+
<batchee.service-manager.log>true</batchee.service-manager.log>
<!-- replace properties file -->
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index 6d72ccd..fad9406 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -26,7 +26,6 @@
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
-import org.apache.batchee.container.util.TCCLObjectInputStream;
import org.apache.batchee.jaxb.Chunk;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
@@ -215,7 +214,6 @@
List<Object> chunkToWrite = new ArrayList<Object>();
Object itemRead;
Object itemProcessed;
- int readProcessedCount = 0;
while (true) {
currentItemStatus = new SingleItemStatus();
@@ -641,14 +639,14 @@
private void updateNormalMetrics(int writeCount) {
int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
- if (currentChunkStatus.isFinished()) {
+ if (currentChunkStatus.isFinished() && !BatchStatus.STOPPING.equals(stepContext.getBatchStatus())) {
readCount--;
}
int filterCount = readCount - writeCount;
if (readCount < 0 || filterCount < 0 || writeCount < 0) {
- throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount +
- ", Filter count: " + filterCount + ", Write count: " + writeCount);
+ throw new IllegalStateException("Somehow one of the metrics was less than zero. " +
+ "Read count: " + readCount + ", Filter count: " + filterCount + ", Write count: " + writeCount);
}
stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValueBy(readCount);
@@ -1020,7 +1018,6 @@
// check for data in backing store
if (writerData != null) {
byte[] writertoken = writerData.getRestartToken();
- TCCLObjectInputStream writerOIS;
try {
writerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(writertoken));
} catch (Exception ex) {
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java b/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java
index f98eedd..bbf1465 100644
--- a/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java
@@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class SimpleJobExecutionCallbackService implements JobExecutionCallbackService {
private final ConcurrentMap<Long, Collection<CountDownLatch>> waiters = new ConcurrentHashMap<Long, Collection<CountDownLatch>>();
@@ -54,18 +55,18 @@
toRelease = existing;
}
}
-
- // check before blocking
- final InternalJobExecution finalCheckExec = ServicesManager.find().service(BatchKernelService.class).getJobExecution(id);
- if (finalCheckExec != null && Batches.isDone(finalCheckExec.getBatchStatus())) {
- waiters.remove(id);
+ if (checkIsDone(id)) {
return;
}
final CountDownLatch latch = new CountDownLatch(1);
toRelease.add(latch);
try {
- latch.await();
+ while (!latch.await(1, TimeUnit.SECONDS)) {
+ if (checkIsDone(id)) {
+ return;
+ }
+ }
waiters.remove(id);
} catch (final InterruptedException e) {
throw new BatchContainerRuntimeException(e);
@@ -76,4 +77,14 @@
public void init(final Properties batchConfig) {
// no-op
}
+
+ private boolean checkIsDone(final long id) {
+ // check before blocking
+ final InternalJobExecution finalCheckExec = ServicesManager.find().service(BatchKernelService.class).getJobExecution(id);
+ if (finalCheckExec != null && Batches.isDone(finalCheckExec.getBatchStatus())) {
+ waiters.remove(id);
+ return true;
+ }
+ return false;
+ }
}
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java b/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
index 962c044..35a6252 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/factory/DefaultBatchArtifactFactory.java
@@ -28,6 +28,7 @@
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@@ -57,14 +58,18 @@
try {
final Class<?> artifactClass = tccl.loadClass(batchId);
if (artifactClass != null) {
- loadedArtifact = artifactClass.newInstance();
+ loadedArtifact = artifactClass.getConstructor().newInstance();
}
} catch (final ClassNotFoundException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+ } catch (final NoSuchMethodException e) {
+ throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
} catch (final InstantiationException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
} catch (final IllegalAccessException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
+ } catch (final InvocationTargetException e) {
+ throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e.getCause());
}
}
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/transaction/NoTxMgrBatchTransactionService.java b/jbatch/src/main/java/org/apache/batchee/container/services/transaction/NoTxMgrBatchTransactionService.java
new file mode 100755
index 0000000..8454521
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/transaction/NoTxMgrBatchTransactionService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.batchee.container.services.transaction;
+
+import java.util.Properties;
+
+import javax.batch.runtime.context.StepContext;
+
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.exception.TransactionManagementException;
+import org.apache.batchee.spi.TransactionManagementService;
+import org.apache.batchee.spi.TransactionManagerAdapter;
+
+public class NoTxMgrBatchTransactionService implements TransactionManagementService {
+ private final TransactionManagerAdapter adapter = new DefaultNonTransactionalManager();
+
+ @Override
+ public void init(final Properties batchConfig) throws BatchContainerServiceException {
+ // no-op
+ }
+
+ @Override
+ public TransactionManagerAdapter getTransactionManager(final StepContext stepContext) throws TransactionManagementException {
+ return adapter;
+ }
+}
diff --git a/jbatch/src/test/java/org/apache/batchee/container/impl/ChunkStepControllerTest.java b/jbatch/src/test/java/org/apache/batchee/container/impl/ChunkStepControllerTest.java
new file mode 100644
index 0000000..3692972
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/container/impl/ChunkStepControllerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.batchee.container.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.JobExecution;
+import javax.batch.runtime.StepExecution;
+
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService;
+import org.apache.batchee.container.services.transaction.NoTxMgrBatchTransactionService;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.spi.TransactionManagementService;
+import org.apache.batchee.util.Batches;
+import org.testng.annotations.Test;
+
+public class ChunkStepControllerTest {
+ @Test
+ public void earlyStop() throws InterruptedException {
+ final JobOperator operator = new JobOperatorImpl(new ServicesManager() {{
+ init(new Properties() {{
+ setProperty(PersistenceManagerService.class.getSimpleName(), MemoryPersistenceManagerService.class.getName());
+ setProperty(TransactionManagementService.class.getSimpleName(), NoTxMgrBatchTransactionService.class.getName());
+ }});
+ }});
+
+ final long id = operator.start("stop-chunk", new Properties());
+ Reader.LATCH.await();
+ operator.stop(id);
+ Batches.waitFor(operator, id);
+ final List<StepExecution> stepExecutions = operator.getStepExecutions(id);
+ final StepExecution stepExecution = stepExecutions.iterator().next();
+ assertEquals("STOPPED", stepExecution.getExitStatus()); // before (BATCHEE-138) it was FAILED
+ }
+
+ public static class Reader extends AbstractItemReader {
+ static final CountDownLatch LATCH = new CountDownLatch(8);
+
+ @Override
+ public Object readItem() throws Exception {
+ LATCH.countDown();
+ return new Object();
+ }
+ }
+
+ public static class Writer extends AbstractItemWriter {
+ @Override
+ public void writeItems(final List<Object> list) {
+ // no-op
+ }
+ }
+}
diff --git a/jbatch/src/test/resources/META-INF/batch-jobs/stop-chunk.xml b/jbatch/src/test/resources/META-INF/batch-jobs/stop-chunk.xml
new file mode 100644
index 0000000..e467480
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/stop-chunk.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<job id="stop-chunk" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
+ <step id="test">
+ <chunk>
+ <reader ref="org.apache.batchee.container.impl.ChunkStepControllerTest$Reader" />
+ <writer ref="org.apache.batchee.container.impl.ChunkStepControllerTest$Writer" />
+ </chunk>
+ </step>
+</job>