blob: f65e4d32150a9df83140035c6bf14c14a7a8b60f [file] [log] [blame]
/*
* Copyright 2012 International Business Machines Corp.
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.container.impl.controller;
import org.apache.batchee.container.ExecutionElementController;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.impl.JobContextImpl;
import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.BatchKernelService;
import org.apache.batchee.container.status.ExecutionStatus;
import org.apache.batchee.container.status.ExtendedBatchStatus;
import org.apache.batchee.container.status.SplitExecutionStatus;
import org.apache.batchee.container.util.BatchFlowInSplitWorkUnit;
import org.apache.batchee.container.util.BatchParallelWorkUnit;
import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
import org.apache.batchee.jaxb.Flow;
import org.apache.batchee.jaxb.JSLJob;
import org.apache.batchee.jaxb.Split;
import javax.batch.operations.JobExecutionAlreadyCompleteException;
import javax.batch.operations.JobExecutionNotMostRecentException;
import javax.batch.operations.JobRestartException;
import javax.batch.operations.JobStartException;
import javax.batch.operations.NoSuchJobExecutionException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
/**
 * Controller for a JSL {@code <split>} element.
 * <p>
 * For every {@link Flow} in the split a generated sub-job is built, each sub-job is handed to the
 * {@link BatchKernelService} to be started (or restarted), and the controller then blocks on
 * {@code completedWorkQueue} until every sub-job has reported back. The flows' terminating
 * statuses are folded into a single {@link SplitExecutionStatus}.
 */
public class SplitController implements ExecutionElementController {
    private final static Logger logger = Logger.getLogger(SplitController.class.getName());

    private final RuntimeJobExecution jobExecution;
    // Populated under the subJobs lock by buildSubJobBatchWorkUnits(); null until execute() has run.
    private volatile List<BatchFlowInSplitWorkUnit> parallelBatchWorkUnits;
    private final BatchKernelService batchKernel;
    private final JobContextImpl jobContext;
    // Handed to every flow work unit; take() below waits on it for each sub-job to finish.
    private final BlockingQueue<BatchFlowInSplitWorkUnit> completedWorkQueue = new LinkedBlockingQueue<BatchFlowInSplitWorkUnit>();
    private final long rootJobExecutionId;
    // One generated sub-job per flow; also used as the lock guarding sub-job/work-unit construction.
    private final List<JSLJob> subJobs = new ArrayList<JSLJob>();

    protected Split split;

    public SplitController(final RuntimeJobExecution jobExecution, final Split split, final long rootJobExecutionId,
                           final BatchKernelService kernelService) {
        this.jobExecution = jobExecution;
        this.jobContext = jobExecution.getJobContext();
        this.rootJobExecutionId = rootJobExecutionId;
        this.split = split;
        this.batchKernel = kernelService;
    }

