BATCHEE-88 improve handling in case of error during step commit
diff --git a/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java
index acdbd51..564c7a7 100644
--- a/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java
+++ b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java
@@ -19,6 +19,10 @@
 import javax.batch.operations.JobOperator;
 import javax.batch.runtime.BatchRuntime;
 import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.Metric;
+import javax.batch.runtime.StepExecution;
+
+import java.util.List;
 
 import org.apache.batchee.util.Batches;
 import org.testng.Assert;
@@ -30,8 +34,27 @@
     @Test
     public void testRolledBackDuringWork() {
         final JobOperator jobOperator = BatchRuntime.getJobOperator();
-        BatchStatus batchStatus = Batches.waitFor(jobOperator, jobOperator.start("txtest1", null));
-        Assert.assertEquals(batchStatus, BatchStatus.COMPLETED);
-        Assert.assertEquals(TxErrorWriter1.written.intValue(), 5);
+        long executionId = jobOperator.start("txtest1", null);
+        BatchStatus batchStatus = Batches.waitFor(jobOperator, executionId);
+        Assert.assertEquals(batchStatus, BatchStatus.FAILED);
+        Assert.assertEquals(TxErrorWriter1.written.intValue(), 3);
+
+        List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
+        Assert.assertEquals(stepExecutions.size(), 1);
+        StepExecution stepExecution = stepExecutions.get(0);
+        Metric[] metrics = stepExecution.getMetrics();
+        assertMetric(Metric.MetricType.READ_COUNT, 2, metrics);
+        assertMetric(Metric.MetricType.WRITE_COUNT, 2, metrics);
+        assertMetric(Metric.MetricType.ROLLBACK_COUNT, 1, metrics);
+    }
+
+    private void assertMetric(Metric.MetricType metricType, long expected, Metric[] metrics) {
+        for (Metric metric : metrics) {
+            if (metricType.equals(metric.getType())) {
+                Assert.assertEquals(metric.getValue(), expected);
+                return;
+            }
+        }
+        Assert.fail("MetricType " + metricType + " not in collected metrics");
     }
 }
diff --git a/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml
index 4b7a67c..ba6809f 100644
--- a/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml
+++ b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml
@@ -18,9 +18,6 @@
         <chunk item-count="1">
             <reader ref="org.apache.batchee.its.transaction.TxErrorReader"></reader>
             <writer ref="org.apache.batchee.its.transaction.TxErrorWriter1"></writer>
-            <skippable-exception-classes>
-                <include class="java.lang.Exception"/> <!-- all exceptions... -->
-            </skippable-exception-classes>
         </chunk>
     </step>
 </job>
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index d07d858..63381aa 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -359,6 +359,11 @@
     }

 

     protected void persistUserData() {

+        PersistentDataWrapper userData = resolveUserData();

+        storeUserData(userData);

+    }

+

+    protected PersistentDataWrapper resolveUserData() {

         final ByteArrayOutputStream persistentBAOS = new ByteArrayOutputStream();

         final ObjectOutputStream persistentDataOOS;

 

@@ -370,8 +375,16 @@
             throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);

         }

 

-        stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));

-        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);

+        return new PersistentDataWrapper(persistentBAOS.toByteArray());

+    }

+

+    protected void storeUserData(PersistentDataWrapper userData) {

+        try {

+            stepStatus.setPersistentUserData(userData);

+            statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);

+        } catch (final Exception e) {

+            throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);

+        }

     }

 

     protected void persistExitStatusAndEndTimestamp() {

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 7e07450..584d737 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -16,6 +16,9 @@
 */

 package org.apache.batchee.container.impl.controller.chunk;

 

+import java.util.HashMap;

+import java.util.Map;

+

 import org.apache.batchee.container.exception.BatchContainerRuntimeException;

 import org.apache.batchee.container.exception.BatchContainerServiceException;

 import org.apache.batchee.spi.DataRepresentationService;

@@ -75,28 +78,45 @@
         }

     }

 

