// blob: 03ea79440b6631daa659a31c37d737e377b290c5 [file] [log] [blame]
/*
* Copyright 2012 International Business Machines Corp.
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.container.impl.jobinstance;
import org.apache.batchee.container.impl.JobContextImpl;
import org.apache.batchee.container.impl.JobInstanceImpl;
import org.apache.batchee.container.jsl.JobModelResolver;
import org.apache.batchee.container.modelresolver.PropertyResolver;
import org.apache.batchee.container.modelresolver.PropertyResolverFactory;
import org.apache.batchee.container.navigator.ModelNavigator;
import org.apache.batchee.container.navigator.NavigatorFactory;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.container.services.JobStatusManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.JobStatus;
import org.apache.batchee.jaxb.JSLJob;
import org.apache.batchee.jaxb.JSLProperties;
import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.operations.JobExecutionAlreadyCompleteException;
import javax.batch.operations.JobExecutionNotMostRecentException;
import javax.batch.operations.JobRestartException;
import javax.batch.operations.JobStartException;
import javax.batch.operations.NoSuchJobExecutionException;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobInstance;
import java.util.Properties;
/**
 * Static helpers that create the runtime execution objects used when starting or
 * restarting a job, a partition or a flow-in-split.
 *
 * <p>All methods are stateless: every collaborator is obtained from the
 * {@link ServicesManager} (or the service instance) handed in by the caller.</p>
 */
public final class JobExecutionHelper {
    private JobExecutionHelper() {
        // utility class: static methods only, never instantiated
    }

    /**
     * Parses the given JSL document and resolves it exactly like
     * {@link #getResolvedJobNavigator(JSLJob, Properties, boolean)}.
     *
     * @param jobXml            the JSL document to parse
     * @param jobParameters     parameters used for property substitution
     * @param parallelExecution forwarded to the property resolver factory
     * @return a navigator over the parsed and substituted job model
     */
    private static ModelNavigator<JSLJob> getResolvedJobNavigator(final String jobXml, final Properties jobParameters, final boolean parallelExecution) {
        return getResolvedJobNavigator(new JobModelResolver().resolveModel(jobXml), jobParameters, parallelExecution);
    }

    /**
     * Substitutes properties in the given job model and wraps the model in a navigator.
     *
     * @param jobModel          the job model; it is handed to the property resolver for substitution
     * @param jobParameters     parameters used for property substitution
     * @param parallelExecution forwarded to the property resolver factory
     * @return a navigator over the substituted job model
     */
    private static ModelNavigator<JSLJob> getResolvedJobNavigator(final JSLJob jobModel, final Properties jobParameters, final boolean parallelExecution) {
        final PropertyResolver<JSLJob> propResolver = PropertyResolverFactory.createJobPropertyResolver(parallelExecution);
        propResolver.substituteProperties(jobModel, jobParameters);
        return NavigatorFactory.createJobNavigator(jobModel);
    }

    /**
     * Builds the job context for the given navigator.
     *
     * @param jobNavigator navigator whose root element supplies the job level properties
     * @return a new job context; an empty {@link JSLProperties} is used when the navigator has no root element
     */
    private static JobContextImpl getJobContext(final ModelNavigator<JSLJob> jobNavigator) {
        final JSLJob rootElement = jobNavigator.getRootModelElement();
        final JSLProperties jslProperties = rootElement != null ? rootElement.getProperties() : new JSLProperties();
        return new JobContextImpl(jobNavigator, jslProperties);
    }

    /**
     * Creates a job instance through the persistence service.
     *
     * @param servicesManager used to look up the {@link PersistenceManagerService}
     * @param name            the job name
     * @param jobXml          the JSL document stored with the instance
     * @return the newly created job instance
     */
    private static JobInstance getNewJobInstance(final ServicesManager servicesManager, final String name, final String jobXml) {
        return servicesManager.service(PersistenceManagerService.class).createJobInstance(name, jobXml);
    }

    /**
     * Creates a sub-job instance (used here for partitions and flows-in-split) through the persistence service.
     *
     * @param servicesManager used to look up the {@link PersistenceManagerService}
     * @param name            the sub-job name
     * @return the newly created sub-job instance
     */
    private static JobInstance getNewSubJobInstance(final ServicesManager servicesManager, final String name) {
        return servicesManager.service(PersistenceManagerService.class).createSubJobInstance(name);
    }

