BATCHEE-54 BATCHEE-69 applying Scott Kurz's patches: re-execution of partitions after a restart and spec-compliance fixes in ChunkStepController
diff --git a/jbatch/pom.xml b/jbatch/pom.xml
index d59f0fe..39e13fd 100644
--- a/jbatch/pom.xml
+++ b/jbatch/pom.xml
@@ -97,7 +97,7 @@
<dependency>
<groupId>com.ibm.jbatch.tck</groupId>
<artifactId>com.ibm.jbatch.tck</artifactId>
- <version>1.1-b02</version>
+ <version>1.1-b03</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -108,7 +108,6 @@
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</exclusion>
- <!-- SNAPSHOTS -->
<exclusion>
<groupId>com.ibm.jbatch</groupId>
<artifactId>com.ibm.jbatch.spi</artifactId>
@@ -118,7 +117,7 @@
<dependency>
<groupId>com.ibm.jbatch.tck</groupId>
<artifactId>com.ibm.jbatch.tck.spi</artifactId>
- <version>1.1-b02</version>
+ <version>1.1-b03</version>
<scope>test</scope>
<exclusions>
<exclusion>
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index 16ed31d..d07d858 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -67,12 +67,16 @@
protected StepContextImpl stepContext;
protected Step step;
+ protected String stepName;
protected StepStatus stepStatus;
protected BlockingQueue<PartitionDataWrapper> analyzerStatusQueue = null;
protected long rootJobExecutionId;
+ // Restart of partitioned steps needs to be handled specially
+ protected boolean restartAfterCompletion = false;
+
protected final BatchKernelService kernelService;
protected final PersistenceManagerService persistenceManagerService;
private final JobStatusManagerService statusManagerService;
@@ -90,6 +94,7 @@
throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
}
this.step = step;
+ this.stepName = step.getId();
this.txService = servicesManager.service(TransactionManagementService.class);
this.kernelService = servicesManager.service(BatchKernelService.class);
@@ -311,6 +316,8 @@
// boolean, but it should default to 'false', which is the spec'd default.
if (!Boolean.parseBoolean(step.getAllowStartIfComplete())) {
return false;
+ } else {
+ restartAfterCompletion = true;
}
}
@@ -340,6 +347,9 @@
return true;
}
+ protected boolean isRestartExecution() {
+ return stepStatus.getStartCount() > 1;
+ }
protected void statusStarting() {
stepStatus.setBatchStatus(BatchStatus.STARTING);
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index e072a73..c008523 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -71,6 +71,27 @@
private PartitionReducer partitionReducerProxy = null;
+ private enum ExecutionType {
+ /**
+ * First execution of this step for the job instance (among all job executions)
+ */
+ START,
+ /**
+ * Step previously executed but did not complete successfully, override=false so continue from previous partitions' checkpoints, etc.
+ */
+ RESTART_NORMAL,
+ /**
+ * Step previously executed but did not complete successfully, override=true so start with an entire set of new partitions, checkpoints, etc.
+ */
+ RESTART_OVERRIDE,
+ /**
+ * Step previously completed, but we are re-executing with an entire set of new partitions, checkpoints, etc.
+ */
+ RESTART_AFTER_COMPLETION
+ }
+
+ private ExecutionType executionType = null;
+
// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
int numPreviouslyCompleted = 0;
@@ -247,6 +268,37 @@
return plan;
}
+ private void calculateExecutionType() {
+ // We want to ignore override on the initial execution
+ if (isRestartExecution()) {
+ if (restartAfterCompletion) {
+ executionType = ExecutionType.RESTART_AFTER_COMPLETION;
+ } else if (plan.getPartitionsOverride()) {
+ executionType = ExecutionType.RESTART_OVERRIDE;
+ } else {
+ executionType = ExecutionType.RESTART_NORMAL;
+ }
+ } else {
+ executionType = ExecutionType.START;
+ }
+ }
+
+ private void validateNumberOfPartitions() {
+
+ int currentPlanSize = plan.getPartitions();
+
+ if (executionType == ExecutionType.RESTART_NORMAL) {
+ int previousPlanSize = stepStatus.getNumPartitions();
+ if (previousPlanSize > 0 && previousPlanSize != currentPlanSize) {
+ String msg = "On a normal restart, the plan on restart specified: " + currentPlanSize + " # of partitions, but the previous " +
+ "executions' plan specified a different number: " + previousPlanSize + " # of partitions. Failing job.";
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ //persist the partition plan so on restart we have the same plan to reuse
+ stepStatus.setNumPartitions(currentPlanSize);
+ }
@Override
protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
@@ -255,6 +307,9 @@
//persist the partition plan so on restart we have the same plan to reuse
stepStatus.setNumPartitions(plan.getPartitions());
+ calculateExecutionType();
+
+ validateNumberOfPartitions();
/* When true is specified, the partition count from the current run
* is used and all results from past partitions are discarded. Any
@@ -263,7 +318,7 @@
* rollbackPartitionedStep method is invoked during restart before any
* partitions begin processing to provide a cleanup hook.
*/
- if (plan.getPartitionsOverride()) {
+ if (executionType == ExecutionType.RESTART_OVERRIDE) {
if (this.partitionReducerProxy != null) {
try {
this.partitionReducerProxy.rollbackPartitionedStep();
@@ -303,9 +358,14 @@
PartitionsBuilderConfig config =
new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
- if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
+ if (executionType == ExecutionType.RESTART_NORMAL) {
parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
} else {
+ // This case includes RESTART_OVERRIDE and RESTART_AFTER_COMPLETION.
+ //
+ // So we're just going to create new "subjob" job instances in the DB in these cases,
+ // and we'll have to make sure we're dealing with the correct ones, say in a subsequent "normal" restart
+ // (of the current execution which is itself a restart)
parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
}
@@ -324,6 +384,11 @@
int numCurrentCompleted = 0;
int numCurrentSubmitted = 0;
+ // All partitions have already completed on a previous execution.
+ if (numTotalForThisExecution == 0) {
+ return;
+ }
+
//Start up to to the max num we are allowed from the num threads attribute
for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
deleted file mode 100755
index 49df47e..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.impl.controller.chunk;
-
-
-import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
-import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
-import org.apache.batchee.container.proxy.InjectionReferences;
-import org.apache.batchee.container.proxy.ProxyFactory;
-import org.apache.batchee.jaxb.Chunk;
-import org.apache.batchee.jaxb.Step;
-import org.apache.batchee.spi.BatchArtifactFactory;
-
-public final class CheckpointAlgorithmFactory {
- public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final Step step, final InjectionReferences injectionReferences,
- final RuntimeJobExecution jobExecution) {
- final Chunk chunk = step.getChunk();
- final String checkpointType = chunk.getCheckpointPolicy();
- final CheckpointAlgorithmProxy proxy;
- if ("custom".equalsIgnoreCase(checkpointType)) {
- proxy = ProxyFactory.createCheckpointAlgorithmProxy(factory, chunk.getCheckpointAlgorithm().getRef(), injectionReferences, jobExecution);
- } else /* "item" */ {
- proxy = new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());
- }
- return proxy;
-
- }
-
- private CheckpointAlgorithmFactory() {
- // no-op
- }
-}
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index b8163e7..7e07450 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -51,6 +51,22 @@
this.dataRepresentationService = dataRepresentationService;
}
+ public void beginCheckpoint() {
+ try {
+ this.checkpointAlgorithm.beginCheckpoint();
+ } catch (final Exception e) {
+ throw new BatchContainerRuntimeException("Checkpoint algorithm beginCheckpoint() failed", e);
+ }
+ }
+
+ public void endCheckpoint() {
+ try {
+ this.checkpointAlgorithm.endCheckpoint();
+ } catch (final Exception e) {
+ throw new BatchContainerRuntimeException("Checkpoint algorithm endCheckpoint() failed", e);
+ }
+ }
+
public boolean applyCheckPointPolicy() {
try {
return checkpointAlgorithm.isReadyToCheckpoint();
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
index 2e24855..c70676f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
@@ -48,19 +48,20 @@
return timeLimit;
}
- public static String getCheckpointPolicy(Chunk chunk) {
+ public static boolean isCustomCheckpointPolicy(Chunk chunk) {
String checkpointPolicy = chunk.getCheckpointPolicy();
if (checkpointPolicy != null && !checkpointPolicy.isEmpty()) {
- if (!(checkpointPolicy.equals("item") || checkpointPolicy.equals("custom"))) {
+ if (checkpointPolicy.equals("item")) {
+ return false;
+ } else if (checkpointPolicy.equals("custom")) {
+ return true;
+ } else {
throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");
}
} else {
- checkpointPolicy = "item";
+ return false;
}
-
- chunk.setCheckpointPolicy(checkpointPolicy);
- return checkpointPolicy;
}
public static int getSkipLimit(Chunk chunk) {
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index 19b1a9e..ba256fa 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -22,7 +22,6 @@
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.SingleThreadedStepController;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
-import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.services.ServicesManager;
@@ -53,6 +52,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -61,6 +61,8 @@
private final static Logger logger = Logger.getLogger(ChunkStepController.class.getName());
+ protected static final int DEFAULT_TRAN_TIMEOUT_SECONDS = 180; // From the spec Sec. 9.7
+
private final PersistenceManagerService persistenceManagerService;
private final BatchArtifactFactory artifactFactory;
private final DataRepresentationService dataRepresentationService;
@@ -69,8 +71,6 @@
private ItemReader readerProxy = null;
private ItemProcessor processorProxy = null;
private ItemWriter writerProxy = null;
- private CheckpointAlgorithmProxy checkpointProxy = null;
- private CheckpointAlgorithm chkptAlg = null;
private CheckpointManager checkpointManager;
private SkipHandler skipHandler = null;
private CheckpointDataKey readerChkptDK = null;
@@ -81,7 +81,14 @@
private List<ItemWriteListener> itemWriteListeners = null;
private RetryHandler retryHandler;
- private boolean rollbackRetry = false;
+ protected ChunkStatus currentChunkStatus;
+ protected SingleItemStatus currentItemStatus;
+
+ // Default is item-based policy
+ protected boolean customCheckpointPolicy = false;
+ protected Integer checkpointAtThisItemCount = null; // Default to spec value elsewhere.
+
+ protected int stepPropertyTranTimeoutSeconds = DEFAULT_TRAN_TIMEOUT_SECONDS;
public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
@@ -93,9 +100,10 @@
}
/**
- * Utility Class to hold statuses at each level of Read-Process-Write loop
+ * Utility class to hold the status of a single item as the read and process portions of
+ * the chunk loop interact.
*/
- private class ItemStatus {
+ private class SingleItemStatus {
public boolean isSkipped() {
return skipped;
@@ -113,12 +121,67 @@
this.filtered = filtered;
}
- public boolean isCheckPointed() {
- return checkPointed;
+ private boolean skipped = false;
+ private boolean filtered = false;
+ }
+
+ private enum ChunkStatusType {
+ NORMAL, RETRY_AFTER_ROLLBACK
+ }
+
+ /**
+ * Utility Class to hold status for the chunk as a whole.
+ *
+ * One key usage is to maintain the state reflecting the sequence in which
+ * we catch a retryable exception, rollback the previous chunk, process 1-item-at-a-time
+ * until we reach "where we left off", then revert to normal chunk processing.
+ *
+ * Another usage is simply to communicate that the reader readItem() returned 'null', so
+ * we're done with the chunk.
+ */
+ private class ChunkStatus {
+ ChunkStatusType type;
+
+ ChunkStatus() {
+ this(ChunkStatusType.NORMAL);
}
- public void setCheckPointed(boolean checkPointed) {
- this.checkPointed = checkPointed;
+ ChunkStatus(ChunkStatusType type) {
+ this.type = type;
+ }
+
+ public boolean isRetryingAfterRollback() {
+ return type == ChunkStatusType.RETRY_AFTER_ROLLBACK;
+ }
+
+ public boolean wasMarkedForRollbackWithRetry() {
+ return markedForRollbackWithRetry;
+ }
+
+ public Exception getRetryableException() {
+ return retryableException;
+ }
+
+ public void markForRollbackWithRetry(Exception retryableException) {
+ this.markedForRollbackWithRetry = true;
+ this.retryableException = retryableException;
+ }
+
+ public int getItemsTouchedInCurrentChunk() {
+ return itemsTouchedInCurrentChunk;
+ }
+
+ public void incrementItemsTouchedInCurrentChunk() {
+ this.itemsTouchedInCurrentChunk++;
+ }
+
+ public int getItemsToProcessOneByOneAfterRollback() {
+ return itemsToProcessOneByOneAfterRollback;
+ }
+
+ public void setItemsToProcessOneByOneAfterRollback(
+ int itemsToProcessOneByOneAfterRollback) {
+ this.itemsToProcessOneByOneAfterRollback = itemsToProcessOneByOneAfterRollback;
}
public boolean isFinished() {
@@ -129,24 +192,11 @@
this.finished = finished;
}
- public void setRetry(boolean ignored) {
- // no-op
- }
-
- public boolean isRollback() {
- return rollback;
- }
-
- public void setRollback(boolean rollback) {
- this.rollback = rollback;
- }
-
- private boolean skipped = false;
- private boolean filtered = false;
private boolean finished = false;
- private boolean checkPointed = false;
- private boolean rollback = false;
-
+ private Exception retryableException = null;
+ private boolean markedForRollbackWithRetry = false;
+ private int itemsTouchedInCurrentChunk = 0;
+ private int itemsToProcessOneByOneAfterRollback = 0; // For retry with rollback
}
/**
@@ -155,65 +205,57 @@
* reader (not more items to read), or the writer buffer is full or a
* checkpoint is triggered.
*
- * @param chunkSize write buffer size
- * @param theStatus flags when the read-process reached the last record or a
- * checkpoint is required
* @return an array list of objects to write
*/
- private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {
+ private List<Object> readAndProcess() {
List<Object> chunkToWrite = new ArrayList<Object>();
Object itemRead;
Object itemProcessed;
int readProcessedCount = 0;
while (true) {
- ItemStatus status = new ItemStatus();
- itemRead = readItem(status);
+ currentItemStatus = new SingleItemStatus();
+ currentChunkStatus.incrementItemsTouchedInCurrentChunk();
+ itemRead = readItem();
- if (status.isRollback()) {
- theStatus.setRollback(true);
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
break;
}
- if (!status.isSkipped() && !status.isFinished()) {
- itemProcessed = processItem(itemRead, status);
+ if (!currentItemStatus.isSkipped() && !currentChunkStatus.isFinished()) {
+ itemProcessed = processItem(itemRead);
- if (status.isRollback()) {
- theStatus.setRollback(true);
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
break;
}
- if (!status.isSkipped() && !status.isFiltered()) {
+ if (!currentItemStatus.isSkipped() && !currentItemStatus.isFiltered()) {
chunkToWrite.add(itemProcessed);
- readProcessedCount++;
}
}
- theStatus.setFinished(status.isFinished());
- theStatus.setCheckPointed(checkpointManager.applyCheckPointPolicy());
-
- // This will force the current item to finish processing on a stop
- // request
- if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
- theStatus.setFinished(true);
- }
-
- // write buffer size reached
- if ((readProcessedCount == chunkSize) && !("custom".equals(checkpointProxy.getCheckpointType()))) {
+ // Break out of the loop to deliver one-at-a-time processing after rollback.
+ // No point calling isReadyToCheckpoint(), we know we're done. Let's not
+ // complicate the checkpoint algorithm to hold this logic, just break right here.
+ if (currentChunkStatus.isRetryingAfterRollback()) {
break;
}
- // checkpoint reached
- if (theStatus.isCheckPointed()) {
+ // A stop request marks the chunk as finished, so the current item completes and no more are read.
+ // (The write-buffer/item-count limit is not tested here; the checkpoint policy check below covers it.)
+ if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+ currentChunkStatus.setFinished(true);
+ }
+
+ // The spec, in Sec. 11.10, Chunk with Custom Checkpoint Processing, clearly
+ // outlines that this gets called even when we've already read a null (which
+ // arguably is pointless). But we'll follow the spec.
+ if (checkpointManager.applyCheckPointPolicy()) {
break;
}
// last record in readerProxy reached
- if (theStatus.isFinished()) {
+ if (currentChunkStatus.isFinished()) {
break;
}
@@ -223,11 +265,10 @@
/**
* Reads an item from the reader
- *
- * @param status flags the current read status
+ *
* @return the item read
*/
- private Object readItem(ItemStatus status) {
+ private Object readItem() {
Object itemRead = null;
try {
@@ -244,10 +285,7 @@
// itemRead == null means we reached the end of
// the readerProxy "resultset"
- status.setFinished(itemRead == null);
- if (!status.isFinished()) {
- stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();
- }
+ currentChunkStatus.setFinished(itemRead == null);
} catch (Exception e) {
stepContext.setException(e);
for (ItemReadListener readListenerProxy : itemReadListeners) {
@@ -257,26 +295,16 @@
ExceptionConfig.wrapBatchException(e1);
}
}
- if (!rollbackRetry) {
+ if(!currentChunkStatus.isRetryingAfterRollback()) {
if (retryReadException(e)) {
- for (ItemReadListener readListenerProxy : itemReadListeners) {
- try {
- readListenerProxy.onReadError(e);
- } catch (Exception e1) {
- ExceptionConfig.wrapBatchException(e1);
- }
- }
- // if not a rollback exception, just retry the current item
if (!retryHandler.isRollbackException(e)) {
- itemRead = readItem(status);
+ itemRead = readItem();
} else {
- status.setRollback(true);
- rollbackRetry = true;
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ // retry with rollback
+ currentChunkStatus.markForRollbackWithRetry(e);
}
} else if (skipReadException(e)) {
- status.setSkipped(true);
+ currentItemStatus.setSkipped(true);
stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
} else {
@@ -285,16 +313,16 @@
} else {
// coming from a rollback retry
if (skipReadException(e)) {
- status.setSkipped(true);
+ currentItemStatus.setSkipped(true);
stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
} else if (retryReadException(e)) {
if (!retryHandler.isRollbackException(e)) {
- itemRead = readItem(status);
+ // retry without rollback
+ itemRead = readItem();
} else {
- status.setRollback(true);
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ // retry with rollback
+ currentChunkStatus.markForRollbackWithRetry(e);
}
} else {
throw new BatchContainerRuntimeException(e);
@@ -313,10 +341,9 @@
* Process an item previously read by the reader
*
* @param itemRead the item read
- * @param status flags the current process status
* @return the processed item
*/
- private Object processItem(final Object itemRead, final ItemStatus status) {
+ private Object processItem(final Object itemRead) {
Object processedItem = null;
// if no processor defined for this chunk
@@ -334,9 +361,7 @@
processedItem = processorProxy.processItem(itemRead);
if (processedItem == null) {
- // inc filterCount
- stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
- status.setFiltered(true);
+ currentItemStatus.setFiltered(true);
}
for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
@@ -350,78 +375,30 @@
ExceptionConfig.wrapBatchException(e1);
}
}
- if (!rollbackRetry) {
+ if(!currentChunkStatus.isRetryingAfterRollback()) {
if (retryProcessException(e, itemRead)) {
if (!retryHandler.isRollbackException(e)) {
- // call process listeners before and after the actual
- // process call
- for (ItemProcessListener processListenerProxy : itemProcessListeners) {
- try {
- processListenerProxy.beforeProcess(itemRead);
- } catch (Exception e1) {
- ExceptionConfig.wrapBatchException(e1);
- }
- }
- processedItem = processItem(itemRead, status);
- if (processedItem == null) {
- // inc filterCount
- stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
- status.setFiltered(true);
- }
-
- for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
- try {
- processListenerProxy.afterProcess(itemRead, processedItem);
- } catch (Exception e1) {
- ExceptionConfig.wrapBatchException(e1);
- }
- }
+ processedItem = processItem(itemRead);
} else {
- status.setRollback(true);
- rollbackRetry = true;
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ currentChunkStatus.markForRollbackWithRetry(e);
}
} else if (skipProcessException(e, itemRead)) {
- status.setSkipped(true);
+ currentItemStatus.setSkipped(true);
stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
} else {
throw new BatchContainerRuntimeException(e);
}
} else {
if (skipProcessException(e, itemRead)) {
- status.setSkipped(true);
+ currentItemStatus.setSkipped(true);
stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
} else if (retryProcessException(e, itemRead)) {
if (!retryHandler.isRollbackException(e)) {
- // call process listeners before and after the actual
- // process call
- for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
- try {
- processListenerProxy.beforeProcess(itemRead);
- } catch (Exception e1) {
- ExceptionConfig.wrapBatchException(e1);
- }
- }
- processedItem = processItem(itemRead, status);
- if (processedItem == null) {
- // inc filterCount
- stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
- status.setFiltered(true);
- }
-
- for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
- try {
- processListenerProxy.afterProcess(itemRead, processedItem);
- } catch (Exception e1) {
- ExceptionConfig.wrapBatchException(e1);
- }
- }
+ // retry without rollback
+ processedItem = processItem(itemRead);
} else {
- status.setRollback(true);
- rollbackRetry = true;
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ // retry with rollback
+ currentChunkStatus.markForRollbackWithRetry(e);
}
} else {
throw new BatchContainerRuntimeException(e);
@@ -440,7 +417,7 @@
*
* @param theChunk the array list with all items processed ready to be written
*/
- private void writeChunk(List<Object> theChunk, ItemStatus status) {
+ private void writeChunk(List<Object> theChunk) {
if (!theChunk.isEmpty()) {
try {
@@ -454,7 +431,6 @@
for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
writeListenerProxy.afterWrite(theChunk);
}
- stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
} catch (Exception e) {
this.stepContext.setException(e);
for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
@@ -464,15 +440,14 @@
ExceptionConfig.wrapBatchException(e1);
}
}
- if (!rollbackRetry) {
+ if(!currentChunkStatus.isRetryingAfterRollback()) {
if (retryWriteException(e, theChunk)) {
if (!retryHandler.isRollbackException(e)) {
- writeChunk(theChunk, status);
+ // retry without rollback
+ writeChunk(theChunk);
} else {
- rollbackRetry = true;
- status.setRollback(true);
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ // retry with rollback
+ currentChunkStatus.markForRollbackWithRetry(e);
}
} else if (skipWriteException(e, theChunk)) {
stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
@@ -485,13 +460,11 @@
stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
} else if (retryWriteException(e, theChunk)) {
if (!retryHandler.isRollbackException(e)) {
- status.setRetry(true);
- writeChunk(theChunk, status);
+ // retry without rollback
+ writeChunk(theChunk);
} else {
- rollbackRetry = true;
- status.setRollback(true);
- // inc rollbackCount
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ // retry with rollback
+ currentChunkStatus.markForRollbackWithRetry(e);
}
} else {
throw new BatchContainerRuntimeException(e);
@@ -504,13 +477,63 @@
}
}
- private void invokeChunk() {
- int itemCount = ChunkHelper.getItemCount(chunk);
- int timeInterval = ChunkHelper.getTimeLimit(chunk);
- boolean checkPointed = true;
- boolean rollback = false;
+ /**
+ * Prime the next chunk's ChunkStatus based on the previous one
+ * (if there was one), particularly taking into account retry-with-rollback
+ * and the one-at-a-time processing it entails.
+ * @return the upcoming chunk's ChunkStatus
+ */
+ private ChunkStatus getNextChunkStatusBasedOnPrevious() {
+ // If this is the first chunk
+ if (currentChunkStatus == null) {
+ return new ChunkStatus();
+ }
- // begin new transaction at first iteration or after a checkpoint commit
+ ChunkStatus nextChunkStatus = null;
+
+ // At this point the 'current' status is the previous chunk's status.
+ if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
+
+ // Re-position reader & writer
+ transactionManager.begin();
+ positionReaderAtCheckpoint();
+ positionWriterAtCheckpoint();
+ transactionManager.commit();
+
+ nextChunkStatus = new ChunkStatus(ChunkStatusType.RETRY_AFTER_ROLLBACK);
+
+ // What happens if we get a retry-with-rollback on a single item that we were processing
+ // after a prior retry with rollback? We don't want to revert to normal processing
+ // after completing only the single item of the "single item chunk". We want to complete
+ // the full portion of the original chunk. So be careful to propagate this number if
+ // it already exists.
+ int numToProcessOneByOne = currentChunkStatus.getItemsToProcessOneByOneAfterRollback();
+ if (numToProcessOneByOne > 0) {
+ // Retry after rollback AFTER a previous retry after rollback
+ nextChunkStatus.setItemsToProcessOneByOneAfterRollback(numToProcessOneByOne);
+ } else {
+ // "Normal" (i.e. the first) retry after rollback.
+ nextChunkStatus.setItemsToProcessOneByOneAfterRollback(currentChunkStatus.getItemsTouchedInCurrentChunk());
+ }
+ } else if (currentChunkStatus.isRetryingAfterRollback()) {
+ // In this case the 'current' (actually the last) chunk was a single-item retry after rollback chunk,
+ // so we have to see if it's time to revert to normal processing.
+ int numToProcessOneByOne = currentChunkStatus.getItemsToProcessOneByOneAfterRollback();
+ if (numToProcessOneByOne == 1) {
+ // we're done, revert to normal
+ nextChunkStatus = new ChunkStatus();
+ } else {
+ nextChunkStatus = new ChunkStatus(ChunkStatusType.RETRY_AFTER_ROLLBACK);
+ nextChunkStatus.setItemsToProcessOneByOneAfterRollback(numToProcessOneByOne - 1);
+ }
+ } else {
+ nextChunkStatus = new ChunkStatus();
+ }
+
+ return nextChunkStatus;
+ }
+
+ private void invokeChunk() {
try {
transactionManager.begin();
@@ -523,101 +546,67 @@
try {
while (true) {
- if (checkPointed || rollback) {
- if ("custom".equals(checkpointProxy.getCheckpointType())) {
- int newtimeOut = this.checkpointManager.checkpointTimeout();
- transactionManager.setTransactionTimeout(newtimeOut);
- }
+ currentChunkStatus = getNextChunkStatusBasedOnPrevious();
+
+ // Sequence surrounding beginCheckpoint() updated per MR
+ // https://java.net/bugzilla/show_bug.cgi?id=5873
+ setNextChunkTransactionTimeout();
+
+ // Remember we "wrap" the built-in item-count + time-limit "algorithm"
+ // in a CheckpointAlgorithm for ease in keeping the sequence consistent
+ checkpointManager.beginCheckpoint();
+
+ transactionManager.begin();
+
+ for (ChunkListener chunkProxy : chunkListeners) {
+ chunkProxy.beforeChunk();
+ }
+
+ final List<Object> chunkToWrite = readAndProcess();
+
+ if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
+ rollbackAfterRetryableException();
+ continue;
+ }
+
+ // MR 1.0 Rev A clarified we'd only write a chunk with at least one item.
+ // See, e.g. Sec 11.6 of Spec
+ if (chunkToWrite.size() > 0) {
+ writeChunk(chunkToWrite);
+ }
+
+ if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
+ rollbackAfterRetryableException();
+
+ continue;
+ }
+
+ for (ChunkListener chunkProxy : chunkListeners) {
+ chunkProxy.afterChunk();
+ }
+
+ checkpointManager.checkpoint();
+
+ this.persistUserData();
+
+ transactionManager.commit();
+
+ checkpointManager.endCheckpoint();
+
+ invokeCollectorIfPresent();
+
+ updateNormalMetrics(chunkToWrite.size());
+
+ // exit loop when last record is written
+ if (currentChunkStatus.isFinished()) {
transactionManager.begin();
- for (ChunkListener chunkProxy : chunkListeners) {
- chunkProxy.beforeChunk();
- }
-
- if (rollback) {
- positionReaderAtCheckpoint();
- positionWriterAtCheckpoint();
- checkpointManager = new CheckpointManager(readerProxy, writerProxy,
- getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
- .getJobInstance().getInstanceId(), step.getId(), persistenceManagerService, dataRepresentationService);
- }
- }
-
- ItemStatus status = new ItemStatus();
-
- if (rollback) {
- rollback = false;
- }
-
- final List<Object> chunkToWrite = readAndProcess(itemCount, status);
-
- if (status.isRollback()) {
- itemCount = 1;
- rollback = true;
-
- doClose();
-
- transactionManager.rollback();
-
- continue;
- }
-
- writeChunk(chunkToWrite, status);
-
- if (status.isRollback()) {
- itemCount = 1;
- rollback = true;
-
- doClose();
-
- transactionManager.rollback();
-
- continue;
- }
- checkPointed = status.isCheckPointed();
-
- // we could finish the chunk in 3 conditions: buffer is full,
- // checkpoint, not more input
- if (status.isCheckPointed() || status.isFinished()) {
- // TODO: missing before checkpoint listeners
- // 1.- check if spec list proper steps for before checkpoint
- // 2.- ask Andy about retry
- // 3.- when do we stop?
-
- checkpointManager.checkpoint();
-
- for (ChunkListener chunkProxy : chunkListeners) {
- chunkProxy.afterChunk();
- }
-
- this.persistUserData();
-
- this.chkptAlg.beginCheckpoint();
-
- transactionManager.commit();
-
- this.chkptAlg.endCheckpoint();
-
- invokeCollectorIfPresent();
-
- // exit loop when last record is written
- if (status.isFinished()) {
- transactionManager.begin();
-
- if (doClose()) {
- transactionManager.commit();
- stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
- } else {
- stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
- transactionManager.rollback();
- }
- break;
+ if (doClose()) {
+ transactionManager.commit();
} else {
- // increment commitCount
- stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+ transactionManager.rollback();
}
-
+ break;
}
-
}
} catch (final Exception e) {
logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
@@ -635,7 +624,24 @@
}
}
- private boolean doClose() throws Exception {
+ private void updateNormalMetrics(int writeCount) { // invoked once per committed chunk; writeCount is the size of the list handed to the writer
+ int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
+ if (currentChunkStatus.isFinished()) {
+ readCount--; // presumably the last "touched" item is the end-of-input read rather than a real item -- TODO confirm
+ }
+ int filterCount = readCount - writeCount; // whatever was read but not written is reported as filtered
+
+ if (readCount < 0 || filterCount < 0 || writeCount < 0) { // sanity check: none of the counters may be negative
+ throw new IllegalStateException("Somehow one of the metrics was negative. Read count: " + readCount +
+ ", Filter count: " + filterCount + ", Write count: " + writeCount);
+ }
+ stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+ stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValueBy(readCount);
+ stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValueBy(filterCount);
+ stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(writeCount);
+ }
+
+ private boolean doClose() {
try {
readerProxy.close();
writerProxy.close();
@@ -646,45 +652,106 @@
}
}
+ /**
+ * Reflect spec order in Sec. 11.9 "Rollback Procedure": set rollback-only, close reader/writer, call
+ * ChunkListener.onError (only if t is an Exception), bump ROLLBACK_COUNT. The final rollback and the
+ * BatchContainerRuntimeException wrapping t are issued from the finally block, so this never returns normally.
+ */
private void rollback(final Throwable t) {
- transactionManager.setRollbackOnly();
try {
- doClose();
- } catch (Exception e) {
// ignore, we blow up anyway
+ transactionManager.setRollbackOnly();
+ try {
+ doClose();
+ } catch (Exception e) {
+ // a close failure is ignored on purpose: the finally block below throws anyway
+ }
+
+ if (t instanceof Exception) { // onError takes an Exception, so listeners are not notified for other Throwables
+ Exception e = (Exception) t;
+ for (ChunkListener chunkProxy : chunkListeners) {
+ try {
+ chunkProxy.onError(e);
+ } catch (final Exception e1) {
+ logger.log(Level.SEVERE, e1.getMessage(), e1); // log and keep notifying the remaining listeners
+ }
+ }
+ }
+ // Count non-retryable rollback against metric as well. Not sure this has
+ // ever come up in the spec, but seems marginally more useful.
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ } finally {
+ transactionManager.rollback(); // NOTE(review): if this call throws, the exception wrapping t below is never raised
+ throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t); // NOTE(review): throwing from finally discards anything thrown in the try body
}
- transactionManager.rollback();
- throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
}
+ private void rollbackAfterRetryableException() throws Exception { // retry-with-rollback path: the chunk loop continues after this returns
+ doClose(); // closes reader and writer before the rollback; the boolean result is not checked here
+
+ for (ChunkListener chunkProxy : chunkListeners) {
+ try {
+ chunkProxy.onError(currentChunkStatus.getRetryableException());
+ } catch (final Exception e1) {
+ logger.log(Level.SEVERE, e1.getMessage(), e1); // a failing listener is only logged, so the rollback below still runs
+ }
+ }
+ transactionManager.rollback();
+
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue(); // incremented only after the rollback call has returned
+ }
+
+ @Override
protected void invokeCoreStep() throws BatchContainerServiceException {
this.chunk = step.getChunk();
initializeChunkArtifacts();
+ initializeCheckpointManager(); // passes readerProxy/writerProxy to the CheckpointManager; presumably relies on initializeChunkArtifacts() having created them -- confirm
+
invokeChunk();
}
- private CheckpointAlgorithm getCheckpointAlgorithm(final int itemCount, final int timeInterval) {
- final CheckpointAlgorithm alg;
- if ("item".equals(checkpointProxy.getCheckpointType())) {
- alg = new ItemCheckpointAlgorithm();
- ((ItemCheckpointAlgorithm) alg).setThresholds(itemCount, timeInterval);
- } else { // custom chkpt alg
- alg = checkpointProxy;
+ private void initializeCheckpointManager() { // builds checkpointManager for this chunk and computes the step-level transaction timeout
+ CheckpointAlgorithm checkpointAlgorithm = null;
+
+ checkpointAtThisItemCount = ChunkHelper.getItemCount(chunk);
+ int timeLimitSeconds = ChunkHelper.getTimeLimit(chunk);
+ customCheckpointPolicy = ChunkHelper.isCustomCheckpointPolicy(chunk); // Supplies default if needed
+
+ if (!customCheckpointPolicy) { // built-in policy: wrap item-count and time-limit in an ItemCheckpointAlgorithm
+ ItemCheckpointAlgorithm ica = new ItemCheckpointAlgorithm();
+ ica.setItemCount(checkpointAtThisItemCount);
+ ica.setTimeLimitSeconds(timeLimitSeconds);
+ checkpointAlgorithm = ica;
+
+ } else { // custom policy: the chunk must carry a <checkpoint-algorithm> element naming the artifact
+ final List<Property> propList;
+
+ if (chunk.getCheckpointAlgorithm() == null) {
+ throw new IllegalArgumentException("Configured checkpoint-policy of 'custom' but without a corresponding <checkpoint-algorithm> element.");
+ } else {
+ propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties().getPropertyList();
+ }
+
+ InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+ checkpointAlgorithm = ProxyFactory.createCheckpointAlgorithmProxy(artifactFactory, chunk.getCheckpointAlgorithm().getRef(), injectionRef, jobExecutionImpl);
}
- return alg;
+ // Both policies end up here: the manager receives whichever algorithm was selected above
+ checkpointManager = new CheckpointManager(readerProxy, writerProxy, checkpointAlgorithm,
+ jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService, dataRepresentationService);
+
+ // A related piece of data we'll calculate here is the tran timeout. Though we won't include
+ // it in the checkpoint manager since we'll set it directly on the tran mgr before each chunk.
+ stepPropertyTranTimeoutSeconds = initStepTransactionTimeout();
}
/*
* Initialize itemreader, itemwriter, and item processor checkpoint
*/
private void initializeChunkArtifacts() {
- final int itemCount = ChunkHelper.getItemCount(chunk);
- final int timeInterval = ChunkHelper.getTimeLimit(chunk);
-
{
final org.apache.batchee.jaxb.ItemReader itemReader = chunk.getReader();
final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
@@ -709,18 +776,6 @@
}
{
- final List<Property> propList;
- if (chunk.getCheckpointAlgorithm() != null) {
- propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties().getPropertyList();
- } else {
- propList = null;
- }
-
- final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
- checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(artifactFactory, step, injectionRef, jobExecutionImpl);
- }
-
- {
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
this.chunkListeners = jobExecutionImpl.getListenerFactory().getListeners(ChunkListener.class, step, injectionRef, jobExecutionImpl);
@@ -740,16 +795,6 @@
final List<RetryWriteListener> retryWriteListeners
= jobExecutionImpl.getListenerFactory().getListeners(RetryWriteListener.class, step, injectionRef, jobExecutionImpl);
- if ("item".equals(checkpointProxy.getCheckpointType())) {
- chkptAlg = new ItemCheckpointAlgorithm();
- ItemCheckpointAlgorithm.class.cast(chkptAlg).setThresholds(itemCount, timeInterval);
- } else { // custom chkpt alg
- chkptAlg = checkpointProxy;
- }
-
- checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(),
- persistenceManagerService, dataRepresentationService);
-
skipHandler = new SkipHandler(chunk);
skipHandler.addSkipProcessListener(skipProcessListeners);
skipHandler.addSkipReadListener(skipReadListeners);
@@ -763,6 +808,43 @@
}
}
+ /**
+ * Applies the transaction timeout for the upcoming chunk to the transaction manager.
+ *
+ * With a custom checkpoint policy the value is obtained from the checkpoint manager's
+ * checkpointTimeout(), and this happens on retry-with-rollback chunks too. The rationale
+ * noted by the original author: the application could need a smaller timeout than
+ * 180 seconds (the default established by the batch chunk).
+ * Otherwise the timeout calculated from the step properties is used.
+ */
+ private void setNextChunkTransactionTimeout() {
+ final int chunkTimeout = customCheckpointPolicy
+ ? this.checkpointManager.checkpointTimeout()
+ : stepPropertyTranTimeoutSeconds;
+ transactionManager.setTransactionTimeout(chunkTimeout);
+ }
+
+ /**
+ * Resolves the step-level transaction timeout from the "javax.transaction.global.timeout" step property.
+ * Relies on the StepContext properties already having been set at this point.
+ * @return the parsed property value in seconds, or DEFAULT_TRAN_TIMEOUT_SECONDS when the property is absent or empty
+ */
+ private int initStepTransactionTimeout() {
+ final Properties stepProps = stepContext.getProperties();
+ if (stepProps == null || stepProps.isEmpty()) {
+ return DEFAULT_TRAN_TIMEOUT_SECONDS; // default as per spec.
+ }
+ final String configured = stepProps.getProperty("javax.transaction.global.timeout");
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "javax.transaction.global.timeout = {0}", configured == null ? "<null>" : configured);
+ }
+ if (configured == null || configured.isEmpty()) {
+ return DEFAULT_TRAN_TIMEOUT_SECONDS;
+ }
+ // Radix 10; a non-numeric value propagates NumberFormatException to the caller.
+ return Integer.parseInt(configured, 10);
+ }
+
private void openReaderAndWriter() {
readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.READER);
CheckpointData readerChkptData = persistenceManagerService.getCheckpointData(readerChkptDK);
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
index f95aef3..8467d61 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
@@ -19,16 +19,18 @@
import javax.batch.api.chunk.CheckpointAlgorithm;
public final class ItemCheckpointAlgorithm implements CheckpointAlgorithm {
- private long requests = 0;
private long checkpointBeginTime = 0;
+ private int itemsRead = 0;
private int time;
private int item;
- private long currentTime;
- public ItemCheckpointAlgorithm() {
- checkpointBeginTime = System.currentTimeMillis();
- currentTime = checkpointBeginTime;
+ public void setItemCount(int itemCount) {
+ this.item = itemCount;
+ }
+
+ public void setTimeLimitSeconds(int timeLimitSeconds) {
+ this.time = timeLimitSeconds;
}
@Override
@@ -37,18 +39,12 @@
}
public boolean isReadyToCheckpointItem() throws Exception {
- requests++;
-
- final boolean itemready = (requests >= item);
- if (itemready) {
- requests = 0;
- }
- return itemready;
+ return (itemsRead >= item);
}
public boolean isReadyToCheckpointTime() throws Exception {
boolean timeready = false;
- currentTime = System.currentTimeMillis();
+ final long currentTime = System.currentTimeMillis();
final long curdiff = currentTime - checkpointBeginTime;
final int diff = (int) curdiff / 1000;
@@ -66,6 +62,8 @@
public boolean isReadyToCheckpoint() throws Exception {
boolean ready = false;
+ itemsRead++;
+
if (time == 0) { // no time limit, just check if item count has been reached
if (isReadyToCheckpointItem()) {
ready = true;
@@ -77,14 +75,10 @@
return ready;
}
- public void setThresholds(int itemthreshold, int timethreshold) {
- item = itemthreshold;
- time = timethreshold;
- }
-
@Override
public void beginCheckpoint() throws Exception {
- checkpointBeginTime = currentTime;
+ checkpointBeginTime = System.currentTimeMillis();
+ itemsRead = 0;
}
@Override
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java
index e2aac5b..512c892 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/CheckpointAlgorithmProxy.java
@@ -17,35 +17,18 @@
package org.apache.batchee.container.proxy;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-import org.apache.batchee.container.impl.controller.chunk.ItemCheckpointAlgorithm;
import javax.batch.api.chunk.CheckpointAlgorithm;
public class CheckpointAlgorithmProxy extends AbstractProxy<CheckpointAlgorithm> implements CheckpointAlgorithm {
-
- private String checkpointType = null;
-
/*
* Allow this to be public as a special case so we can easily treat the built-in algorithms
* as identical to custom ones.
*/
public CheckpointAlgorithmProxy(final CheckpointAlgorithm delegate) {
super(delegate);
-
- if (delegate instanceof ItemCheckpointAlgorithm) {
- checkpointType = "item";
- } else {
- checkpointType = "custom";
- }
-
}
-
- public String getCheckpointType() {
- return checkpointType;
- }
-
-
@Override
public void beginCheckpoint() {
try {
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
index deaa029..2242736 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/kernel/DefaultBatchKernel.java
@@ -185,6 +185,9 @@
return batchWorkUnits;
}
+ /*
+ * There are some assumptions that all partition subjobs have associated DB entries
+ */
@Override
public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final PartitionsBuilderConfig config, final JobContextImpl jc, final StepContextImpl sc)
throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
@@ -201,7 +204,7 @@
final Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];
try {
- final long execId = getMostRecentExecutionId(parallelJob);
+ final long execId = getMostRecentSubJobExecutionId(parallelJob);
final RuntimeJobExecution jobExecution;
try {
jobExecution = JobExecutionHelper.restartPartition(servicesManager, execId, parallelJob, partitionProps);
@@ -244,15 +247,13 @@
return batchWork;
}
- private long getMostRecentExecutionId(final JSLJob jobModel) {
+ private long getMostRecentSubJobExecutionId(final JSLJob jobModel) {
- //There can only be one instance associated with a subjob's id since it is generated from an unique
- //job instance id. So there should be no way to directly start a subjob with particular
- final List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
+ // Pick off the first, knowing the ordering. There could be more than one.
+ final List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 1);
- // Maybe we should blow up on '0' too?
- if (instanceIds.size() > 1) {
- throw new IllegalStateException("Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened. Blowing up.");
+ if (instanceIds.size() == 0) {
+ throw new IllegalStateException("Did not find an entry for job name = " + jobModel.getId());
}
final List<InternalJobExecution> partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0));
@@ -271,7 +272,7 @@
throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
final JSLJob jobModel = config.getJobModel();
- final long execId = getMostRecentExecutionId(jobModel);
+ final long execId = getMostRecentSubJobExecutionId(jobModel);
final RuntimeFlowInSplitExecution jobExecution;
try {
jobExecution = JobExecutionHelper.restartFlowInSplit(servicesManager, execId, jobModel);
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java b/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java
index 991a919..67cac44 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/transaction/DefaultBatchTransactionService.java
@@ -25,8 +25,6 @@
import java.util.Properties;
public class DefaultBatchTransactionService implements TransactionManagementService {
- protected static final int DEFAULT_TRANSACTION_TIMEOUT = 180; // seconds
-
protected Properties batchConfig = null;
@Override
@@ -45,31 +43,8 @@
@Override
public TransactionManagerAdapter getTransactionManager(final StepContext stepContext) throws TransactionManagementException {
- final TransactionManagerAdapter transactionManager = getTransactionManager();
- try {
- transactionManager.setTransactionTimeout(getTransactionTimeout(stepContext));
- } catch (final Exception e) {
- throw new TransactionManagementException(e);
- }
- return transactionManager;
- }
-
-
- /**
- * @param stepContext current step context
- * @return global transaction timeout defined in step properties. default
- * timeout value is 180
- */
- private int getTransactionTimeout(final StepContext stepContext) {
- final Properties p = stepContext.getProperties();
- int timeout = DEFAULT_TRANSACTION_TIMEOUT; // default as per spec.
- if (p != null && !p.isEmpty()) {
- final String propertyTimeOut = p.getProperty("javax.transaction.global.timeout");
- if (propertyTimeOut != null && !propertyTimeOut.isEmpty()) {
- timeout = Integer.parseInt(propertyTimeOut, 10);
- }
- }
- return timeout;
+ // Doesn't currently make use of stepContext but keeping signature
+ return getTransactionManager();
}
@Override
diff --git a/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java b/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java
index 20fe0f4..af51253 100644
--- a/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java
+++ b/jbatch/src/test/java/org/apache/batchee/test/metric/PartitionMetricsTest.java
@@ -45,7 +45,7 @@
int checked = 0;
for (final Metric metric : metrics) {
if (Metric.MetricType.ROLLBACK_COUNT == metric.getType()) {
- assertEquals(metric.getValue(), 2);
+ assertEquals(metric.getValue(), 1);
checked++;
} else if (Metric.MetricType.READ_SKIP_COUNT == metric.getType()) {
assertEquals(metric.getValue(), 1);