blob: 63381aa97b459e278879f0c3bae836597fa1cc03 [file] [log] [blame]
/*
* Copyright 2012 International Business Machines Corp.
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.ExecutionElementController;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.impl.MetricImpl;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.StepExecutionImpl;
import org.apache.batchee.container.impl.controller.chunk.PersistentDataWrapper;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.BatchKernelService;
import org.apache.batchee.container.services.JobStatusManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.status.StepStatus;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.JSLProperties;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.spi.TransactionManagementService;
import org.apache.batchee.spi.TransactionManagerAdapter;
import javax.batch.operations.JobExecutionAlreadyCompleteException;
import javax.batch.operations.JobExecutionNotMostRecentException;
import javax.batch.operations.JobRestartException;
import javax.batch.operations.JobStartException;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobInstance;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Common base for step controllers.
 *
 * <p>It decides whether a step should run, including the restart rules for allow-start-if-complete and
 * start-limit. It drives the step through its batch statuses and invokes the step-level artifacts that
 * subclasses provide. Finally, it persists the step's user data, exit status and start/end timestamps.
 */
public abstract class BaseStepController implements ExecutionElementController {
    private static final Logger LOGGER = Logger.getLogger(BaseStepController.class.getName());

    /** Job execution this step runs in. */
    protected RuntimeJobExecution jobExecutionImpl;
    /** Job instance of {@link #jobExecutionImpl}; its instance id keys the step status lookups. */
    protected JobInstance jobInstance;
    /** Runtime context of this step. */
    protected StepContextImpl stepContext;
    /** JSL model of this step; never {@code null}. */
    protected Step step;
    /** Id of {@link #step}. */
    protected String stepName;
    /** Persisted status of this step; populated by {@link #shouldStepBeExecuted()}. */
    protected StepStatus stepStatus;
    /** Queue towards the partition analyzer; {@code null} unless given to the partition-aware constructor. */
    protected BlockingQueue<PartitionDataWrapper> analyzerStatusQueue = null;
    /** Execution id of the root job execution, used when creating/updating step executions. */
    protected long rootJobExecutionId;
    // Restart of partitioned steps needs to be handled specially: set when a COMPLETED step is run again
    // because allow-start-if-complete is true.
    protected boolean restartAfterCompletion = false;
    protected final BatchKernelService kernelService;
    protected final PersistenceManagerService persistenceManagerService;
    private final JobStatusManagerService statusManagerService;
    /** Obtained from the transaction management service when the step starts (see setContextProperties). */
    protected TransactionManagerAdapter transactionManager = null;
    private final TransactionManagementService txService;

    /**
     * Creates a controller for a step and resolves the container services it needs.
     *
     * @param jobExecution       job execution the step belongs to
     * @param step               JSL step model; must not be {@code null}
     * @param stepContext        runtime context of the step
     * @param rootJobExecutionId execution id of the root job execution
     * @param servicesManager    source of the transaction, kernel, persistence and status services
     * @throws IllegalArgumentException if {@code step} is {@code null}
     */
    protected BaseStepController(final RuntimeJobExecution jobExecution, final Step step, final StepContextImpl stepContext, final long rootJobExecutionId,
                                 final ServicesManager servicesManager) {
        if (step == null) {
            throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
        }
        this.jobExecutionImpl = jobExecution;
        this.jobInstance = jobExecution.getJobInstance();
        this.stepContext = stepContext;
        this.rootJobExecutionId = rootJobExecutionId;
        this.step = step;
        this.stepName = step.getId();
        this.txService = servicesManager.service(TransactionManagementService.class);
        this.kernelService = servicesManager.service(BatchKernelService.class);
        this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
        this.statusManagerService = servicesManager.service(JobStatusManagerService.class);
    }

    /**
     * Same as the other constructor, additionally keeping a queue towards the partition analyzer.
     *
     * @param analyzerStatusQueue queue stored in {@link #analyzerStatusQueue}
     */
    protected BaseStepController(final RuntimeJobExecution jobExecution,
                                 final Step step, final StepContextImpl stepContext,
                                 final long rootJobExecutionId,
                                 final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue,
                                 final ServicesManager servicesManager) {
        this(jobExecution, step, stepContext, rootJobExecutionId, servicesManager);
        this.analyzerStatusQueue = analyzerStatusQueue;
    }

    ///////////////////////////
    // ABSTRACT METHODS ARE HERE
    ///////////////////////////