    /**
     * Creates a job status for the given instance and attaches the instance to it.
     *
     * @param statusManagerService service creating the status
     * @param jobInstance          the instance the status belongs to
     * @return the new status with its job instance set
     */
    private static JobStatus createNewJobStatus(final JobStatusManagerService statusManagerService, final JobInstance jobInstance) {
        final JobStatus jobStatus = statusManagerService.createJobStatus(jobInstance.getInstanceId());
        jobStatus.setJobInstance(jobInstance);
        return jobStatus;
    }

    /**
     * Creates the initial job status of a freshly started instance and pushes it to the
     * status manager. Shared by all {@code start*} entry points.
     *
     * @param servicesManager used to look up the {@link JobStatusManagerService}
     * @param jobInstance     the instance that was just created
     */
    private static void registerNewJobStatus(final ServicesManager servicesManager, final JobInstance jobInstance) {
        final JobStatusManagerService statusManagerService = servicesManager.service(JobStatusManagerService.class);
        statusManagerService.updateJobStatus(createNewJobStatus(statusManagerService, jobInstance));
    }

    /**
     * Rejects a restart when the job declares {@code restartable="false"} (case-insensitive).
     * A missing attribute is treated as restartable.
     *
     * @param jobModel the resolved job model
     * @throws JobRestartException if the restartable attribute is "false"
     */
    private static void validateRestartableFalseJobsDoNotRestart(final JSLJob jobModel) throws JobRestartException {
        // constant-first comparison also covers a null attribute
        if ("false".equalsIgnoreCase(jobModel.getRestartable())) {
            throw new JobRestartException("Job Restartable attribute is false, Job cannot be restarted.");
        }
    }

    /**
     * Starts a new top level job: resolves the JSL, creates the job instance and its first
     * execution, prepares the execution and registers the initial job status.
     *
     * @param servicesManager service registry
     * @param jobXML          the JSL document of the job
     * @param jobParameters   job parameters used for property substitution and stored with the execution
     * @return the prepared runtime execution
     * @throws JobStartException declared for callers; not thrown directly by this method
     */
    public static RuntimeJobExecution startJob(final ServicesManager servicesManager, final String jobXML, final Properties jobParameters) throws JobStartException {
        final JSLJob jobModel = new JobModelResolver().resolveModel(jobXML);
        final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, false);
        final JobContextImpl jobContext = getJobContext(jobNavigator);
        final JobInstance jobInstance = getNewJobInstance(servicesManager, jobNavigator.getRootModelElement().getId(), jobXML);

        final RuntimeJobExecution executionHelper = servicesManager.service(PersistenceManagerService.class)
                .createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
        executionHelper.prepareForExecution(jobContext);

