BATCHEE-54 BATCHEE-69 applying Scott Kurz's patches, reexecution of partition after a restart and fixes regarding the spec in ChunkStepController
diff --git a/jbatch/pom.xml b/jbatch/pom.xml
index d59f0fe..39e13fd 100644
--- a/jbatch/pom.xml
+++ b/jbatch/pom.xml
@@ -97,7 +97,7 @@
     <dependency>
       <groupId>com.ibm.jbatch.tck</groupId>
       <artifactId>com.ibm.jbatch.tck</artifactId>
-      <version>1.1-b02</version>
+      <version>1.1-b03</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
@@ -108,7 +108,6 @@
           <groupId>javax.inject</groupId>
           <artifactId>javax.inject</artifactId>
         </exclusion>
-        <!-- SNAPSHOTS -->
         <exclusion>
           <groupId>com.ibm.jbatch</groupId>
           <artifactId>com.ibm.jbatch.spi</artifactId>
@@ -118,7 +117,7 @@
     <dependency>
       <groupId>com.ibm.jbatch.tck</groupId>
       <artifactId>com.ibm.jbatch.tck.spi</artifactId>
-      <version>1.1-b02</version>
+      <version>1.1-b03</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index 16ed31d..d07d858 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -67,12 +67,16 @@
 

     protected StepContextImpl stepContext;

     protected Step step;

+    protected String stepName;

     protected StepStatus stepStatus;

 

     protected BlockingQueue<PartitionDataWrapper> analyzerStatusQueue = null;

 

     protected long rootJobExecutionId;

 

+    // Restart of partitioned steps needs to be handled specially

+    protected boolean restartAfterCompletion = false;

+

     protected final BatchKernelService kernelService;

     protected final PersistenceManagerService persistenceManagerService;

     private final JobStatusManagerService statusManagerService;

@@ -90,6 +94,7 @@
             throw new IllegalArgumentException("Step parameter to ctor cannot be null.");

         }

         this.step = step;

+        this.stepName = step.getId();

 

         this.txService = servicesManager.service(TransactionManagementService.class);

         this.kernelService = servicesManager.service(BatchKernelService.class);

@@ -311,6 +316,8 @@
             // boolean, but it should default to 'false', which is the spec'd default.

             if (!Boolean.parseBoolean(step.getAllowStartIfComplete())) {

                 return false;

+            } else {

+                restartAfterCompletion = true;

             }

         }

 

@@ -340,6 +347,9 @@
         return true;

     }

 

+    protected boolean isRestartExecution() {

+        return stepStatus.getStartCount() > 1;

+    }

 

     protected void statusStarting() {

         stepStatus.setBatchStatus(BatchStatus.STARTING);

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index e072a73..c008523 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -71,6 +71,27 @@
 

     private PartitionReducer partitionReducerProxy = null;

 

+    private enum ExecutionType {

+      /**

+       * First execution of this step for the job instance (among all job executions)

+       */

+      START,

+      /**

+       * Step previously executed but did not complete successfully, override=false so continue from previous partitions' checkpoints, etc.

+       */

+      RESTART_NORMAL,

+      /**

+       * Step previously executed but did not complete successfully, override=true so start with an entire set of new partitions, checkpoints, etc.

+       */

+      RESTART_OVERRIDE,

+      /**

+       * Step previously completed, but we are re-executing with an entire set of new partitions, checkpoints, etc.

+       */

+      RESTART_AFTER_COMPLETION

+    }

+

+    private ExecutionType executionType = null;

+

     // On invocation this will be re-primed to reflect already-completed partitions from a previous execution.

     int numPreviouslyCompleted = 0;

 

@@ -247,6 +268,37 @@
         return plan;

     }

 

+    private void calculateExecutionType() {

+        // We want to ignore override on the initial execution

+        if (isRestartExecution()) {

+            if (restartAfterCompletion) {

+                executionType = ExecutionType.RESTART_AFTER_COMPLETION;

+            } else if (plan.getPartitionsOverride()) {

+                executionType = ExecutionType.RESTART_OVERRIDE;

+            } else {

+                executionType = ExecutionType.RESTART_NORMAL;

+            }

+        } else {

+            executionType = ExecutionType.START;

+        }

+    }

+

+    private void validateNumberOfPartitions() {

+

+        int currentPlanSize = plan.getPartitions();

+

+        if (executionType == ExecutionType.RESTART_NORMAL) {

+            int previousPlanSize = stepStatus.getNumPartitions();

+            if (previousPlanSize > 0 && previousPlanSize != currentPlanSize) {

+                String msg = "On a normal restart, the plan on restart specified: " + currentPlanSize + " # of partitions, but the previous " +

+                        "executions' plan specified a different number: " + previousPlanSize + " # of partitions.  Failing job.";

+                throw new IllegalStateException(msg);

+            }

+        }

+

+        //persist the partition plan so on restart we have the same plan to reuse

+        stepStatus.setNumPartitions(currentPlanSize);

+    }

 

     @Override

     protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

@@ -255,6 +307,9 @@
 

         //persist the partition plan so on restart we have the same plan to reuse

         stepStatus.setNumPartitions(plan.getPartitions());

+        calculateExecutionType();

+

+        validateNumberOfPartitions();

 

         /* When true is specified, the partition count from the current run

          * is used and all results from past partitions are discarded. Any

@@ -263,7 +318,7 @@
          * rollbackPartitionedStep method is invoked during restart before any

          * partitions begin processing to provide a cleanup hook.

          */