-    public void checkpoint() {

+    /**

+     * Takes the current checkpoint data from the ItemReader and ItemWriter

+     * and store them in the database

+     */

+    public Map<CheckpointDataKey, CheckpointData> prepareCheckpoints() {

         final CheckpointDataKey readerChkptDK;

         final CheckpointDataKey writerChkptDK;

+        Map<CheckpointDataKey, CheckpointData> checkpoints = new HashMap<CheckpointDataKey, CheckpointData>(2);

         try {

             byte[] checkpointBytes = dataRepresentationService.toInternalRepresentation(readerProxy.checkpointInfo());

             CheckpointData readerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.READER);

             readerChkptData.setRestartToken(checkpointBytes);

             readerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.READER);

 

-            persistenceManagerService.setCheckpointData(readerChkptDK, readerChkptData);

+            checkpoints.put(readerChkptDK, readerChkptData);

 

             checkpointBytes = dataRepresentationService.toInternalRepresentation(writerProxy.checkpointInfo());

             CheckpointData writerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.WRITER);

             writerChkptData.setRestartToken(checkpointBytes);

             writerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.WRITER);

 

-            persistenceManagerService.setCheckpointData(writerChkptDK, writerChkptData);

-

+            checkpoints.put(writerChkptDK, writerChkptData);

         } catch (final Exception ex) {

             // is this what I should be throwing here?

             throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex);

         }

+

+        return checkpoints;

+    }

+

+    public void storeCheckPoints(Map<CheckpointDataKey, CheckpointData> checkpoints) {

+        try {

+            for (Map.Entry<CheckpointDataKey, CheckpointData> checkpointEntry : checkpoints.entrySet()) {

+                persistenceManagerService.setCheckpointData(checkpointEntry.getKey(), checkpointEntry.getValue());

+            }

+        } catch (final Exception ex) {

+            throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex);

+        }

+

     }

 

     public int checkpointTimeout() {

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index ba256fa..6d72ccd 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -48,10 +48,14 @@
 import javax.batch.api.chunk.listener.SkipProcessListener;

 import javax.batch.api.chunk.listener.SkipReadListener;

 import javax.batch.api.chunk.listener.SkipWriteListener;

+import javax.batch.operations.BatchRuntimeException;

 import javax.batch.runtime.BatchStatus;

+import javax.transaction.Status;

+

 import java.io.Serializable;

 import java.util.ArrayList;

 import java.util.List;

+import java.util.Map;

 import java.util.Properties;

 import java.util.concurrent.BlockingQueue;

 import java.util.logging.Level;

@@ -585,11 +589,22 @@
                     chunkProxy.afterChunk();

                 }

 

-                checkpointManager.checkpoint();

-

-                this.persistUserData();

-

-                transactionManager.commit();

+                Map<CheckpointDataKey, CheckpointData> checkpoints = checkpointManager.prepareCheckpoints();

+                PersistentDataWrapper userData = resolveUserData();

+                try {

+                    transactionManager.commit();

+                    storeUserData(userData);

+                    checkpointManager.storeCheckPoints(checkpoints);

+                } catch (Exception e) {

+                    // only set the Exception if we didn't blow up before anyway

+                    if (this.stepContext.getException() != null) {

+                        this.stepContext.setException(e);

+                    }

+                    if (e instanceof BatchRuntimeException) {

+                        throw e;

+                    }

+                    throw new BatchContainerServiceException("Cannot commit the transaction for the step.", e);

+                }

 

                 checkpointManager.endCheckpoint();

 

@@ -659,14 +674,15 @@
      */

     private void rollback(final Throwable t) {

         try {

-            // ignore, we blow up anyway

-            transactionManager.setRollbackOnly();

             try {

                 doClose();

             } catch (Exception e) {

                 // ignore, we blow up anyway

             }

 

+            // ignore, we blow up anyway

+            transactionManager.setRollbackOnly();

+

             if (t instanceof Exception) {

                 Exception e = (Exception) t;

                 for (ChunkListener chunkProxy : chunkListeners) {

@@ -681,7 +697,10 @@
             // ever come up in the spec, but seems marginally more useful.

             stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

         } finally {

-            transactionManager.rollback();

+            int txStatus = transactionManager.getStatus();

+            if (txStatus == Status.STATUS_ACTIVE || txStatus == Status.STATUS_MARKED_ROLLBACK) {

+                transactionManager.rollback();

+            }

             throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);

         }

     }