        registerNewJobStatus(servicesManager, jobInstance);
        return executionHelper;
    }

    /**
     * Starts a flow of a split as a sub-job. No job parameters are supplied to property
     * resolution ({@code null} is passed) and the resolver is created in parallel mode.
     *
     * @param servicesManager service registry
     * @param jobModel        the job model generated for the flow
     * @return the prepared flow-in-split execution
     * @throws JobStartException declared for callers; not thrown directly by this method
     */
    public static RuntimeFlowInSplitExecution startFlowInSplit(final ServicesManager servicesManager, final JSLJob jobModel) throws JobStartException {
        final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, null, true);
        final JobContextImpl jobContext = getJobContext(jobNavigator);
        final JobInstance jobInstance = getNewSubJobInstance(servicesManager, jobNavigator.getRootModelElement().getId());

        final RuntimeFlowInSplitExecution executionHelper = servicesManager.service(PersistenceManagerService.class)
                .createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
        executionHelper.prepareForExecution(jobContext);

        registerNewJobStatus(servicesManager, jobInstance);
        return executionHelper;
    }

    /**
     * Starts a partition as a sub-job; the property resolver is created in parallel mode.
     *
     * @param servicesManager service registry
     * @param jobModel        the job model generated for the partition
     * @param jobParameters   parameters used for property substitution and stored with the execution
     * @return the prepared runtime execution
     * @throws JobStartException declared for callers; not thrown directly by this method
     */
    public static RuntimeJobExecution startPartition(final ServicesManager servicesManager, final JSLJob jobModel, final Properties jobParameters) throws JobStartException {
        final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, true);
        final JobContextImpl jobContext = getJobContext(jobNavigator);
        final JobInstance jobInstance = getNewSubJobInstance(servicesManager, jobNavigator.getRootModelElement().getId());

        final RuntimeJobExecution executionHelper = servicesManager.service(PersistenceManagerService.class)
                .createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
        executionHelper.prepareForExecution(jobContext);

        registerNewJobStatus(servicesManager, jobInstance);
        return executionHelper;
    }

    /**
     * Ensures the job instance is in a state that permits a restart.
     *
     * @param jobStatus the persisted status of the instance being restarted
     * @throws IllegalStateException                if no batch status was recorded
     * @throws JobExecutionAlreadyCompleteException if the instance already completed
     * @throws JobRestartException                  if the instance was abandoned
     */
    private static void validateJobInstanceNotCompleteOrAbandoned(final JobStatus jobStatus) throws JobRestartException, JobExecutionAlreadyCompleteException {
        final BatchStatus batchStatus = jobStatus.getBatchStatus();
        if (batchStatus == null) {
            throw new IllegalStateException("On restart, we didn't find an earlier batch status.");
        }
        if (batchStatus == BatchStatus.COMPLETED) {
            throw new JobExecutionAlreadyCompleteException("Already completed job instance = " + jobStatus.getJobInstanceId());
        }
        if (batchStatus == BatchStatus.ABANDONED) {
            throw new JobRestartException("Abandoned job instance = " + jobStatus.getJobInstanceId());
        }
    }

    /**
     * Ensures the given execution is the latest one recorded for its job instance.
     *
     * @param persistenceManagerService service used to look up the most recent execution id
     * @param jobInstanceId             the owning job instance
     * @param executionId               the execution the caller wants to restart
     * @throws JobExecutionNotMostRecentException if a different execution is the most recent one
     */
    private static void validateJobExecutionIsMostRecent(final PersistenceManagerService persistenceManagerService, final long jobInstanceId, final long executionId)
            throws JobExecutionNotMostRecentException {
        if (persistenceManagerService.getMostRecentExecutionId(jobInstanceId) != executionId) {
            throw new JobExecutionNotMostRecentException("ExecutionId: " + executionId + " is not the most recent execution.");
        }
    }

    /**
     * Restarts a partition from the given execution using the regenerated job model.
     *
     * @param servicesManager service registry
     * @param execId          the execution to restart from
     * @param gennedJobModel  the regenerated job model of the partition
     * @param partitionProps  properties used for substitution and stored with the new execution
     * @return the prepared runtime execution
     * @throws JobRestartException                  if the job is not restartable or was abandoned
     * @throws JobExecutionAlreadyCompleteException if the instance already completed
     * @throws JobExecutionNotMostRecentException   if {@code execId} is not the most recent execution
     * @throws NoSuchJobExecutionException          declared for callers; not thrown directly by this class
     */
    public static RuntimeJobExecution restartPartition(final ServicesManager servicesManager, final long execId, final JSLJob gennedJobModel, final Properties partitionProps)
            throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
        return restartExecution(servicesManager, execId, gennedJobModel, partitionProps, true, false);
    }

    /**
     * Restarts a flow of a split from the given execution using the regenerated job model.
     *
     * @param servicesManager service registry
     * @param execId          the execution to restart from
     * @param gennedJobModel  the regenerated job model of the flow
     * @return the prepared flow-in-split execution
     * @throws JobRestartException                  if the job is not restartable or was abandoned
     * @throws JobExecutionAlreadyCompleteException if the instance already completed
     * @throws JobExecutionNotMostRecentException   if {@code execId} is not the most recent execution
     * @throws NoSuchJobExecutionException          declared for callers; not thrown directly by this class
     */
    public static RuntimeFlowInSplitExecution restartFlowInSplit(final ServicesManager servicesManager, final long execId, final JSLJob gennedJobModel)
            throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
        // flowInSplit=true makes restartExecution create the execution via createFlowInSplitExecution, hence the cast
        return (RuntimeFlowInSplitExecution) restartExecution(servicesManager, execId, gennedJobModel, null, true, true);
    }

    /**
     * Restarts a top level job from the given execution; the JSL stored with the job instance is re-resolved.
     *
     * @param servicesManager      service registry
     * @param executionId          the execution to restart from
     * @param restartJobParameters parameters used for substitution and stored with the new execution
     * @return the prepared runtime execution
     * @throws JobRestartException                  if the job is not restartable or was abandoned
     * @throws JobExecutionAlreadyCompleteException if the instance already completed
     * @throws JobExecutionNotMostRecentException   if {@code executionId} is not the most recent execution
     * @throws NoSuchJobExecutionException          declared for callers; not thrown directly by this class
     */
    public static RuntimeJobExecution restartJob(final ServicesManager servicesManager, final long executionId, final Properties restartJobParameters)
            throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
        return restartExecution(servicesManager, executionId, null, restartJobParameters, false, false);
    }

    /**
     * Common restart path: validates that the restart is allowed, resolves the job model,
     * creates a new execution for the existing job instance and records it as the latest one.
     *
     * @param servicesManager      service registry
     * @param executionId          the execution to restart from
     * @param gennedJobModel       regenerated model for parallel sub-jobs, or {@code null} to use the JSL stored with the instance
     * @param restartJobParameters parameters used for substitution (and stored with the execution unless {@code flowInSplit})
     * @param parallelExecution    forwarded to the property resolver factory
     * @param flowInSplit          whether a flow-in-split execution must be created
     * @return the prepared runtime execution
     */
    private static RuntimeJobExecution restartExecution(final ServicesManager servicesManager, final long executionId, final JSLJob gennedJobModel,
                                                        final Properties restartJobParameters, final boolean parallelExecution, final boolean flowInSplit)
            throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
        final PersistenceManagerService persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
        final JobStatusManagerService jobStatusManagerService = servicesManager.service(JobStatusManagerService.class);

        final long jobInstanceId = persistenceManagerService.getJobInstanceIdByExecutionId(executionId);
        final JobStatus jobStatus = jobStatusManagerService.getJobStatus(jobInstanceId);

        validateJobExecutionIsMostRecent(persistenceManagerService, jobInstanceId, executionId);
        validateJobInstanceNotCompleteOrAbandoned(jobStatus);

        final JobInstanceImpl jobInstance = jobStatus.getJobInstance();

        // A generated (parallel) sub-job restarts from its regenerated model;
        // otherwise the JSL persisted with the job instance is parsed again.
        final ModelNavigator<JSLJob> jobNavigator = gennedJobModel == null
                ? getResolvedJobNavigator(jobInstance.getJobXML(), restartJobParameters, parallelExecution)
                : getResolvedJobNavigator(gennedJobModel, restartJobParameters, parallelExecution);

        validateRestartableFalseJobsDoNotRestart(jobNavigator.getRootModelElement());

        final JobContextImpl jobContext = getJobContext(jobNavigator);

        final RuntimeJobExecution executionHelper;
        if (flowInSplit) {
            executionHelper = persistenceManagerService.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
        } else {
            executionHelper = persistenceManagerService.createJobExecution(jobInstance, restartJobParameters, jobContext.getBatchStatus());
        }

        executionHelper.prepareForExecution(jobContext, jobStatus.getRestartOn());
        jobStatusManagerService.updateJobStatusWithNewExecution(jobInstance.getInstanceId(), executionHelper.getExecutionId());

        return executionHelper;
    }

    /**
     * Fetches the persisted job execution as exposed to the job operator.
     *
     * @param persistenceManagerService the persistence service to query
     * @param jobExecutionId            the execution id
     * @return the persisted execution
     * @throws NoSuchJobExecutionException declared for callers; not thrown directly by this method
     */
    public static InternalJobExecution getPersistedJobOperatorJobExecution(final PersistenceManagerService persistenceManagerService, final long jobExecutionId)
            throws NoSuchJobExecutionException {
        return persistenceManagerService.jobOperatorGetJobExecution(jobExecutionId);
    }

    /**
     * Looks up the job instance owning the given execution via its job status.
     *
     * @param statusManagerService the status service to query
     * @param executionId          the execution id
     * @return the job instance attached to the status found for the execution
     */
    public static JobInstance getJobInstance(final JobStatusManagerService statusManagerService, final long executionId) {
        return statusManagerService.getJobStatusFromExecutionId(executionId).getJobInstance();
    }
}