-        if (plan.getPartitionsOverride()) {

+        if (executionType == ExecutionType.RESTART_OVERRIDE) {

             if (this.partitionReducerProxy != null) {

                 try {

                     this.partitionReducerProxy.rollbackPartitionedStep();

@@ -303,9 +358,14 @@
             PartitionsBuilderConfig config =

                     new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());

             // Then build all the subjobs but do not start them yet

-            if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {

+            if (executionType == ExecutionType.RESTART_NORMAL) {

                 parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);

             } else {

+                // This case includes RESTART_OVERRIDE and RESTART_AFTER_COMPLETION.

+                //

+                // So we're just going to create new "subjob" job instances in the DB in these cases,

+                // and we'll have to make sure we're dealing with the correct ones, say in a subsequent "normal" restart

+                // (of the current execution which is itself a restart)

                 parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);

             }

 

@@ -324,6 +384,11 @@
         int numCurrentCompleted = 0;

         int numCurrentSubmitted = 0;

 

+        // All partitions have already completed on a previous execution.

+        if (numTotalForThisExecution == 0) {

+          return;

+        }

+

         //Start up to to the max num we are allowed from the num threads attribute

         for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {

             final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
deleted file mode 100755
index 49df47e..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*

- * Copyright 2012 International Business Machines Corp.

- * 

- * See the NOTICE file distributed with this work for additional information

- * regarding copyright ownership. Licensed under the Apache License, 

- * Version 2.0 (the "License"); you may not use this file except in compliance

- * with the License. You may obtain a copy of the License at

- * 

- *   http://www.apache.org/licenses/LICENSE-2.0

- * 

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

-*/

-package org.apache.batchee.container.impl.controller.chunk;

-

-

-import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;

-import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;

-import org.apache.batchee.container.proxy.InjectionReferences;

-import org.apache.batchee.container.proxy.ProxyFactory;

-import org.apache.batchee.jaxb.Chunk;

-import org.apache.batchee.jaxb.Step;

-import org.apache.batchee.spi.BatchArtifactFactory;

-

-public final class CheckpointAlgorithmFactory {

-    public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionReferences,

-                                                                       final RuntimeJobExecution jobExecution) {

-        final Chunk chunk = step.getChunk();

-        final String checkpointType = chunk.getCheckpointPolicy();

-        final CheckpointAlgorithmProxy proxy;

-        if ("custom".equalsIgnoreCase(checkpointType)) {

-            proxy = ProxyFactory.createCheckpointAlgorithmProxy(factory, chunk.getCheckpointAlgorithm().getRef(), injectionReferences, jobExecution);

-        } else /* "item" */ {

-            proxy = new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());

-        }

-        return proxy;

-

-    }

-

-    private CheckpointAlgorithmFactory() {

-        // no-op

-    }

-}

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index b8163e7..7e07450 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -51,6 +51,22 @@
         this.dataRepresentationService = dataRepresentationService;

     }

 

+    public void beginCheckpoint() {

+        try {

+            this.checkpointAlgorithm.beginCheckpoint();

+        } catch (final Exception e) {

+            throw new BatchContainerRuntimeException("Checkpoint algorithm beginCheckpoint() failed", e);

+        }

+    }

+

+    public void endCheckpoint() {

+        try {

+            this.checkpointAlgorithm.endCheckpoint();

+        } catch (final Exception e) {

+            throw new BatchContainerRuntimeException("Checkpoint algorithm endCheckpoint() failed", e);

+        }

+    }