    /**
     * Asks the kernel to stop every sub-job of this split.
     * <p>
     * Does nothing if the work units have not been built yet. The first failure to stop a sub-job
     * is rethrown as an {@link IllegalStateException}, and any remaining sub-jobs are not asked to stop.
     */
    @Override
    public void stop() {
        // It's possible we may try to stop a split before any
        // sub steps have been started.
        synchronized (subJobs) {
            if (parallelBatchWorkUnits != null) {
                for (final BatchParallelWorkUnit subJob : parallelBatchWorkUnits) {
                    try {
                        batchKernel.stopJob(subJob.getJobExecutionImpl().getExecutionId());
                    } catch (final Exception e) {
                        // TODO - Is this what we want to know.
                        // Blow up if it happens to force the issue.
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
    }

    /**
     * Builds one sub-job per flow, starts them all, and waits for every one to finish.
     *
     * @return the aggregated status of the split; {@code NORMAL_COMPLETION} unless some flow
     *         ended with {@code <end>}, {@code <stop>}, {@code <fail>} or an uncaught exception
     */
    @Override
    public SplitExecutionStatus execute()
        throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
        // Build all sub jobs from the flows of the split
        buildSubJobBatchWorkUnits();

        // kick off the threads
        executeWorkUnits();

        // Deal with the results.
        return waitForCompletionAndAggregateStatus();
    }

    /**
     * Builds the generated sub-jobs and their work units.
     * <p>
     * Note we restart all flows. There is no concept of "the flow completed". It is only steps
     * within the flows that may have already completed and so may not have needed to be rerun.
     *
     * @throws IllegalStateException if a sub-job already has more than one job instance
     */
    private void buildSubJobBatchWorkUnits() {
        final List<Flow> flows = this.split.getFlows();

        // Build all sub jobs from flows in split
        synchronized (subJobs) {
            parallelBatchWorkUnits = new ArrayList<BatchFlowInSplitWorkUnit>();
            for (final Flow flow : flows) {
                subJobs.add(PartitionedStepBuilder.buildFlowInSplitSubJob(jobExecution.getExecutionId(), jobContext, this.split, flow));
            }
            for (final JSLJob job : subJobs) {
                final int count = batchKernel.getJobInstanceCount(job.getId());
                final FlowInSplitBuilderConfig config = new FlowInSplitBuilderConfig(job, completedWorkQueue, rootJobExecutionId);
                if (count == 0) {
                    // No prior instance of this generated sub-job: first run.
                    parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config, jobExecution.getJobContext()));
                } else if (count == 1) {
                    // An instance already exists: the enclosing job is being restarted.
                    parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config, jobExecution.getJobContext()));
                } else {
                    throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
                }
            }
        }
    }

    /**
     * Hands every built work unit to the kernel.
     *
     * @throws IllegalStateException if a work unit's job has no job instance
     */
    private void executeWorkUnits() {
        // Then start or restart all subjobs in parallel
        for (final BatchParallelWorkUnit work : parallelBatchWorkUnits) {
            final int count = batchKernel.getJobInstanceCount(work.getJobExecutionImpl().getJobInstance().getJobName());
            // NOTE(review): with this assert holding, the restartGeneratedJob branch below is unreachable;
            // confirm against BatchKernelService whether startGeneratedJob is also used on restart.
            assert (count <= 1);
            if (count == 1) {
                batchKernel.startGeneratedJob(work);
            } else if (count > 1) {
                batchKernel.restartGeneratedJob(work);
            } else {
                throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
            }
        }
    }

    /**
     * Blocks until every sub-job has reported completion and folds their statuses together.
     *
     * @return the split status; its extended batch status is the winning terminating status,
     *         or {@code NORMAL_COMPLETION} if no flow terminated the job
     * @throws BatchContainerRuntimeException if interrupted while waiting (the interrupt flag is restored)
     */
    private SplitExecutionStatus waitForCompletionAndAggregateStatus() {
        final SplitExecutionStatus splitStatus = new SplitExecutionStatus();
        // Stays null unless some flow ends with <end>, <stop>, <fail> or an uncaught exception.
        ExtendedBatchStatus aggregateTerminatingStatus = null;

        for (final JSLJob ignored : subJobs) {
            final BatchFlowInSplitWorkUnit batchWork;
            try {
                batchWork = completedWorkQueue.take(); // wait for each thread to finish and then look at its status
            } catch (final InterruptedException e) {
                // Preserve the interrupt so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new BatchContainerRuntimeException(e);
            }

            final RuntimeFlowInSplitExecution flowExecution = batchWork.getJobExecutionImpl();
            final ExecutionStatus flowStatus = flowExecution.getFlowStatus();
            aggregateTerminatingStatus = aggregateTerminatingStatusFromSingleFlow(aggregateTerminatingStatus, flowStatus, splitStatus);
        }

        // If this is still 'null' that means all flows completed normally without terminating the job.
        if (aggregateTerminatingStatus == null) {
            aggregateTerminatingStatus = ExtendedBatchStatus.NORMAL_COMPLETION;
        }
        splitStatus.setExtendedBatchStatus(aggregateTerminatingStatus);
        return splitStatus;
    }

    /**
     * Folds one flow's outcome into the split's running terminating status.
     * <p>
     * A {@code <fail>} and an uncaught exception are peers. They each take precedence over a
     * {@code <stop>}, which takes precedence over an {@code <end>}. Among peers the last one seen
     * gets to set the exit status. Whenever this flow's status wins, its exit status (and, for a
     * {@code <stop>}, its restart position) is copied into the job context.
     *
     * @param aggregateStatus the terminating status accumulated so far, or {@code null} if no flow has terminated yet
     * @param flowStatus      the status reported by the flow that just finished
     * @param splitStatus     marked when more than one flow could have terminated the job
     * @return the updated accumulated status: this flow's status if it wins, otherwise {@code aggregateStatus} unchanged
     */
    private ExtendedBatchStatus aggregateTerminatingStatusFromSingleFlow(final ExtendedBatchStatus aggregateStatus, final ExecutionStatus flowStatus,
                                                                        final SplitExecutionStatus splitStatus) {
        final String exitStatus = flowStatus.getExitStatus();
        final String restartOn = flowStatus.getRestartOn();
        final ExtendedBatchStatus flowBatchStatus = flowStatus.getExtendedBatchStatus();

        if (!isTerminatingStatus(flowBatchStatus)) {
            // The flow completed normally; it does not affect how the split terminates.
            return aggregateStatus;
        }

        if (aggregateStatus == null) {
            // First flow seen in a terminating state.
            setInJobContext(flowBatchStatus, exitStatus, restartOn);
            return flowBatchStatus;
        }

        splitStatus.setCouldMoreThanOneFlowHaveTerminatedJob(true);

        if (aggregateStatus.equals(ExtendedBatchStatus.JSL_END)) {
            // Any terminating status overrides an earlier <end>.
            logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <end> transition element. " +
                "Overriding, setting exit status if non-null and preparing to end job.");
            setInJobContext(flowBatchStatus, exitStatus, restartOn);
            return flowBatchStatus;
        }

        if (aggregateStatus.equals(ExtendedBatchStatus.JSL_STOP)) {
            // Everything but an <end> overrides a <stop>
            if (!flowBatchStatus.equals(ExtendedBatchStatus.JSL_END)) {
                logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <stop> transition element. " +
                    "Overriding, setting exit status if non-null and preparing to end job.");
                setInJobContext(flowBatchStatus, exitStatus, restartOn);
                return flowBatchStatus;
            }
        } else if (isFailureStatus(aggregateStatus) && isFailureStatus(flowBatchStatus)) {
            // Only a peer (<fail> or exception) overrides an earlier <fail> or exception.
            logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <fail> transition element " +
                "or exception thrown. Overriding, setting exit status if non-null and preparing to end job.");
            setInJobContext(flowBatchStatus, exitStatus, restartOn);
            return flowBatchStatus;
        }

        // The earlier, higher-precedence terminating status stands.
        return aggregateStatus;
    }

    /** Returns whether {@code status} is one that ends the job: {@code <end>}, {@code <stop>}, {@code <fail>} or an exception. */
    private static boolean isTerminatingStatus(final ExtendedBatchStatus status) {
        return status.equals(ExtendedBatchStatus.JSL_END) || status.equals(ExtendedBatchStatus.JSL_STOP) || isFailureStatus(status);
    }

    /** Returns whether {@code status} is a failure: a {@code <fail>} transition or an uncaught exception. */
    private static boolean isFailureStatus(final ExtendedBatchStatus status) {
        return status.equals(ExtendedBatchStatus.JSL_FAIL) || status.equals(ExtendedBatchStatus.EXCEPTION_THROWN);
    }

    /**
     * Copies a winning flow's outcome into the job context.
     *
     * @param flowBatchStatus the flow's terminating status
     * @param exitStatus      the flow's exit status; ignored when {@code null}
     * @param restartOn       the restart position; applied only for {@code JSL_STOP} and when non-null
     */
    private void setInJobContext(final ExtendedBatchStatus flowBatchStatus, final String exitStatus, final String restartOn) {
        if (exitStatus != null) {
            jobContext.setExitStatus(exitStatus);
        }
        if (restartOn != null && ExtendedBatchStatus.JSL_STOP.equals(flowBatchStatus)) {
            jobContext.setRestartOn(restartOn);
        }
    }

    /**
     * Collects the step execution ids last run by every flow in this split.
     *
     * @return a new mutable list; empty if the split's work units have not been built yet
     */
    @Override
    public List<Long> getLastRunStepExecutions() {
        final List<Long> stepExecIdList = new ArrayList<Long>();
        // Read the volatile field once so the null check and the loop see the same list.
        final List<BatchFlowInSplitWorkUnit> workUnits = parallelBatchWorkUnits;
        if (workUnits == null) {
            // execute() has not built any work units yet, so nothing has run.
            return stepExecIdList;
        }
        for (final BatchFlowInSplitWorkUnit workUnit : workUnits) {
            final List<Long> stepExecIds = workUnit.getController().getLastRunStepExecutions();
            // Defensive: skip a flow controller that reports no step executions as null.
            if (stepExecIds != null) {
                stepExecIdList.addAll(stepExecIds);
            }
        }
        return stepExecIdList;
    }
}