    /**
     * Runs the main work of the step. What that is depends on the subclass.
     * Any exception marks the step FAILED; post-step artifacts still run afterwards.
     */
    protected abstract void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException;

    /** Sets up step artifacts such as step listeners and partition reducers; called while starting the step. */
    protected abstract void setupStepArtifacts();

    /** Invokes artifacts that run before the core step (PartitionReducer and StepListener(s)). */
    protected abstract void invokePreStepArtifacts();

    /** Invokes artifacts that run after the core step (PartitionAnalyzer, PartitionReducer and StepListener(s)). */
    protected abstract void invokePostStepArtifacts();

    // This is only useful from the partition threads
    protected abstract void sendStatusFromPartitionToAnalyzerIfPresent();

    /**
     * Runs the step end to end, in four phases:
     * <ol>
     * <li>decide whether the step should run at all;</li>
     * <li>start it, then invoke the pre-step artifacts and the core step;</li>
     * <li>invoke the post-step artifacts, even if the step already failed;</li>
     * <li>persist the user data, final batch status, exit status and end time.</li>
     * </ol>
     *
     * @return {@code DO_NOT_RUN} if the step is skipped, {@code EXCEPTION_THROWN} if the step ended FAILED,
     *         otherwise {@code NORMAL_COMPLETION}; each carries the step's exit status
     * @throws BatchContainerRuntimeException if deciding whether to run, starting, or ending the step fails,
     *         or if persisting the FAILED status after a core-step exception fails
     */
    @Override
    public ExecutionStatus execute() {
        // Here we're just setting up to decide if we're going to run the step or not (e.g. if it's already
        // complete and allow-start-if-complete=false).
        try {
            final boolean executeStep = shouldStepBeExecuted();
            if (!executeStep) {
                return new ExecutionStatus(ExtendedBatchStatus.DO_NOT_RUN, stepStatus.getExitStatus());
            }
        } catch (final Throwable t) {
            // Treat an error at this point as unrecoverable, so fail job too.
            // Log first so the original failure is not lost if marking the statuses fails as well.
            LOGGER.log(Level.SEVERE, t.getMessage(), t);
            markJobAndStepFailed();
            rethrowWithMsg("Caught throwable while determining if step should be executed. Failing job.", t);
        }

        // At this point we have a StepExecution. Set up so that we're ready to invoke artifacts.
        try {
            startStep();
        } catch (final Throwable t) {
            // Treat an error at this point as unrecoverable, so fail job too (log first, see above).
            LOGGER.log(Level.SEVERE, t.getMessage(), t);
            markJobAndStepFailed();
            rethrowWithMsg("Caught throwable while starting step. Failing job.", t);
        }

        // At this point artifacts are in the picture so we want to try to invoke afterStep() on a failure.
        try {
            invokePreStepArtifacts(); // Call PartitionReducer and StepListener(s)
            invokeCoreStep();
        } catch (final Exception e) {
            // We're going to continue on so that we can execute the afterStep() and analyzer.
            try {
                LOGGER.log(Level.SEVERE, e.getMessage(), e);
                markStepFailed();
            } catch (final Throwable t) {
                // The original failure e has already been logged above. Persisting the FAILED status failed too,
                // so we give up here: the persistence failure t is rethrown and afterStep() is not called.
                rethrowWithMsg("ERROR. PERSISTING BATCH STATUS FAILED. STEP EXECUTION STATUS TABLES MIGHT HAVE CONSISTENCY ISSUES "
                    + "AND/OR UNEXPECTED ENTRIES.", t);
            }
        } catch (final Throwable t) {
            LOGGER.log(Level.SEVERE, t.getMessage(), t);
            markJobAndStepFailed();
        }

        //
        // At this point we may have already failed the step, but we still try to invoke the end of step artifacts.
        //
        try {
            // Call PartitionAnalyzer, PartitionReducer and StepListener(s)
            invokePostStepArtifacts();
        } catch (final Throwable t) {
            LOGGER.log(Level.SEVERE, t.getMessage(), t);
            markStepFailed();
        }

        //
        // No more application code is on the path from here on out (excluding the call to the PartitionAnalyzer's
        // analyzeStatus()). If an exception bubbles up and leaves the statuses inconsistent or incorrect then so
        // be it; maybe there's a runtime bug that will need to be fixed.
        //
        try {
            // Now that all step-level artifacts have had a chance to run,
            // we set the exit status to one of the defaults if it is still unset.
            // This is going to be the very last sequence of calls from the step running on the main thread,
            // since the call back to the partition analyzer only happens on the partition threads.
            // On the partition threads, then, we harden the status at the partition level before we
            // send it back to the main thread.
            persistUserData();
            transitionToFinalBatchStatus();
            defaultExitStatusIfNecessary();
            persistExitStatusAndEndTimestamp();
        } catch (final Throwable t) {
            // Don't let an exception caught here prevent us from persisting the failed batch status.
            markJobAndStepFailed();
            rethrowWithMsg("Failure ending step execution", t);
        }

        //
        // Per the declaration of this hook, it is only useful on partition threads.
        //
        sendStatusFromPartitionToAnalyzerIfPresent();

        if (stepStatus.getBatchStatus().equals(BatchStatus.FAILED)) {
            return new ExecutionStatus(ExtendedBatchStatus.EXCEPTION_THROWN, stepStatus.getExitStatus());
        }
        return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, stepStatus.getExitStatus());
    }

    /**
     * Defaults the exit status when none has been set on the step context.
     *
     * <p>The default is the batchlet process() return value when one was recorded. Otherwise it is the name
     * of the step's current batch status.
     */
    private void defaultExitStatusIfNecessary() {
        final String stepExitStatus = stepContext.getExitStatus();
        final String processRetVal = stepContext.getBatchletProcessRetVal();
        if (stepExitStatus == null) {
            if (processRetVal != null) {
                stepContext.setExitStatus(processRetVal);
            } else {
                stepContext.setExitStatus(stepContext.getBatchStatus().name());
            }
        }
    }

    /** Sets and persists the step batch status FAILED. */
    private void markStepFailed() {
        updateBatchStatus(BatchStatus.FAILED);
    }

    /** Sets the job context batch status to FAILED, then sets and persists the step batch status FAILED. */
    protected void markJobAndStepFailed() {
        jobExecutionImpl.getJobContext().setBatchStatus(BatchStatus.FAILED);
        markStepFailed();
    }

    /**
     * Moves the step to STARTING, applies the JSL properties and metrics to the step context, sets up the
     * step artifacts, moves the step to STARTED and persists the step execution with its start time.
     */
    private void startStep() {
        // Update status
        statusStarting();
        // Set step context properties
        setContextProperties();
        // Set up step artifacts like step listeners, partition reducers
        setupStepArtifacts();
        // Move batch status to started.
        updateBatchStatus(BatchStatus.STARTED);
        final Timestamp startTS = new Timestamp(System.currentTimeMillis());
        stepContext.setStartTime(startTS);
        persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
    }

    /**
     * Moves the step from its running status to its final status: STARTED becomes COMPLETED, STOPPING becomes
     * STOPPED, and FAILED stays FAILED.
     *
     * <p>The only valid states at this point are STARTED, STOPPING, or FAILED. We shouldn't have been able to
     * get to STOPPED or COMPLETED yet at this point in the code.
     *
     * @throws IllegalStateException if the step is in any other batch status
     */
    private void transitionToFinalBatchStatus() {
        final BatchStatus currentBatchStatus = stepContext.getBatchStatus();
        if (currentBatchStatus.equals(BatchStatus.STARTED)) {
            updateBatchStatus(BatchStatus.COMPLETED);
        } else if (currentBatchStatus.equals(BatchStatus.STOPPING)) {
            updateBatchStatus(BatchStatus.STOPPED);
        } else if (currentBatchStatus.equals(BatchStatus.FAILED)) {
            updateBatchStatus(BatchStatus.FAILED); // Should have already been done but maybe better for possible code refactoring to have it here.
        } else {
            throw new IllegalStateException("Step batch status should not be in a " + currentBatchStatus.name() + " state");
        }
    }

    /**
     * Updates the step's batch status everywhere it is tracked. The status is set on {@link #stepStatus},
     * persisted through the status manager, and then mirrored onto the step context.
     *
     * @param updatedBatchStatus new batch status of the step
     */
    protected void updateBatchStatus(final BatchStatus updatedBatchStatus) {
        stepStatus.setBatchStatus(updatedBatchStatus);
        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
        stepContext.setBatchStatus(updatedBatchStatus);
    }

    /**
     * Loads (or creates) the step status and decides whether the step should run.
     *
     * <p>With no existing status, this is the first run: a new step execution and step status are created.
     * Otherwise this is a restart: the previously persisted user data is restored and the restart rules
     * decide. When the rules allow the run, the start count is incremented and a new step execution is
     * created.
     *
     * @return {@code true} if the step should be executed
     * @throws IllegalArgumentException if the start-limit is not a valid non-negative integer
     * @throws IllegalStateException    if this run would exceed the start-limit
     */
    protected boolean shouldStepBeExecuted() {
        this.stepStatus = statusManagerService.getStepStatus(jobInstance.getInstanceId(), step.getId());
        if (stepStatus == null) {
            // create new step execution
            final StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
            // create new step status for this run
            stepStatus = statusManagerService.createStepStatus(stepExecution.getStepExecutionId());
            stepContext.setInternalStepExecutionId(stepExecution.getStepExecutionId());
            stepContext.setStepExecutionId(stepExecution.getStepExecutionId());
            return true;
        }

        // If a step status already exists for this instance id, it means this
        // is a restart and we need to get the previously persisted data.
        stepContext.setPersistentUserData(stepStatus.getPersistentUserData());
        if (!shouldStepBeExecutedOnRestart()) {
            return false;
        }
        // Seems better to let the start count get incremented without getting a step execution than
        // vice versa (in an unexpected error case).
        stepStatus.incrementStartCount();
        // create new step execution
        final StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
        final long stepExecutionId = stepExecution.getStepExecutionId();
        this.stepStatus.setLastRunStepExecutionId(stepExecutionId);
        stepContext.setStepExecutionId(stepExecutionId);
        stepContext.setInternalStepExecutionId(stepExecutionId);
        return true;
    }

    /**
     * Applies the restart rules to a step that already has a persisted status.
     *
     * <p>A COMPLETED step only runs again when allow-start-if-complete is true; in that case
     * {@link #restartAfterCompletion} is set. A positive start-limit must not be exceeded by this run.
     *
     * @return {@code false} if the step is COMPLETED and may not be restarted, {@code true} otherwise
     */
    private boolean shouldStepBeExecutedOnRestart() {
        final BatchStatus stepBatchStatus = stepStatus.getBatchStatus();
        if (stepBatchStatus.equals(BatchStatus.COMPLETED)) {
            // A bit of parsing involved since the model gives us a String not a boolean;
            // Boolean.parseBoolean yields 'false' for a missing value, which is the spec'd default.
            if (!Boolean.parseBoolean(step.getAllowStartIfComplete())) {
                return false;
            }
            restartAfterCompletion = true;
        }

        // The spec default is '0' (no limit is enforced below for 0), used when no start-limit is given.
        int startLimit = 0;
        final String startLimitString = step.getStartLimit();
        if (startLimitString != null) {
            try {
                startLimit = Integer.parseInt(startLimitString);
            } catch (final NumberFormatException e) {
                throw new IllegalArgumentException("Could not parse start limit value. Received NumberFormatException for start-limit value: " + startLimitString
                    + " for stepId: " + step.getId() + ", with start-limit=" + step.getStartLimit(), e);
            }
        }

        if (startLimit < 0) {
            throw new IllegalArgumentException("Found negative start-limit of " + startLimit + " for stepId: " + step.getId());
        }

        if (startLimit > 0) {
            final int newStepStartCount = stepStatus.getStartCount() + 1;
            if (newStepStartCount > startLimit) {
                throw new IllegalStateException("For stepId: " + step.getId() + ", tried to start step for the " + newStepStartCount
                    + " time, but startLimit = " + startLimit);
            }
        }
        return true;
    }

    /** @return {@code true} when the step's persisted start count is greater than one */
    protected boolean isRestartExecution() {
        return stepStatus.getStartCount() > 1;
    }

    /**
     * Moves the step to STARTING. It records this step as the job's current step, persists the step
     * status, and mirrors the status onto the step context.
     */
    protected void statusStarting() {
        stepStatus.setBatchStatus(BatchStatus.STARTING);
        statusManagerService.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
        stepContext.setBatchStatus(BatchStatus.STARTING);
    }

    /** Serializes the step's persistent user data and stores it in the persisted step status. */
    protected void persistUserData() {
        final PersistentDataWrapper userData = resolveUserData();
        storeUserData(userData);
    }

    /**
     * Serializes the step context's persistent user data with Java serialization.
     *
     * @return wrapper around the serialized bytes
     * @throws BatchContainerServiceException if the data cannot be serialized
     */
    protected PersistentDataWrapper resolveUserData() {
        final ByteArrayOutputStream persistentBAOS = new ByteArrayOutputStream();
        final ObjectOutputStream persistentDataOOS;
        try {
            persistentDataOOS = new ObjectOutputStream(persistentBAOS);
            persistentDataOOS.writeObject(stepContext.getPersistentUserData());
            persistentDataOOS.close();
        } catch (final Exception e) {
            throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);
        }
        return new PersistentDataWrapper(persistentBAOS.toByteArray());
    }

    /**
     * Puts the given user data on the step status and persists the step status.
     *
     * @param userData serialized persistent user data
     * @throws BatchContainerServiceException if updating the step status fails
     */
    protected void storeUserData(final PersistentDataWrapper userData) {
        try {
            stepStatus.setPersistentUserData(userData);
            statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
        } catch (final Exception e) {
            throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);
        }
    }

    /**
     * Copies the context's exit status to the step status and persists it. Then sets the end time on the
     * step context and persists the step execution.
     */
    protected void persistExitStatusAndEndTimestamp() {
        stepStatus.setExitStatus(stepContext.getExitStatus());
        statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);

        // set the end time metric before flushing
        final Timestamp endTS = new Timestamp(System.currentTimeMillis());
        stepContext.setEndTime(endTS);

        persistenceManagerService.updateStepExecution(rootJobExecutionId, stepContext);
    }

    /** Creates a new step execution for the given root job execution through the persistence manager. */
    private StepExecutionImpl getNewStepExecution(final long rootJobExecutionId, final StepContextImpl stepContext) {
        return persistenceManagerService.createStepExecution(rootJobExecutionId, stepContext);
    }

    /**
     * Prepares the step context for the run and acquires the transaction manager.
     *
     * <p>JSL step properties are copied into the context properties, and every step metric is initialized
     * to 0.
     */
    private void setContextProperties() {
        final JSLProperties jslProps = step.getProperties();
        if (jslProps != null) {
            // The target Properties object does not change between iterations; fetch it once.
            final Properties contextProps = stepContext.getProperties();
            for (final Property property : jslProps.getPropertyList()) {
                contextProps.setProperty(property.getName(), property.getValue());
            }
        }

        // set up metrics
        stepContext.addMetric(MetricImpl.MetricType.READ_COUNT, 0);
        stepContext.addMetric(MetricImpl.MetricType.WRITE_COUNT, 0);
        stepContext.addMetric(MetricImpl.MetricType.READ_SKIP_COUNT, 0);
        stepContext.addMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT, 0);
        stepContext.addMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT, 0);
        stepContext.addMetric(MetricImpl.MetricType.FILTER_COUNT, 0);
        stepContext.addMetric(MetricImpl.MetricType.COMMIT_COUNT, 0);
        stepContext.addMetric(MetricImpl.MetricType.ROLLBACK_COUNT, 0);

        transactionManager = txService.getTransactionManager(stepContext);
    }

    /** Replaces the step context used by this controller. */
    public void setStepContext(final StepContextImpl stepContext) {
        this.stepContext = stepContext;
    }

    /** @return a single-element list holding the last-run step execution id from the step status */
    @Override
    public List<Long> getLastRunStepExecutions() {
        final List<Long> stepExecIdList = new ArrayList<Long>(1);
        stepExecIdList.add(this.stepStatus.getLastRunStepExecutionId());
        return stepExecIdList;
    }

    /**
     * Wraps {@code t} in a {@link BatchContainerRuntimeException} and throws it. The exception message is
     * {@code msgBeginning} followed by {@code t}'s localized message.
     */
    private void rethrowWithMsg(final String msgBeginning, final Throwable t) {
        final String errorMsg = msgBeginning + " ; Caught exception/error: " + t.getLocalizedMessage();
        throw new BatchContainerRuntimeException(errorMsg, t);
    }

    @Override
    public String toString() {
        return "BaseStepController for step = " + step.getId();
    }

    /**
     * Makes this step context use the parent's step execution id.
     *
     * @param parentStepContext parent step context; ignored when {@code null}
     */
    public void setParentStepContext(final StepContextImpl parentStepContext) {
        if (parentStepContext != null) {
            stepContext.setStepExecutionId(parentStepContext.getStepExecutionId());
        }
    }
}