+

     public boolean applyCheckPointPolicy() {

         try {

             return checkpointAlgorithm.isReadyToCheckpoint();

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
index 2e24855..c70676f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
@@ -48,19 +48,20 @@
         return timeLimit;

     }

 

-    public static String getCheckpointPolicy(Chunk chunk) {

+    public static boolean isCustomCheckpointPolicy(Chunk chunk) {

         String checkpointPolicy = chunk.getCheckpointPolicy();

 

         if (checkpointPolicy != null && !checkpointPolicy.isEmpty()) {

-            if (!(checkpointPolicy.equals("item") || checkpointPolicy.equals("custom"))) {

+            if (checkpointPolicy.equals("item")) {

+                return false;

+            } else if (checkpointPolicy.equals("custom")) {

+                return true;

+            } else {

                 throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");

             }

         } else {

-            checkpointPolicy = "item";

+            return false;

         }

-

-        chunk.setCheckpointPolicy(checkpointPolicy);

-        return checkpointPolicy;

     }

 

     public static int getSkipLimit(Chunk chunk) {

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index 19b1a9e..ba256fa 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -22,7 +22,6 @@
 import org.apache.batchee.container.impl.StepContextImpl;

 import org.apache.batchee.container.impl.controller.SingleThreadedStepController;

 import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;

-import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;

 import org.apache.batchee.container.proxy.InjectionReferences;

 import org.apache.batchee.container.proxy.ProxyFactory;

 import org.apache.batchee.container.services.ServicesManager;

@@ -53,6 +52,7 @@
 import java.io.Serializable;

 import java.util.ArrayList;

 import java.util.List;

+import java.util.Properties;

 import java.util.concurrent.BlockingQueue;

 import java.util.logging.Level;

 import java.util.logging.Logger;

@@ -61,6 +61,8 @@
 

     private final static Logger logger = Logger.getLogger(ChunkStepController.class.getName());

 

+    protected static final int DEFAULT_TRAN_TIMEOUT_SECONDS = 180;  // From the spec Sec. 9.7

+

     private final PersistenceManagerService persistenceManagerService;

     private final BatchArtifactFactory artifactFactory;

     private final DataRepresentationService dataRepresentationService;

@@ -69,8 +71,6 @@
     private ItemReader readerProxy = null;

     private ItemProcessor processorProxy = null;

     private ItemWriter writerProxy = null;

-    private CheckpointAlgorithmProxy checkpointProxy = null;

-    private CheckpointAlgorithm chkptAlg = null;

     private CheckpointManager checkpointManager;

     private SkipHandler skipHandler = null;

     private CheckpointDataKey readerChkptDK = null;

@@ -81,7 +81,14 @@
     private List<ItemWriteListener> itemWriteListeners = null;

     private RetryHandler retryHandler;

 

-    private boolean rollbackRetry = false;

+    protected ChunkStatus currentChunkStatus;

+    protected SingleItemStatus currentItemStatus;

+

+    // Default is item-based policy

+    protected boolean customCheckpointPolicy = false;

+    protected Integer checkpointAtThisItemCount = null;  // Default to spec value elsewhere.

+

+    protected int stepPropertyTranTimeoutSeconds = DEFAULT_TRAN_TIMEOUT_SECONDS;

 

     public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,

                                final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,

@@ -93,9 +100,10 @@
     }

 

     /**

-     * Utility Class to hold statuses at each level of Read-Process-Write loop

+     * Utility Class to hold status for a single item as the read-process portion of

+     * the chunk loop interact.

      */

-    private class ItemStatus {

+    private class SingleItemStatus  {

 

         public boolean isSkipped() {

             return skipped;

@@ -113,12 +121,67 @@
             this.filtered = filtered;

         }

 

-        public boolean isCheckPointed() {

-            return checkPointed;

+        private boolean skipped = false;

+        private boolean filtered = false;

+    }

+

+    private enum ChunkStatusType {

+        NORMAL, RETRY_AFTER_ROLLBACK

+    }

+

+    /**

+     * Utility Class to hold status for the chunk as a whole.

+     *

+     * One key usage is to maintain the state reflecting the sequence in which

+     * we catch a retryable exception, rollback the previous chunk, process 1-item-at-a-time

+     * until we reach "where we left off", then revert to normal chunk processing.

+     *

+     * Another usage is simply to communicate that the reader readItem() returned 'null', so

+     * we're done the chunk.

+     */

+    private class ChunkStatus {

+        ChunkStatusType type;

+

+        ChunkStatus() {

+            this(ChunkStatusType.NORMAL);

         }

 

-        public void setCheckPointed(boolean checkPointed) {

-            this.checkPointed = checkPointed;

+        ChunkStatus(ChunkStatusType type) {

+            this.type = type;

+        }

+

+        public boolean isRetryingAfterRollback() {

+            return type == ChunkStatusType.RETRY_AFTER_ROLLBACK;

+        }

+

+        public boolean wasMarkedForRollbackWithRetry() {

+            return markedForRollbackWithRetry;

+        }

+

+        public Exception getRetryableException() {

+            return retryableException;

+        }

+

+        public void markForRollbackWithRetry(Exception retryableException) {

+            this.markedForRollbackWithRetry = true;

+            this.retryableException = retryableException;

+        }

+

+        public int getItemsTouchedInCurrentChunk() {

+            return itemsTouchedInCurrentChunk;

+        }

+

+        public void incrementItemsTouchedInCurrentChunk() {

+            this.itemsTouchedInCurrentChunk++;

+        }

+

+        public int getItemsToProcessOneByOneAfterRollback() {

+            return itemsToProcessOneByOneAfterRollback;

+        }

+

+        public void setItemsToProcessOneByOneAfterRollback(

+                int itemsToProcessOneByOneAfterRollback) {

+            this.itemsToProcessOneByOneAfterRollback = itemsToProcessOneByOneAfterRollback;

         }

 

         public boolean isFinished() {

@@ -129,24 +192,11 @@
             this.finished = finished;

         }

 

-        public void setRetry(boolean ignored) {

-            // no-op

-        }

-

-        public boolean isRollback() {

-            return rollback;

-        }

-

-        public void setRollback(boolean rollback) {

-            this.rollback = rollback;

-        }

-

-        private boolean skipped = false;

-        private boolean filtered = false;

         private boolean finished = false;

-        private boolean checkPointed = false;

-        private boolean rollback = false;

-

+        private Exception retryableException = null;

+        private boolean markedForRollbackWithRetry = false;

+        private int itemsTouchedInCurrentChunk = 0;

+        private int itemsToProcessOneByOneAfterRollback = 0; // For retry with rollback

     }

 

     /**

@@ -155,65 +205,57 @@
      * reader (not more items to read), or the writer buffer is full or a

      * checkpoint is triggered.

      *

-     * @param chunkSize write buffer size

-     * @param theStatus flags when the read-process reached the last record or a

-     *                  checkpoint is required

      * @return an array list of objects to write

      */

-    private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {

+    private List<Object> readAndProcess() {

         List<Object> chunkToWrite = new ArrayList<Object>();

         Object itemRead;

         Object itemProcessed;

         int readProcessedCount = 0;

 

         while (true) {

-            ItemStatus status = new ItemStatus();

-            itemRead = readItem(status);

+            currentItemStatus = new SingleItemStatus();

+            currentChunkStatus.incrementItemsTouchedInCurrentChunk();

+            itemRead = readItem();

 

-            if (status.isRollback()) {

-                theStatus.setRollback(true);

-                // inc rollbackCount

-                stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+            if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {

                 break;

             }

 

-            if (!status.isSkipped() && !status.isFinished()) {

-                itemProcessed = processItem(itemRead, status);

+            if (!currentItemStatus.isSkipped() && !currentChunkStatus.isFinished()) {

+                itemProcessed = processItem(itemRead);

 

-                if (status.isRollback()) {

-                    theStatus.setRollback(true);

-                    // inc rollbackCount

-                    stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+                if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {

                     break;

                 }

 

-                if (!status.isSkipped() && !status.isFiltered()) {

+                if (!currentItemStatus.isSkipped() && !currentItemStatus.isFiltered()) {

                     chunkToWrite.add(itemProcessed);

-                    readProcessedCount++;

                 }

             }

 

-            theStatus.setFinished(status.isFinished());

-            theStatus.setCheckPointed(checkpointManager.applyCheckPointPolicy());

-

-            // This will force the current item to finish processing on a stop

-            // request

-            if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {

-                theStatus.setFinished(true);

-            }

-

-            // write buffer size reached

-            if ((readProcessedCount == chunkSize) && !("custom".equals(checkpointProxy.getCheckpointType()))) {

+            // Break out of the loop to deliver one-at-a-time processing after rollback.

+            // No point calling isReadyToCheckpoint(), we know we're done.  Let's not

+            // complicate the checkpoint algorithm to hold this logic, just break right here.

+            if (currentChunkStatus.isRetryingAfterRollback()) {

                 break;

             }

 

-            // checkpoint reached

-            if (theStatus.isCheckPointed()) {

+            // write buffer size reached

+            // This will force the current item to finish processing on a stop request

+            if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {

+                currentChunkStatus.setFinished(true);

+            }

+

+            // The spec, in Sec. 11.10, Chunk with Custom Checkpoint Processing, clearly

+            // outlines that this gets called even when we've already read a null (which

+            // arguably is pointless).   But we'll follow the spec.

+            if (checkpointManager.applyCheckPointPolicy()) {

                 break;

             }

 

             // last record in readerProxy reached

-            if (theStatus.isFinished()) {

+            if (currentChunkStatus.isFinished()) {

                 break;

             }

 

@@ -223,11 +265,10 @@
 

     /**

      * Reads an item from the reader

-     *

-     * @param status flags the current read status

+     *h

      * @return the item read

      */

-    private Object readItem(ItemStatus status) {

+    private Object readItem() {

         Object itemRead = null;

 

         try {

@@ -244,10 +285,7 @@
 

             // itemRead == null means we reached the end of

             // the readerProxy "resultset"

-            status.setFinished(itemRead == null);

-            if (!status.isFinished()) {

-                stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();

-            }

+            currentChunkStatus.setFinished(itemRead == null);

         } catch (Exception e) {

             stepContext.setException(e);

             for (ItemReadListener readListenerProxy : itemReadListeners) {

@@ -257,26 +295,16 @@
                     ExceptionConfig.wrapBatchException(e1);

                 }

             }

-            if (!rollbackRetry) {

+            if(!currentChunkStatus.isRetryingAfterRollback()) {

                 if (retryReadException(e)) {

-                    for (ItemReadListener readListenerProxy : itemReadListeners) {

-                        try {

-                            readListenerProxy.onReadError(e);

-                        } catch (Exception e1) {

-                            ExceptionConfig.wrapBatchException(e1);

-                        }

-                    }

-                    // if not a rollback exception, just retry the current item

                     if (!retryHandler.isRollbackException(e)) {

-                        itemRead = readItem(status);

+                        itemRead = readItem();

                     } else {

-                        status.setRollback(true);

-                        rollbackRetry = true;

-                        // inc rollbackCount

-                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+                        // retry with rollback

+                        currentChunkStatus.markForRollbackWithRetry(e);

                     }

                 } else if (skipReadException(e)) {

-                    status.setSkipped(true);

+                    currentItemStatus.setSkipped(true);

                     stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();

 

                 } else {

@@ -285,16 +313,16 @@
             } else {

                 // coming from a rollback retry

                 if (skipReadException(e)) {

-                    status.setSkipped(true);

+                    currentItemStatus.setSkipped(true);

                     stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();

 

                 } else if (retryReadException(e)) {

                     if (!retryHandler.isRollbackException(e)) {

-                        itemRead = readItem(status);

+                        // retry without rollback

+                        itemRead = readItem();

                     } else {

-                        status.setRollback(true);

-                        // inc rollbackCount

-                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+                        // retry with rollback

+                        currentChunkStatus.markForRollbackWithRetry(e);

                     }

                 } else {

                     throw new BatchContainerRuntimeException(e);

@@ -313,10 +341,9 @@
      * Process an item previously read by the reader

      *

      * @param itemRead the item read

-     * @param status   flags the current process status

      * @return the processed item

      */

-    private Object processItem(final Object itemRead, final ItemStatus status) {

+    private Object processItem(final Object itemRead) {

         Object processedItem = null;

 

         // if no processor defined for this chunk

@@ -334,9 +361,7 @@
             processedItem = processorProxy.processItem(itemRead);

 

             if (processedItem == null) {

-                // inc filterCount

-                stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();

-                status.setFiltered(true);

+                currentItemStatus.setFiltered(true);

             }

 

             for (final ItemProcessListener processListenerProxy : itemProcessListeners) {

@@ -350,78 +375,30 @@
                     ExceptionConfig.wrapBatchException(e1);

                 }

             }

-            if (!rollbackRetry) {

+            if(!currentChunkStatus.isRetryingAfterRollback()) {

                 if (retryProcessException(e, itemRead)) {

                     if (!retryHandler.isRollbackException(e)) {

-                        // call process listeners before and after the actual

-                        // process call

-                        for (ItemProcessListener processListenerProxy : itemProcessListeners) {

-                            try {

-                                processListenerProxy.beforeProcess(itemRead);

-                            } catch (Exception e1) {

-                                ExceptionConfig.wrapBatchException(e1);

-                            }

-                        }

-                        processedItem = processItem(itemRead, status);

-                        if (processedItem == null) {

-                            // inc filterCount

-                            stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();

-                            status.setFiltered(true);

-                        }

-

-                        for (final ItemProcessListener processListenerProxy : itemProcessListeners) {

-                            try {

-                                processListenerProxy.afterProcess(itemRead, processedItem);

-                            } catch (Exception e1) {

-                                ExceptionConfig.wrapBatchException(e1);

-                            }

-                        }

+                        processedItem = processItem(itemRead);

                     } else {

-                        status.setRollback(true);

-                        rollbackRetry = true;

-                        // inc rollbackCount

-                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+                        currentChunkStatus.markForRollbackWithRetry(e);

                     }

                 } else if (skipProcessException(e, itemRead)) {

-                    status.setSkipped(true);

+                    currentItemStatus.setSkipped(true);

                     stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();

                 } else {

                     throw new BatchContainerRuntimeException(e);

                 }

             } else {

                 if (skipProcessException(e, itemRead)) {

-                    status.setSkipped(true);

+                    currentItemStatus.setSkipped(true);

                     stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();

                 } else if (retryProcessException(e, itemRead)) {

                     if (!retryHandler.isRollbackException(e)) {

-                        // call process listeners before and after the actual

-                        // process call

-                        for (final ItemProcessListener processListenerProxy : itemProcessListeners) {

-                            try {

-                                processListenerProxy.beforeProcess(itemRead);

-                            } catch (Exception e1) {

-                                ExceptionConfig.wrapBatchException(e1);

-                            }

-                        }

-                        processedItem = processItem(itemRead, status);

-                        if (processedItem == null) {

-                            // inc filterCount

-                            stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();

-                            status.setFiltered(true);

-                        }

-

-                        for (final ItemProcessListener processListenerProxy : itemProcessListeners) {

-                            try {

-                                processListenerProxy.afterProcess(itemRead, processedItem);

-                            } catch (Exception e1) {

-                                ExceptionConfig.wrapBatchException(e1);

-                            }

-                        }

+                        // retry without rollback

+                        processedItem = processItem(itemRead);

                     } else {

-                        status.setRollback(true);

-                        rollbackRetry = true;

-                        // inc rollbackCount

-                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+                        // retry with rollback

+                        currentChunkStatus.markForRollbackWithRetry(e);

                     }

                 } else {

                     throw new BatchContainerRuntimeException(e);

@@ -440,7 +417,7 @@
      *

      * @param theChunk the array list with all items processed ready to be written

      */

-    private void writeChunk(List<Object> theChunk, ItemStatus status) {

+    private void writeChunk(List<Object> theChunk) {

         if (!theChunk.isEmpty()) {

             try {

 

@@ -454,7 +431,6 @@
                 for (ItemWriteListener writeListenerProxy : itemWriteListeners) {

                     writeListenerProxy.afterWrite(theChunk);

                 }

-                stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());

             } catch (Exception e) {

                 this.stepContext.setException(e);

                 for (ItemWriteListener writeListenerProxy : itemWriteListeners) {

@@ -464,15 +440,14 @@
                         ExceptionConfig.wrapBatchException(e1);

                     }

                 }

-                if (!rollbackRetry) {

+                if(!currentChunkStatus.isRetryingAfterRollback()) {

                     if (retryWriteException(e, theChunk)) {

                         if (!retryHandler.isRollbackException(e)) {

-                            writeChunk(theChunk, status);

+                            // retry without rollback

+                            writeChunk(theChunk);

                         } else {

-                            rollbackRetry = true;

-                            status.setRollback(true);

-                            // inc rollbackCount

-                            stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+                            // retry with rollback

+                            currentChunkStatus.markForRollbackWithRetry(e);

                         }

                     } else if (skipWriteException(e, theChunk)) {

                         stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);

@@ -485,13 +460,11 @@
                         stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);

                     } else if (retryWriteException(e, theChunk)) {

                         if (!retryHandler.isRollbackException(e)) {

-                            status.setRetry(true);

-                            writeChunk(theChunk, status);

+                            // retry without rollback

+                            writeChunk(theChunk);

                         } else {

-                            rollbackRetry = true;

-                            status.setRollback(true);

-                            // inc rollbackCount

-                            stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+                            // retry with rollback

+                            currentChunkStatus.markForRollbackWithRetry(e);

                         }

                     } else {

                         throw new BatchContainerRuntimeException(e);

@@ -504,13 +477,63 @@
         }

     }

 

-    private void invokeChunk() {

-        int itemCount = ChunkHelper.getItemCount(chunk);

-        int timeInterval = ChunkHelper.getTimeLimit(chunk);

-        boolean checkPointed = true;

-        boolean rollback = false;

+    /**

+     * Prime the next chunk's ChunkStatus based on the previous one

+     * (if there was one), particularly taking into account retry-with-rollback

+     * and the one-at-a-time processing it entails.

+     * @return the upcoming chunk's ChunkStatus

+     */

+    private ChunkStatus getNextChunkStatusBasedOnPrevious() {

+        // If this is the first chunk

+        if (currentChunkStatus == null) {

+            return new ChunkStatus();

+        }

 

-        // begin new transaction at first iteration or after a checkpoint commit

+        ChunkStatus nextChunkStatus = null;

+

+        // At this point the 'current' status is the previous chunk's status.

+        if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {

+

+            // Re-position reader & writer

+            transactionManager.begin();

+            positionReaderAtCheckpoint();

+            positionWriterAtCheckpoint();

+            transactionManager.commit();

+

+            nextChunkStatus = new ChunkStatus(ChunkStatusType.RETRY_AFTER_ROLLBACK);

+

+            // What happens if we get a retry-with-rollback on a single item that we were processing

+            // after a prior retry with rollback?   We don't want to revert to normal processing

+            // after completing only the single item of the "single item chunk".  We want to complete

+            // the full portion of the original chunk.  So be careful to propagate this number if

+            // it already exists.

+            int numToProcessOneByOne = currentChunkStatus.getItemsToProcessOneByOneAfterRollback();

+            if (numToProcessOneByOne > 0) {

+                // Retry after rollback AFTER a previous retry after rollback

+                nextChunkStatus.setItemsToProcessOneByOneAfterRollback(numToProcessOneByOne);

+            } else {

+                // "Normal" (i.e. the first) retry after rollback.

+                nextChunkStatus.setItemsToProcessOneByOneAfterRollback(currentChunkStatus.getItemsTouchedInCurrentChunk());

+            }

+        } else if (currentChunkStatus.isRetryingAfterRollback()) {

+            // In this case the 'current' (actually the last) chunk was a single-item retry after rollback chunk,

+            // so we have to see if it's time to revert to normal processing.

+            int numToProcessOneByOne = currentChunkStatus.getItemsToProcessOneByOneAfterRollback();

+            if (numToProcessOneByOne == 1) {

+                // we're done, revert to normal

+                nextChunkStatus = new ChunkStatus();

+            } else {

+                nextChunkStatus = new ChunkStatus(ChunkStatusType.RETRY_AFTER_ROLLBACK);

+                nextChunkStatus.setItemsToProcessOneByOneAfterRollback(numToProcessOneByOne - 1);

+            }

+        } else {

+            nextChunkStatus = new ChunkStatus();

+        }

+

+        return nextChunkStatus;

+    }

+

+    private void invokeChunk() {

 

         try {

             transactionManager.begin();

@@ -523,101 +546,67 @@
         try {

             while (true) {

 

-                if (checkPointed || rollback) {

-                    if ("custom".equals(checkpointProxy.getCheckpointType())) {

-                        int newtimeOut = this.checkpointManager.checkpointTimeout();

-                        transactionManager.setTransactionTimeout(newtimeOut);

-                    }

+                currentChunkStatus = getNextChunkStatusBasedOnPrevious();

+

+                // Sequence surrounding beginCheckpoint() updated per MR

+                // https://java.net/bugzilla/show_bug.cgi?id=5873

+                setNextChunkTransactionTimeout();

+

+                // Remember we "wrap" the built-in item-count + time-limit "algorithm"

+                // in a CheckpointAlgorithm for ease in keeping the sequence consistent

+                checkpointManager.beginCheckpoint();

+

+                transactionManager.begin();

+

+                for (ChunkListener chunkProxy : chunkListeners) {

+                    chunkProxy.beforeChunk();

+                }

+

+                final List<Object> chunkToWrite = readAndProcess();

+

+                if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {

+                    rollbackAfterRetryableException();

+                    continue;

+                }

+

+                // MR 1.0 Rev A clarified we'd only write a chunk with at least one item.

+                // See, e.g. Sec 11.6 of Spec

+                if (chunkToWrite.size() > 0) {

+                    writeChunk(chunkToWrite);

+                }

+

+                if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {

+                    rollbackAfterRetryableException();

+

+                    continue;

+                }

+

+                for (ChunkListener chunkProxy : chunkListeners) {

+                    chunkProxy.afterChunk();

+                }

+

+                checkpointManager.checkpoint();

+

+                this.persistUserData();

+

+                transactionManager.commit();

+

+                checkpointManager.endCheckpoint();

+

+                invokeCollectorIfPresent();

+

+                updateNormalMetrics(chunkToWrite.size());

+

+                // exit loop when last record is written

+                if (currentChunkStatus.isFinished()) {

                     transactionManager.begin();

-                    for (ChunkListener chunkProxy : chunkListeners) {

-                        chunkProxy.beforeChunk();

-                    }

-

-                    if (rollback) {

-                        positionReaderAtCheckpoint();

-                        positionWriterAtCheckpoint();

-                        checkpointManager = new CheckpointManager(readerProxy, writerProxy,

-                            getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl

-                            .getJobInstance().getInstanceId(), step.getId(), persistenceManagerService, dataRepresentationService);

-                    }

-                }

-

-                ItemStatus status = new ItemStatus();

-

-                if (rollback) {

-                    rollback = false;

-                }

-

-                final List<Object> chunkToWrite = readAndProcess(itemCount, status);

-

-                if (status.isRollback()) {

-                    itemCount = 1;

-                    rollback = true;

-

-                    doClose();

-

-                    transactionManager.rollback();

-

-                    continue;

-                }

-

-                writeChunk(chunkToWrite, status);

-

-                if (status.isRollback()) {

-                    itemCount = 1;

-                    rollback = true;

-

-                    doClose();

-

-                    transactionManager.rollback();

-

-                    continue;

-                }

-                checkPointed = status.isCheckPointed();

-

-                // we could finish the chunk in 3 conditions: buffer is full,

-                // checkpoint, not more input

-                if (status.isCheckPointed() || status.isFinished()) {

-                    // TODO: missing before checkpoint listeners

-                    // 1.- check if spec list proper steps for before checkpoint

-                    // 2.- ask Andy about retry

-                    // 3.- when do we stop?

-

-                    checkpointManager.checkpoint();

-

-                    for (ChunkListener chunkProxy : chunkListeners) {

-                        chunkProxy.afterChunk();

-                    }

-

-                    this.persistUserData();

-

-                    this.chkptAlg.beginCheckpoint();

-

-                    transactionManager.commit();

-

-                    this.chkptAlg.endCheckpoint();

-

-                    invokeCollectorIfPresent();

-

-                    // exit loop when last record is written

-                    if (status.isFinished()) {

-                        transactionManager.begin();

-

-                        if (doClose()) {

-                            transactionManager.commit();

-                            stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();

-                        } else {

-                            stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

-                            transactionManager.rollback();

-                        }

-                        break;

+                    if (doClose()) {

+                        transactionManager.commit();

                     } else {

-                        // increment commitCount

-                        stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();

+                        transactionManager.rollback();

                     }

-

+                    break;

                 }

-

             }

         } catch (final Exception e) {

             logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);

@@ -635,7 +624,24 @@
         }

     }

 

-    private boolean doClose() throws Exception {

+    private void updateNormalMetrics(int writeCount) {

+        int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();

+        if (currentChunkStatus.isFinished()) {

+            readCount--;

+        }

+        int filterCount = readCount - writeCount;

+

+        if (readCount < 0 || filterCount < 0 || writeCount < 0) {

+            throw new IllegalStateException("Somehow one of the metrics was zero.  Read count: " + readCount +

+                    ", Filter count: " + filterCount + ", Write count: " + writeCount);

+        }

+        stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();

+        stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValueBy(readCount);

+        stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValueBy(filterCount);

+        stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(writeCount);

+    }

+

+    private boolean doClose() {

         try {

             readerProxy.close();

             writerProxy.close();

@@ -646,45 +652,106 @@
         }

     }

 

+    /**

+     * Reflect spec order in Sec. 11.9 "Rollback Procedure".

+     *

+     * Also do final rollback in try-finally

+     */

     private void rollback(final Throwable t) {

-        transactionManager.setRollbackOnly();

         try {

-            doClose();

-        } catch (Exception e) {

             // ignore, we blow up anyway

+            transactionManager.setRollbackOnly();

+            try {

+                doClose();

+            } catch (Exception e) {

+                // ignore, we blow up anyway

+            }

+

+            if (t instanceof Exception) {

+                Exception e = (Exception) t;

+                for (ChunkListener chunkProxy : chunkListeners) {

+                    try {

+                        chunkProxy.onError(e);

+                    } catch (final Exception e1) {

+                        logger.log(Level.SEVERE, e1.getMessage(), e1);

+                    }

+                }

+            }

+            // Count non-retryable rollback against metric as well.  Not sure this has

+            // ever come up in the spec, but seems marginally more useful.

+            stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+        } finally {

+            transactionManager.rollback();

+            throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);

         }

-        transactionManager.rollback();

-        throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);

     }

 

+    private void rollbackAfterRetryableException() throws Exception {

+        doClose();

+

+        for (ChunkListener chunkProxy : chunkListeners) {

+            try {

+                chunkProxy.onError(currentChunkStatus.getRetryableException());

+            } catch (final Exception e1) {

+                logger.log(Level.SEVERE, e1.getMessage(), e1);

+            }

+        }

+        transactionManager.rollback();

+

+        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();

+    }

+

+    @Override

     protected void invokeCoreStep() throws BatchContainerServiceException {

 

         this.chunk = step.getChunk();

 

         initializeChunkArtifacts();

 

+        initializeCheckpointManager();

+

         invokeChunk();

     }

 

-    private CheckpointAlgorithm getCheckpointAlgorithm(final int itemCount, final int timeInterval) {

-        final CheckpointAlgorithm alg;

-        if ("item".equals(checkpointProxy.getCheckpointType())) {

-            alg = new ItemCheckpointAlgorithm();

-            ((ItemCheckpointAlgorithm) alg).setThresholds(itemCount, timeInterval);

-        } else { // custom chkpt alg

-            alg = checkpointProxy;

+    private void initializeCheckpointManager() {

+        CheckpointAlgorithm checkpointAlgorithm = null;

+

+        checkpointAtThisItemCount = ChunkHelper.getItemCount(chunk);

+        int timeLimitSeconds = ChunkHelper.getTimeLimit(chunk);

+        customCheckpointPolicy = ChunkHelper.isCustomCheckpointPolicy(chunk);  // Supplies default if needed

+

+        if (!customCheckpointPolicy) {

+            ItemCheckpointAlgorithm ica = new ItemCheckpointAlgorithm();

+            ica.setItemCount(checkpointAtThisItemCount);

+            ica.setTimeLimitSeconds(timeLimitSeconds);

+            checkpointAlgorithm = ica;

+

+        } else {

+            final List<Property> propList;

+

+            if (chunk.getCheckpointAlgorithm() == null) {

+                throw new IllegalArgumentException("Configured checkpoint-policy of 'custom' but without a corresponding <checkpoint-algorithm> element.");

+            } else {

+                propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties().getPropertyList();

+            }

+

+            InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);

+            checkpointAlgorithm = ProxyFactory.createCheckpointAlgorithmProxy(artifactFactory, chunk.getCheckpointAlgorithm().getRef(), injectionRef, jobExecutionImpl);

         }

 

-        return alg;

+        // Finally, for both policies now

+        checkpointManager = new CheckpointManager(readerProxy, writerProxy, checkpointAlgorithm,

+                jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService, dataRepresentationService);

+

+        // A related piece of data we'll calculate here is the tran timeout.   Though we won't include

+        // it in the checkpoint manager since we'll set it directly on the tran mgr before each chunk.

+        stepPropertyTranTimeoutSeconds = initStepTransactionTimeout();

     }

 

     /*

      * Initialize itemreader, itemwriter, and item processor checkpoint

      */

     private void initializeChunkArtifacts() {

-        final int itemCount = ChunkHelper.getItemCount(chunk);

-        final int timeInterval = ChunkHelper.getTimeLimit(chunk);

-

         {

             final org.apache.batchee.jaxb.ItemReader itemReader = chunk.getReader();

             final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();

@@ -709,18 +776,6 @@
         }

 

         {

-            final List<Property> propList;

-            if (chunk.getCheckpointAlgorithm() != null) {

-                propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties().getPropertyList();

-            } else {

-                propList = null;

-            }

-

-            final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);

-            checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(artifactFactory, step, injectionRef, jobExecutionImpl);

-        }

-

-        {

             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);

 

             this.chunkListeners = jobExecutionImpl.getListenerFactory().getListeners(ChunkListener.class, step, injectionRef, jobExecutionImpl);

@@ -740,16 +795,6 @@
             final List<RetryWriteListener> retryWriteListeners

                     = jobExecutionImpl.getListenerFactory().getListeners(RetryWriteListener.class, step, injectionRef, jobExecutionImpl);

 

-            if ("item".equals(checkpointProxy.getCheckpointType())) {

-                chkptAlg = new ItemCheckpointAlgorithm();

-                ItemCheckpointAlgorithm.class.cast(chkptAlg).setThresholds(itemCount, timeInterval);

-            } else { // custom chkpt alg

-                chkptAlg = checkpointProxy;

-            }

-

-            checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(),

-                                                      persistenceManagerService, dataRepresentationService);

-

             skipHandler = new SkipHandler(chunk);

             skipHandler.addSkipProcessListener(skipProcessListeners);

             skipHandler.addSkipReadListener(skipReadListeners);

@@ -763,6 +808,43 @@
         }

     }

 

+    private void setNextChunkTransactionTimeout() {

+        int nextTimeout = 0;

+

+        if (customCheckpointPolicy) {

+            // Even on a retry-with-rollback, we'll continue to let

+            // the custom CheckpointAlgorithm set a tran timeout.

+            //

+            // We're guessing the application could need a smaller timeout than

+            // 180 seconds, (the default established by the batch chunk).

+            nextTimeout = this.checkpointManager.checkpointTimeout();

+        } else  {

+            nextTimeout = stepPropertyTranTimeoutSeconds;

+        }

+        transactionManager.setTransactionTimeout(nextTimeout);

+    }

+

+    /**

+     * Note we can rely on the StepContext properties already having been set at this point.

+     *

+     * @return global transaction timeout defined in step properties. default

+     */

+    private int initStepTransactionTimeout() {

+        Properties p = stepContext.getProperties();

+        int timeout = DEFAULT_TRAN_TIMEOUT_SECONDS; // default as per spec.

+        if (p != null && !p.isEmpty()) {

+

+            String propertyTimeOut = p.getProperty("javax.transaction.global.timeout");

+            if (logger.isLoggable(Level.FINE)) {

+                logger.log(Level.FINE, "javax.transaction.global.timeout = {0}", propertyTimeOut==null ? "<null>" : propertyTimeOut);

+            }

+            if (propertyTimeOut != null && !propertyTimeOut.isEmpty()) {

+                timeout = Integer.parseInt(propertyTimeOut, 10);

+            }

+        }

+        return timeout;

+    }

+

     private void openReaderAndWriter() {

         readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.READER);

         CheckpointData readerChkptData = persistenceManagerService.getCheckpointData(readerChkptDK);

diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
index f95aef3..8467d61 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
@@ -19,16 +19,18 @@
 import javax.batch.api.chunk.CheckpointAlgorithm;

 

 public final class ItemCheckpointAlgorithm implements CheckpointAlgorithm {

-    private long requests = 0;

     private long checkpointBeginTime = 0;

+    private int itemsRead = 0;

 

     private int time;

     private int item;

-    private long currentTime;

 

-    public ItemCheckpointAlgorithm() {

-        checkpointBeginTime = System.currentTimeMillis();

-        currentTime = checkpointBeginTime;

+    public void setItemCount(int itemCount) {

+        this.item = itemCount;

+    }

+

+    public void setTimeLimitSeconds(int timeLimitSeconds) {

+        this.time = timeLimitSeconds;

     }

 

     @Override

@@ -37,18 +39,12 @@
     }

 

     public boolean isReadyToCheckpointItem() throws Exception {

-        requests++;

-

-        final boolean itemready = (requests >= item);

-        if (itemready) {

-            requests = 0;

-        }

-        return itemready;

+        return (itemsRead >= item);

     }

 

     public boolean isReadyToCheckpointTime() throws Exception {

         boolean timeready = false;

-        currentTime = System.currentTimeMillis();

+        final long currentTime = System.currentTimeMillis();

         final long curdiff = currentTime - checkpointBeginTime;

         final int diff = (int) curdiff / 1000;

 

@@ -66,6 +62,8 @@
     public boolean isReadyToCheckpoint() throws Exception {

         boolean ready = false;

 

+        itemsRead++;

+

         if (time == 0) { // no time limit, just check if item count has been reached

             if (isReadyToCheckpointItem()) {

                 ready = true;

@@ -77,14 +75,10 @@
         return ready;

     }

 

-    public void setThresholds(int itemthreshold, int timethreshold) {

-        item = itemthreshold;

-        time = timethreshold;

-    }

-

     @Override

     public void beginCheckpoint() throws Exception {

-        checkpointBeginTime = currentTime;

+        checkpointBeginTime = System.currentTimeMillis();

+        itemsRead = 0;

     }

 

     @Override

diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java
index e2aac5b..512c892 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java
@@ -17,35 +17,18 @@
 package org.apache.batchee.container.proxy;

 

 import org.apache.batchee.container.exception.BatchContainerRuntimeException;

-import org.apache.batchee.container.impl.controller.chunk.ItemCheckpointAlgorithm;

 

 import javax.batch.api.chunk.CheckpointAlgorithm;

 

 public class CheckpointAlgorithmProxy extends AbstractProxy<CheckpointAlgorithm> implements CheckpointAlgorithm {

-

-    private String checkpointType = null;

-

     /*

      * Allow this to be public as a special case so we can easily treat the built-in algorithms

      * as identical to custom ones.

      */

     public CheckpointAlgorithmProxy(final CheckpointAlgorithm delegate) {

         super(delegate);

-

-        if (delegate instanceof ItemCheckpointAlgorithm) {

-            checkpointType = "item";

-        } else {

-            checkpointType = "custom";

-        }

-

     }

 

-

-    public String getCheckpointType() {

-        return checkpointType;

-    }

-

-

     @Override

     public void beginCheckpoint() {

         try {

diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
index deaa029..2242736 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
@@ -185,6 +185,9 @@
         return batchWorkUnits;

     }

 

+    /*

+     * There are some assumptions that all partition subjobs have associated DB entries

+     */

     @Override

     public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final PartitionsBuilderConfig config, final JobContextImpl jc, final StepContextImpl sc)

             throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

@@ -201,7 +204,7 @@
             final Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];

 

             try {

-                final long execId = getMostRecentExecutionId(parallelJob);

+                final long execId = getMostRecentSubJobExecutionId(parallelJob);

                 final RuntimeJobExecution jobExecution;

                 try {

                     jobExecution = JobExecutionHelper.restartPartition(servicesManager, execId, parallelJob, partitionProps);

@@ -244,15 +247,13 @@
         return batchWork;

     }

 

-    private long getMostRecentExecutionId(final JSLJob jobModel) {

+    private long getMostRecentSubJobExecutionId(final JSLJob jobModel) {

 

-        //There can only be one instance associated with a subjob's id since it is generated from an unique

-        //job instance id. So there should be no way to directly start a subjob with particular

-        final List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);

+        // Pick off the first, knowing the ordering.  There could be more than one.

+        final List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 1);

 

-        // Maybe we should blow up on '0' too?

-        if (instanceIds.size() > 1) {

-            throw new IllegalStateException("Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened.  Blowing up.");

+        if (instanceIds.size() == 0) {

+            throw new IllegalStateException("Did not find an entry for job name = " + jobModel.getId());

         }

 

         final List<InternalJobExecution> partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0));

@@ -271,7 +272,7 @@
         throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

 

         final JSLJob jobModel = config.getJobModel();

-        final long execId = getMostRecentExecutionId(jobModel);

+        final long execId = getMostRecentSubJobExecutionId(jobModel);

         final RuntimeFlowInSplitExecution jobExecution;

         try {

             jobExecution = JobExecutionHelper.restartFlowInSplit(servicesManager, execId, jobModel);

diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java b/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java
index 991a919..67cac44 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java
@@ -25,8 +25,6 @@
 import java.util.Properties;

 

 public class DefaultBatchTransactionService implements TransactionManagementService {

-    protected static final int DEFAULT_TRANSACTION_TIMEOUT = 180; // seconds

-

     protected Properties batchConfig = null;

 

     @Override

@@ -45,31 +43,8 @@
 

     @Override

     public TransactionManagerAdapter getTransactionManager(final StepContext stepContext) throws TransactionManagementException {

-        final TransactionManagerAdapter transactionManager = getTransactionManager();

-        try {

-            transactionManager.setTransactionTimeout(getTransactionTimeout(stepContext));

-        } catch (final Exception e) {

-            throw new TransactionManagementException(e);

-        }

-        return transactionManager;

-    }

-

-

-    /**

-     * @param stepContext current step context

-     * @return global transaction timeout defined in step properties. default

-     * timeout value is 180

-     */

-    private int getTransactionTimeout(final StepContext stepContext) {

-        final Properties p = stepContext.getProperties();

-        int timeout = DEFAULT_TRANSACTION_TIMEOUT; // default as per spec.

-        if (p != null && !p.isEmpty()) {

-            final String propertyTimeOut = p.getProperty("javax.transaction.global.timeout");

-            if (propertyTimeOut != null && !propertyTimeOut.isEmpty()) {

-                timeout = Integer.parseInt(propertyTimeOut, 10);

-            }

-        }

-        return timeout;

+        // Doesn't currently make use of stepContext but keeping signature

+        return  getTransactionManager();

     }

 

     @Override

diff --git a/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java b/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java
index 20fe0f4..af51253 100644
--- a/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java
+++ b/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java
@@ -45,7 +45,7 @@
         int checked = 0;
         for (final Metric metric : metrics) {
             if (Metric.MetricType.ROLLBACK_COUNT == metric.getType()) {
-                assertEquals(metric.getValue(), 2);
+                assertEquals(metric.getValue(), 1);
                 checked++;
             } else if (Metric.MetricType.READ_SKIP_COUNT == metric.getType()) {
                 assertEquals(metric.getValue(), 1);