blob: 35f30970832b013d87d4ce7accdd62ecae6f19a0 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.orchestrator.cpi.impl;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.error.ValidationResults;
import org.apache.airavata.model.error.ValidatorResult;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.*;
import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
import org.apache.airavata.registry.api.exception.RegistryServiceException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.ExecutorService;
public class SimpleOrchestratorImpl extends AbstractOrchestrator{
private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
private ExecutorService executor;
// created in the constructor as a GFACPassiveJobSubmitter; can be replaced through setJobSubmitter
private JobSubmitter jobSubmitter = null;
public SimpleOrchestratorImpl() throws OrchestratorException, TException {
try {
try {
// We are only going to use GFacPassiveJobSubmitter
jobSubmitter = new GFACPassiveJobSubmitter();
jobSubmitter.initialize(this.orchestratorContext);
} catch (Exception e) {
String error = "Error creating JobSubmitter in non threaded mode ";
logger.error(error);
throw new OrchestratorException(error, e);
}
} catch (OrchestratorException e) {
logger.error("Error Constructing the Orchestrator");
throw e;
}
}
/**
 * Submits a process to the job submitter.
 *
 * @param processModel the process to launch; its experiment id and process id are forwarded
 * @param tokenId      the token passed through to the job submitter
 * @return whatever {@link JobSubmitter#submit} reports for this process
 * @throws OrchestratorException wrapping any exception raised while submitting
 */
public boolean launchProcess(ProcessModel processModel, String tokenId) throws OrchestratorException {
    final boolean submitted;
    try {
        final String experimentId = processModel.getExperimentId();
        final String processId = processModel.getProcessId();
        submitted = jobSubmitter.submit(experimentId, processId, tokenId);
    } catch (Exception e) {
        throw new OrchestratorException("Error launching the job", e);
    }
    return submitted;
}
/**
 * Runs the configured job metadata validators against an experiment.
 *
 * <p>When validation is enabled in the orchestrator configuration, every configured validator
 * class is loaded, instantiated and invoked in order with a {@code null} process model. The
 * first validator that reports a failure stops the run: its error details are concatenated,
 * logged, and saved to the registry as an experiment error. A validator class that cannot be
 * loaded or instantiated is logged and the remaining validators still run, but the overall
 * outcome is forced to "failed" so that a later successful validator cannot mask the problem.
 *
 * @param experiment the experiment to validate
 * @return the result of the last validator that ran, or an initial successful result when
 *         validation is disabled or no validator is configured
 * @throws LaunchValidationException if a validator failed or could not be loaded
 * @throws OrchestratorException declared by the signature; not raised directly in this method
 * @throws RuntimeException if the validation error cannot be saved to the registry
 */
public ValidationResults validateExperiment(ExperimentModel experiment)
        throws OrchestratorException, LaunchValidationException {
    ValidationResults validationResults = new ValidationResults();
    // start from success; any failing or unloadable validator flips the outcome to failed
    validationResults.setValidationState(true);
    StringBuilder errorMsg = new StringBuilder("Validation Errors : ");
    // tracked separately because validationResults is replaced by every validator that runs
    boolean validatorLoadFailed = false;

    if (this.orchestratorConfiguration.isEnableValidation()) {
        List<String> validatorClasses = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
        for (String validator : validatorClasses) {
            try {
                Class<? extends JobMetadataValidator> vClass =
                        Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
                JobMetadataValidator jobMetadataValidator = vClass.newInstance();
                validationResults = jobMetadataValidator.validate(experiment, null);
                if (validationResults.isValidationState()) {
                    logger.info("Validation of " + validator + " is SUCCESSFUL");
                    continue;
                }

                // collect the details of every failed check of this validator
                List<ValidatorResult> validationResultList = validationResults.getValidationResultList();
                if (validationResultList != null) {
                    for (ValidatorResult result : validationResultList) {
                        if (!result.isResult() && result.getErrorDetails() != null) {
                            errorMsg.append(result.getErrorDetails()).append(' ');
                        }
                    }
                }
                logger.error("Validation of {} for experiment Id {} is FAILED:[error]. {}",
                        validator, experiment.getExperimentId(), errorMsg);
                validationResults.setValidationState(false);

                try {
                    ErrorModel details = new ErrorModel();
                    details.setActualErrorMessage(errorMsg.toString());
                    details.setCreationTime(System.currentTimeMillis());
                    final RegistryService.Client registryClient = getRegistryServiceClient();
                    try {
                        registryClient.addErrors(OrchestratorConstants.EXPERIMENT_ERROR, details,
                                experiment.getExperimentId());
                    } finally {
                        if (registryClient != null) {
                            ThriftUtils.close(registryClient);
                        }
                    }
                } catch (RegistryServiceException e) {
                    logger.error("Error while saving error details to registry", e);
                    throw new RuntimeException("Error while saving error details to registry", e);
                } catch (TException e) {
                    logger.error("Error while saving error details to registry", e);
                    throw new RuntimeException("Error while saving error details to registry", e);
                }
                // stop at the first validator that reports a failure
                break;
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                // the "{}" placeholder is required, otherwise SLF4J silently drops the class name
                logger.error("Error loading the validation class: {}", validator, e);
                validatorLoadFailed = true;
                errorMsg.append("Error loading the validation class: ").append(validator).append(' ');
            }
        }
    }

    if (validatorLoadFailed) {
        validationResults.setValidationState(false);
    }
    if (validationResults.isValidationState()) {
        return validationResults;
    }
    // at least one validation has failed, so we throw an exception
    LaunchValidationException launchValidationException = new LaunchValidationException();
    launchValidationException.setValidationResult(validationResults);
    launchValidationException.setErrorMessage("Validation failed refer the validationResults list for " +
            "detail error. Validation errors : " + errorMsg);
    throw launchValidationException;
}
/**
 * Runs the configured job metadata validators against a process of an experiment.
 *
 * <p>When validation is enabled in the orchestrator configuration, every configured validator
 * class is loaded, instantiated and invoked in order. The first validator that reports a
 * failure stops the run: its error details are concatenated, logged, and saved to the registry
 * as a process error. A validator class that cannot be loaded or instantiated is logged and
 * the remaining validators still run, but the overall outcome is forced to "failed" so that a
 * later successful validator cannot mask the problem.
 *
 * @param experiment   the experiment the process belongs to
 * @param processModel the process to validate
 * @return the result of the last validator that ran, or an initial successful result when
 *         validation is disabled or no validator is configured
 * @throws LaunchValidationException if a validator failed or could not be loaded
 * @throws OrchestratorException declared by the signature; not raised directly in this method
 * @throws RuntimeException if the validation error cannot be saved to the registry
 */
public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel)
        throws OrchestratorException, LaunchValidationException {
    ValidationResults validationResults = new ValidationResults();
    // start from success; any failing or unloadable validator flips the outcome to failed
    validationResults.setValidationState(true);
    StringBuilder errorMsg = new StringBuilder("Validation Errors : ");
    // tracked separately because validationResults is replaced by every validator that runs
    boolean validatorLoadFailed = false;

    if (this.orchestratorConfiguration.isEnableValidation()) {
        List<String> validatorClzzez = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
        for (String validator : validatorClzzez) {
            try {
                Class<? extends JobMetadataValidator> vClass =
                        Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
                JobMetadataValidator jobMetadataValidator = vClass.newInstance();
                validationResults = jobMetadataValidator.validate(experiment, processModel);
                if (validationResults.isValidationState()) {
                    logger.info("Validation of " + validator + " is SUCCESSFUL");
                    continue;
                }

                // collect the details of every failed check of this validator
                List<ValidatorResult> validationResultList = validationResults.getValidationResultList();
                if (validationResultList != null) {
                    for (ValidatorResult result : validationResultList) {
                        if (!result.isResult() && result.getErrorDetails() != null) {
                            errorMsg.append(result.getErrorDetails()).append(' ');
                        }
                    }
                }
                logger.error("Validation of {} for experiment Id {} is FAILED:[error]. {}",
                        validator, experiment.getExperimentId(), errorMsg);
                validationResults.setValidationState(false);

                try {
                    ErrorModel details = new ErrorModel();
                    details.setActualErrorMessage(errorMsg.toString());
                    details.setCreationTime(System.currentTimeMillis());
                    final RegistryService.Client registryClient = getRegistryServiceClient();
                    try {
                        registryClient.addErrors(OrchestratorConstants.PROCESS_ERROR, details,
                                processModel.getProcessId());
                    } finally {
                        if (registryClient != null) {
                            ThriftUtils.close(registryClient);
                        }
                    }
                } catch (RegistryServiceException e) {
                    logger.error("Error while saving error details to registry", e);
                    throw new RuntimeException("Error while saving error details to registry", e);
                } catch (TException e) {
                    logger.error("Error while saving error details to registry", e);
                    throw new RuntimeException("Error while saving error details to registry", e);
                }
                // stop at the first validator that reports a failure
                break;
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                // the "{}" placeholder is required, otherwise SLF4J silently drops the class name
                logger.error("Error loading the validation class: {}", validator, e);
                validatorLoadFailed = true;
                errorMsg.append("Error loading the validation class: ").append(validator).append(' ');
            }
        }
    }

    if (validatorLoadFailed) {
        validationResults.setValidationState(false);
    }
    if (validationResults.isValidationState()) {
        return validationResults;
    }
    // at least one validation has failed, so we throw an exception
    LaunchValidationException launchValidationException = new LaunchValidationException();
    launchValidationException.setValidationResult(validationResults);
    launchValidationException.setErrorMessage("Validation failed refer the validationResults " +
            "list for detail error. Validation errors : " + errorMsg);
    throw launchValidationException;
}
/**
 * Terminates every process of an experiment.
 *
 * <p>The process ids of the experiment are read from the registry and each one is handed to
 * the job submitter for termination. When the experiment has no processes only a warning is
 * logged. The registry client opened for the lookup is always closed before returning.
 *
 * @param experiment the experiment to cancel
 * @param tokenId    the token passed through to the job submitter
 * @throws OrchestratorException if the process ids cannot be fetched, or if the job submitter
 *                               fails to terminate a process
 */
public void cancelExperiment(ExperimentModel experiment, String tokenId) throws OrchestratorException {
    final String experimentId = experiment.getExperimentId();
    logger.info("Terminating experiment {}", experimentId);
    final RegistryService.Client registryServiceClient = getRegistryServiceClient();
    try {
        List<String> processIds = registryServiceClient.getProcessIds(experimentId);
        if (processIds == null || processIds.isEmpty()) {
            logger.warn("No processes found for experiment {} to cancel", experimentId);
            return;
        }
        for (String processId : processIds) {
            logger.info("Terminating process {} of experiment {}", processId, experimentId);
            jobSubmitter.terminate(experimentId, processId, tokenId);
        }
    } catch (TException e) {
        logger.error("Failed to fetch process ids for experiment " + experimentId, e);
        throw new OrchestratorException("Failed to fetch process ids for experiment " + experimentId, e);
    } finally {
        // previously leaked: the client was never closed on any path
        if (registryServiceClient != null) {
            ThriftUtils.close(registryServiceClient);
        }
    }
}
/**
 * Returns the executor service held by this orchestrator.
 *
 * @return the executor last passed to {@link #setExecutor(ExecutorService)}, or {@code null}
 *         if none has been set
 */
public ExecutorService getExecutor() {
return executor;
}
/**
 * Stores the executor service on this orchestrator.
 *
 * @param executor the executor to keep
 */
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}
/**
 * Returns the job submitter used to submit and terminate processes.
 *
 * @return the submitter created in the constructor, unless it was replaced through
 *         {@link #setJobSubmitter(JobSubmitter)}
 */
public JobSubmitter getJobSubmitter() {
return jobSubmitter;
}
/**
 * Replaces the job submitter used to submit and terminate processes.
 *
 * @param jobSubmitter the submitter to use from now on
 */
public void setJobSubmitter(JobSubmitter jobSubmitter) {
this.jobSubmitter = jobSubmitter;
}
/**
 * Does nothing in this implementation; the job submitter is set up in the constructor.
 *
 * @throws OrchestratorException declared by the signature; never thrown here
 */
public void initialize() throws OrchestratorException {
}
/**
 * Returns the processes of an experiment, creating a first one when none exist yet.
 *
 * <p>If the registry already holds processes for the experiment they are returned as is.
 * Otherwise a process is cloned from the experiment, saved to the registry, given the id the
 * registry returned, and handed back as a single-element list.
 *
 * @param experimentId id of the experiment whose processes are requested
 * @param gatewayId    not used by this method
 * @return the existing processes, or a list holding the newly created one
 * @throws OrchestratorException wrapping any failure while talking to the registry
 */
public List<ProcessModel> createProcesses (String experimentId, String gatewayId) throws OrchestratorException {
    final RegistryService.Client registryClient = getRegistryServiceClient();
    try {
        final ExperimentModel experimentModel = registryClient.getExperiment(experimentId);
        final List<ProcessModel> existing = registryClient.getProcessList(experimentId);
        if (existing != null && !existing.isEmpty()) {
            return existing;
        }

        // nothing registered yet: derive the first process from the experiment itself
        final ProcessModel created = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel);
        final String newProcessId = registryClient.addProcess(created, experimentId);
        created.setProcessId(newProcessId);

        final List<ProcessModel> fresh = new ArrayList<>();
        fresh.add(created);
        return fresh;
    } catch (Exception e) {
        throw new OrchestratorException("Error during creating process", e);
    } finally {
        if (registryClient != null) {
            ThriftUtils.close(registryClient);
        }
    }
}
/**
 * Creates and saves all tasks of a process and returns their ids as a comma separated list.
 *
 * <p>For UNICORE only submission tasks are created. For every other protocol the order is:
 * environment setup, input data staging, job submission, output data staging. With
 * {@code autoSchedule} enabled the requested wall time is split into several submissions when
 * it exceeds the maximum run time of the requested batch queue. The process is written back to
 * the registry afterwards because its wall time limit may have been adjusted.
 *
 * @param gatewayId    gateway the process belongs to
 * @param processModel the process to create tasks for; its resource schedule must name a
 *                     compute resource
 * @param autoSchedule whether to split the wall time across several job submissions
 * @return the created task ids joined with commas, in execution order
 * @throws OrchestratorException wrapping any failure while creating or saving the tasks
 */
public String createAndSaveTasks(String gatewayId, ProcessModel processModel, boolean autoSchedule) throws OrchestratorException {
    final RegistryService.Client registryClient = getRegistryServiceClient();
    try {
        ComputationalResourceSchedulingModel resourceSchedule = processModel.getProcessResourceSchedule();
        int userGivenWallTime = resourceSchedule.getWallTimeLimit();
        String resourceHostId = resourceSchedule.getResourceHostId();
        if (resourceHostId == null) {
            throw new OrchestratorException("Compute Resource Id cannot be null at this point");
        }
        ComputeResourceDescription computeResource = registryClient.getComputeResource(resourceHostId);
        JobSubmissionInterface preferredJobSubmissionInterface =
                OrchestratorUtils.getPreferredJobSubmissionInterface(processModel, gatewayId);
        JobSubmissionProtocol preferredJobSubmissionProtocol =
                OrchestratorUtils.getPreferredJobSubmissionProtocol(processModel, gatewayId);

        List<String> taskIdList = new ArrayList<>();
        if (preferredJobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
            // TODO - breakdown unicore all in one task to multiple tasks, then we don't need to handle UNICORE here.
            taskIdList.addAll(createAndSaveSubmissionTasks(registryClient, gatewayId,
                    preferredJobSubmissionInterface, processModel, userGivenWallTime));
        } else {
            taskIdList.addAll(createAndSaveEnvSetupTask(registryClient, gatewayId, processModel));
            taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
            if (autoSchedule) {
                taskIdList.addAll(createAutoScheduledSubmissionTasks(registryClient, gatewayId,
                        preferredJobSubmissionInterface, processModel, computeResource, resourceSchedule));
            } else {
                taskIdList.addAll(createAndSaveSubmissionTasks(registryClient, gatewayId,
                        preferredJobSubmissionInterface, processModel, userGivenWallTime));
            }
            taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
        }
        // update process scheduling
        registryClient.updateProcess(processModel, processModel.getProcessId());
        return getTaskDag(taskIdList);
    } catch (Exception e) {
        throw new OrchestratorException("Error during creating process", e);
    } finally {
        if (registryClient != null) {
            ThriftUtils.close(registryClient);
        }
    }
}

/**
 * Creates the job submission tasks for an auto-scheduled process.
 *
 * <p>The batch queue named in the resource schedule is looked up on the compute resource. If
 * the queue defines a positive maximum run time that is smaller than the requested wall time,
 * the schedule's wall time limit is lowered to that maximum and the request is split into as
 * many full-length submissions as fit plus one for the remainder. In every other case (no
 * matching queue, no queues defined, no usable maximum, or the request already fits) a single
 * submission with the requested wall time is created, so the process never ends up without a
 * submission task.
 */
private List<String> createAutoScheduledSubmissionTasks(RegistryService.Client registryClient, String gatewayId,
                                                        JobSubmissionInterface jobSubmissionInterface,
                                                        ProcessModel processModel,
                                                        ComputeResourceDescription computeResource,
                                                        ComputationalResourceSchedulingModel resourceSchedule)
        throws TException, OrchestratorException {
    final String requestedQueueName = resourceSchedule.getQueueName();
    final int requestedWallTime = resourceSchedule.getWallTimeLimit();

    // queue names are expected to be unique on a compute resource, so the first match wins
    int maxRunTime = 0;
    List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
    if (definedBatchQueues != null) {
        for (BatchQueue batchQueue : definedBatchQueues) {
            String queueName = batchQueue.getQueueName();
            if (queueName != null && queueName.equals(requestedQueueName)) {
                maxRunTime = batchQueue.getMaxRunTime();
                break;
            }
        }
    }

    List<String> taskIds = new ArrayList<>();
    // a non-positive maximum means "no limit known"; it must not reach the division below
    if (maxRunTime <= 0 || maxRunTime >= requestedWallTime) {
        taskIds.addAll(createAndSaveSubmissionTasks(registryClient, gatewayId, jobSubmissionInterface,
                processModel, requestedWallTime));
        return taskIds;
    }

    resourceSchedule.setWallTimeLimit(maxRunTime);
    // need to create more job submissions
    int numOfMaxWallTimeJobs = requestedWallTime / maxRunTime;
    for (int i = 0; i < numOfMaxWallTimeJobs; i++) {
        taskIds.addAll(createAndSaveSubmissionTasks(registryClient, gatewayId, jobSubmissionInterface,
                processModel, maxRunTime));
    }
    int leftWallTime = requestedWallTime % maxRunTime;
    if (leftWallTime != 0) {
        taskIds.addAll(createAndSaveSubmissionTasks(registryClient, gatewayId, jobSubmissionInterface,
                processModel, leftWallTime));
    }
    return taskIds;
}
/**
 * Creates the tasks that fetch intermediate outputs and returns their ids as a comma
 * separated list.
 *
 * <p>The output staging tasks are created first, then the process is written back to the
 * registry.
 *
 * @param gatewayId     gateway the process belongs to
 * @param processModel  the process the fetching tasks are attached to
 * @param parentProcess the process passed on to the staging-task creation as parent
 * @return the created task ids joined with commas
 * @throws OrchestratorException wrapping any failure while creating or saving the tasks
 */
public String createAndSaveIntermediateOutputFetchingTasks(String gatewayId, ProcessModel processModel,
                                                           ProcessModel parentProcess) throws OrchestratorException {
    final RegistryService.Client registryClient = getRegistryServiceClient();
    try {
        final List<String> fetchTaskIds = new ArrayList<>(
                createAndSaveIntermediateOutputDataStagingTasks(processModel, gatewayId, parentProcess));
        // update process scheduling
        registryClient.updateProcess(processModel, processModel.getProcessId());
        return getTaskDag(fetchTaskIds);
    } catch (Exception e) {
        throw new OrchestratorException("Error during creating process", e);
    } finally {
        if (registryClient != null) {
            ThriftUtils.close(registryClient);
        }
    }
}
/**
 * Joins task ids into the comma separated form used as the task DAG.
 *
 * @param taskIdList the task ids in execution order
 * @return the ids separated by commas, or an empty string for an empty list
 */
private String getTaskDag(List<String> taskIdList) {
    // comma separated values; an empty list yields ""
    return String.join(",", taskIdList);
}
/**
 * Creates the environment setup task of a process and saves it to the registry.
 *
 * <p>The task's sub model carries the security protocol and the working directory, which is
 * the scratch location followed by the process id.
 *
 * @param registryClient open registry client used to save the task
 * @param gatewayId      gateway the process belongs to
 * @param processModel   the process the task is created for
 * @return a single-element list holding the id of the saved task
 */
private List<String> createAndSaveEnvSetupTask(RegistryService.Client registryClient, String gatewayId,
                                               ProcessModel processModel)
        throws TException, AiravataException, OrchestratorException {
    final String processId = processModel.getProcessId();

    // task envelope
    final TaskModel setupTask = new TaskModel();
    setupTask.setTaskType(TaskTypes.ENV_SETUP);
    setupTask.setTaskStatuses(Arrays.asList(new TaskStatus(TaskState.CREATED)));
    setupTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
    setupTask.setLastUpdateTime(AiravataUtils.getCurrentTimestamp().getTime());
    setupTask.setParentProcessId(processId);
    setupTask.setMaxRetry(3);
    setupTask.setCurrentRetry(0);

    // sub model: how to reach the resource and which directory to prepare
    final EnvironmentSetupTaskModel setupDetails = new EnvironmentSetupTaskModel();
    setupDetails.setProtocol(OrchestratorUtils.getSecurityProtocol(processModel, gatewayId));
    final String scratchLocation = OrchestratorUtils.getScratchLocation(processModel, gatewayId);
    setupDetails.setLocation(scratchLocation + File.separator + processId);
    setupTask.setSubTaskModel(ThriftUtils.serializeThriftObject(setupDetails));

    final String savedTaskId = registryClient.addTask(setupTask, processId);
    setupTask.setTaskId(savedTaskId);

    final List<String> taskIds = new ArrayList<>();
    taskIds.add(savedTaskId);
    return taskIds;
}
/**
 * Creates and saves one input data staging task per URI or URI_COLLECTION input of a process.
 *
 * <p>Inputs are processed in input order (the process' input list is sorted in place). Inputs
 * of any other type, or without a type, need no staging and are skipped, as are optional
 * inputs whose value is empty. A process without inputs yields an empty list.
 *
 * @param processModel the process whose inputs are staged
 * @param gatewayId    gateway the process belongs to
 * @return the ids of the saved staging tasks, in input order
 * @throws AiravataException if a staging task cannot be built or saved
 * @throws OrchestratorException declared by the signature
 */
public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId)
        throws AiravataException, OrchestratorException {
    List<String> dataStagingTaskIds = new ArrayList<>();
    List<InputDataObjectType> processInputs = processModel.getProcessInputs();
    // check before sorting: sorting a null list used to throw before this check was reached
    if (processInputs == null || processInputs.isEmpty()) {
        return dataStagingTaskIds;
    }
    sortByInputOrder(processInputs);

    for (InputDataObjectType processInput : processInputs) {
        DataType type = processInput.getType();
        if (type != DataType.URI && type != DataType.URI_COLLECTION) {
            // STDOUT, STDERR and every other type: nothing to stage
            continue;
        }
        String value = processInput.getValue();
        if ((value == null || value.isEmpty()) && !processInput.isIsRequired()) {
            logger.debug("Skipping input data staging task for {} since value is empty and not required", processInput.getName());
            continue;
        }
        final RegistryService.Client registryClient = getRegistryServiceClient();
        try {
            TaskModel inputDataStagingTask = getInputDataStagingTask(registryClient, processModel, processInput, gatewayId);
            String taskId = registryClient.addTask(inputDataStagingTask, processModel.getProcessId());
            inputDataStagingTask.setTaskId(taskId);
            dataStagingTaskIds.add(taskId);
        } catch (Exception e) {
            throw new AiravataException("Error while serializing data staging sub task model", e);
        } finally {
            if (registryClient != null) {
                ThriftUtils.close(registryClient);
            }
        }
    }
    return dataStagingTaskIds;
}
/**
 * Creates and saves the output data staging tasks of a process.
 *
 * <p>One task is created per STDOUT, STDERR, URI and URI_COLLECTION output. STDOUT and STDERR
 * outputs without a value first get a default file name derived from the application
 * interface name. When the application interface asks for the working directory to be
 * archived, an additional archive task is appended.
 *
 * @param processModel the process whose outputs are staged
 * @param gatewayId    gateway the process belongs to
 * @return the ids of the saved staging tasks
 * @throws AiravataException if a task cannot be created or the archive check fails
 */
public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId)
        throws AiravataException, TException, OrchestratorException {
    final RegistryService.Client registryClient = getRegistryServiceClient();
    final List<String> stagedTaskIds = new ArrayList<>();
    try {
        final List<OutputDataObjectType> outputs = processModel.getProcessOutputs();
        final String appName = OrchestratorUtils.getApplicationInterfaceName(processModel);
        if (outputs != null) {
            for (OutputDataObjectType output : outputs) {
                // decide per type whether to stage and which default file name applies
                String fallbackName = null;
                boolean stage = true;
                switch (output.getType()) {
                    case STDOUT:
                        fallbackName = appName + ".stdout";
                        break;
                    case STDERR:
                        fallbackName = appName + ".stderr";
                        break;
                    case URI:
                    case URI_COLLECTION:
                        break;
                    default:
                        // nothing to do
                        stage = false;
                        break;
                }
                if (!stage) {
                    continue;
                }
                if (fallbackName != null) {
                    final String current = output.getValue();
                    if (current == null || current.trim().isEmpty()) {
                        output.setValue(fallbackName);
                    }
                }
                createOutputDataSatagingTasks(registryClient, processModel, gatewayId, stagedTaskIds, output);
            }
        }

        try {
            if (isArchive(registryClient, processModel, orchestratorContext)) {
                createArchiveDataStatgingTask(registryClient, processModel, gatewayId, stagedTaskIds);
            }
        } catch (Exception e) {
            throw new AiravataException("Error! Application interface retrieval failed", e);
        }
    } finally {
        if (registryClient != null) {
            ThriftUtils.close(registryClient);
        }
    }
    return stagedTaskIds;
}
/**
 * Creates and saves the tasks that fetch intermediate outputs of a process.
 *
 * <p>One task is created per STDOUT, STDERR, URI and URI_COLLECTION output. STDOUT and STDERR
 * outputs without a value first get a default file name derived from the application
 * interface name.
 *
 * @param processModel  the process whose outputs are fetched
 * @param gatewayId     gateway the process belongs to
 * @param parentProcess the process passed on as parent when building each task
 * @return the ids of the saved tasks
 * @throws AiravataException if a task cannot be created
 */
public List<String> createAndSaveIntermediateOutputDataStagingTasks(ProcessModel processModel, String gatewayId,
                                                                    ProcessModel parentProcess)
        throws AiravataException, TException, OrchestratorException {
    final RegistryService.Client registryClient = getRegistryServiceClient();
    final List<String> fetchTaskIds = new ArrayList<>();
    try {
        final List<OutputDataObjectType> outputs = processModel.getProcessOutputs();
        final String appName = OrchestratorUtils.getApplicationInterfaceName(processModel);
        if (outputs == null) {
            return fetchTaskIds;
        }
        for (OutputDataObjectType output : outputs) {
            final DataType outputType = output.getType();
            switch (outputType) {
                case STDOUT:
                case STDERR:
                    // give console outputs a default file name when none was provided
                    if (null == output.getValue() || output.getValue().trim().isEmpty()) {
                        output.setValue(appName + (outputType == DataType.STDOUT ? ".stdout" : ".stderr"));
                    }
                    // fall through: console outputs are staged like file outputs
                case URI:
                case URI_COLLECTION:
                    createIntermediateOutputDataStagingTasks(registryClient, processModel, gatewayId,
                            parentProcess, fetchTaskIds, output);
                    break;
                default:
                    // nothing to do
                    break;
            }
        }
        return fetchTaskIds;
    } finally {
        if (registryClient != null) {
            ThriftUtils.close(registryClient);
        }
    }
}
/**
 * Tells whether the application interface of a process asks for its working directory to be
 * archived.
 *
 * @param registryClient      open registry client used to read the application interface
 * @param processModel        the process whose application interface is inspected
 * @param orchestratorContext not used by this method
 * @return the archive-working-directory flag of the application interface
 * @throws TException if the application interface cannot be read
 */
private boolean isArchive(RegistryService.Client registryClient, ProcessModel processModel, OrchestratorContext orchestratorContext) throws TException {
    final String interfaceId = processModel.getApplicationInterfaceId();
    return registryClient.getApplicationInterface(interfaceId).isArchiveWorkingDirectory();
}
/**
 * Creates the archive staging task of a process, saves it and records its id.
 *
 * @param registryClient     open registry client used to save the task
 * @param processModel       the process the task is created for
 * @param gatewayId          gateway the process belongs to
 * @param dataStagingTaskIds list the id of the saved task is appended to
 * @throws AiravataException if building the task fails with a Thrift error
 * @throws TException        if saving the task fails
 */
private void createArchiveDataStatgingTask(RegistryService.Client registryClient, ProcessModel processModel,
                                           String gatewayId,
                                           List<String> dataStagingTaskIds) throws AiravataException, TException, OrchestratorException {
    final TaskModel archiveTask;
    try {
        // no process output is passed: that selects the archive variant of the staging task
        archiveTask = getOutputDataStagingTask(registryClient, processModel, null, gatewayId, null);
    } catch (TException e) {
        throw new AiravataException("Error! DataStaging sub task serialization failed", e);
    }
    final String savedTaskId = registryClient.addTask(archiveTask, processModel.getProcessId());
    archiveTask.setTaskId(savedTaskId);
    dataStagingTaskIds.add(savedTaskId);
}
/**
 * Creates the staging task for one process output, saves it and records its id.
 *
 * @param registryClient     open registry client used to save the task
 * @param processModel       the process the task is created for
 * @param gatewayId          gateway the process belongs to
 * @param dataStagingTaskIds list the id of the saved task is appended to
 * @param processOutput      the output to stage
 * @throws AiravataException if building or saving the task fails with a Thrift error
 */
private void createOutputDataSatagingTasks(RegistryService.Client registryClient, ProcessModel processModel,
                                           String gatewayId,
                                           List<String> dataStagingTaskIds,
                                           OutputDataObjectType processOutput) throws AiravataException, OrchestratorException {
    final String savedTaskId;
    try {
        final TaskModel stagingTask =
                getOutputDataStagingTask(registryClient, processModel, processOutput, gatewayId, null);
        savedTaskId = registryClient.addTask(stagingTask, processModel.getProcessId());
        stagingTask.setTaskId(savedTaskId);
    } catch (TException e) {
        throw new AiravataException("Error while serializing data staging sub task model", e);
    }
    dataStagingTaskIds.add(savedTaskId);
}
/**
 * Creates the output fetching task for one process output, saves it and records its id.
 *
 * <p>The task is built like a regular output staging task and then re-typed as
 * {@code OUTPUT_FETCHING}.
 *
 * @param registryClient     open registry client used to save the task
 * @param processModel       the process the task is created for
 * @param gatewayId          gateway the process belongs to
 * @param parentProcess      the process passed on as parent when building the task
 * @param dataStagingTaskIds list the id of the saved task is appended to
 * @param processOutput      the output to fetch
 * @throws AiravataException if building or saving the task fails with a Thrift error
 */
private void createIntermediateOutputDataStagingTasks(RegistryService.Client registryClient,
                                                      ProcessModel processModel,
                                                      String gatewayId,
                                                      ProcessModel parentProcess,
                                                      List<String> dataStagingTaskIds,
                                                      OutputDataObjectType processOutput) throws AiravataException, OrchestratorException {
    final String savedTaskId;
    try {
        final TaskModel fetchTask = getOutputDataStagingTask(registryClient, processModel, processOutput,
                gatewayId, parentProcess);
        fetchTask.setTaskType(TaskTypes.OUTPUT_FETCHING);
        savedTaskId = registryClient.addTask(fetchTask, processModel.getProcessId());
        fetchTask.setTaskId(savedTaskId);
    } catch (TException e) {
        throw new AiravataException("Error while serializing data staging sub task model", e);
    }
    dataStagingTaskIds.add(savedTaskId);
}
/**
 * Creates and saves the job submission task of a process, plus a monitoring task when the
 * monitor mode requires one.
 *
 * <p>A monitoring task is added for the email notification and the cloud job monitor modes.
 *
 * @param registryClient         open registry client used to save the tasks
 * @param gatewayId              gateway the process belongs to (not used by this method)
 * @param jobSubmissionInterface interface describing how jobs are submitted
 * @param processModel           the process the tasks are created for
 * @param wallTime               wall time stored in the submission task
 * @return the id of the submission task, followed by the id of the monitoring task if one
 *         was created
 * @throws OrchestratorException if the interface is missing or its protocol is missing or
 *                               unsupported
 * @throws TException            if a task cannot be serialized or saved
 */
private List<String> createAndSaveSubmissionTasks(RegistryService.Client registryClient, String gatewayId,
                                                  JobSubmissionInterface jobSubmissionInterface,
                                                  ProcessModel processModel,
                                                  int wallTime)
        throws TException, OrchestratorException {
    if (jobSubmissionInterface == null) {
        throw new OrchestratorException("No job submission interface available for process "
                + processModel.getProcessId());
    }
    JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
    MonitorMode monitorMode = resolveMonitorMode(jobSubmissionInterface, jobSubmissionProtocol, processModel);

    List<String> submissionTaskIds = new ArrayList<>();

    // job submission task
    TaskModel taskModel = new TaskModel();
    taskModel.setParentProcessId(processModel.getProcessId());
    taskModel.setCreationTime(System.currentTimeMillis());
    taskModel.setLastUpdateTime(taskModel.getCreationTime());
    TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
    taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
    taskModel.setTaskStatuses(Arrays.asList(taskStatus));
    taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
    JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
    submissionSubTask.setMonitorMode(monitorMode);
    submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol);
    submissionSubTask.setWallTime(wallTime);
    taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submissionSubTask));
    taskModel.setMaxRetry(1);
    taskModel.setCurrentRetry(0);
    String taskId = registryClient.addTask(taskModel, processModel.getProcessId());
    taskModel.setTaskId(taskId);
    submissionTaskIds.add(taskId);

    // email based and cloud monitor modes need a separate monitoring task after the submission
    if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR
            || monitorMode == MonitorMode.CLOUD_JOB_MONITOR) {
        TaskModel monitorTaskModel = new TaskModel();
        monitorTaskModel.setParentProcessId(processModel.getProcessId());
        monitorTaskModel.setCreationTime(System.currentTimeMillis());
        monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
        TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
        monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
        monitorTaskModel.setTaskStatuses(Arrays.asList(monitorTaskStatus));
        monitorTaskModel.setTaskType(TaskTypes.MONITORING);
        MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
        monitorSubTaskModel.setMonitorMode(monitorMode);
        monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
        String mTaskId = registryClient.addTask(monitorTaskModel, processModel.getProcessId());
        monitorTaskModel.setTaskId(mTaskId);
        submissionTaskIds.add(mTaskId);
    }
    return submissionTaskIds;
}

/**
 * Maps a job submission protocol to the monitor mode used for its jobs.
 *
 * <p>SSH and SSH_FORK read the mode from the SSH job submission of the interface; UNICORE,
 * LOCAL and CLOUD use fixed modes. A missing or any other protocol is rejected with an
 * {@link OrchestratorException} instead of failing with a null dereference.
 */
private MonitorMode resolveMonitorMode(JobSubmissionInterface jobSubmissionInterface,
                                       JobSubmissionProtocol jobSubmissionProtocol,
                                       ProcessModel processModel)
        throws TException, OrchestratorException {
    if (jobSubmissionProtocol != null) {
        switch (jobSubmissionProtocol) {
            case SSH:
            case SSH_FORK:
                SSHJobSubmission sshJobSubmission =
                        OrchestratorUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
                return sshJobSubmission.getMonitorMode();
            case UNICORE:
                return MonitorMode.FORK;
            case LOCAL:
                return MonitorMode.LOCAL;
            case CLOUD:
                return MonitorMode.CLOUD_JOB_MONITOR;
            default:
                break;
        }
    }
    logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.",
            processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol);
    throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol);
}
/**
 * Sorts process inputs in place by ascending input order.
 *
 * <p>A {@code null} list, or one with fewer than two elements, is left untouched. The
 * comparison uses {@code Comparator.comparingInt} rather than subtracting the two orders,
 * because a subtraction can overflow for values of opposite sign and then yields the wrong
 * ordering.
 *
 * @param processInputs the inputs to sort; may be {@code null}
 */
private void sortByInputOrder(List<InputDataObjectType> processInputs) {
    if (processInputs == null || processInputs.size() < 2) {
        return;
    }
    processInputs.sort(Comparator.comparingInt(InputDataObjectType::getInputOrder));
}
/**
 * Builds (without saving) the data staging task for one process input.
 *
 * <p>The source is the input's value. The destination is a URI on the compute resource that
 * points into the process' working directory (scratch location followed by the process id);
 * when the input carries an override filename it is appended to that directory.
 *
 * @param registryClient open registry client used to read the compute resource
 * @param processModel   the process the task is created for
 * @param processInput   the input to stage
 * @param gatewayId      gateway the process belongs to
 * @return the populated task, with its task id still unset
 * @throws OrchestratorException if the destination URI cannot be constructed
 */
private TaskModel getInputDataStagingTask(RegistryService.Client registryClient, ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws TException, AiravataException, OrchestratorException {
    // task envelope
    final TaskModel stagingTask = new TaskModel();
    stagingTask.setParentProcessId(processModel.getProcessId());
    stagingTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
    stagingTask.setLastUpdateTime(stagingTask.getCreationTime());
    final TaskStatus createdStatus = new TaskStatus(TaskState.CREATED);
    createdStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
    stagingTask.setTaskStatuses(Arrays.asList(createdStatus));
    stagingTask.setTaskType(TaskTypes.DATA_STAGING);
    stagingTask.setMaxRetry(3);
    stagingTask.setCurrentRetry(0);

    // working directory on the compute resource, always ending with a separator
    final ComputeResourceDescription computeResource =
            registryClient.getComputeResource(processModel.getComputeResourceId());
    final String scratchLocation = OrchestratorUtils.getScratchLocation(processModel, gatewayId);
    final String scratchPrefix;
    if (scratchLocation.endsWith(File.separator)) {
        scratchPrefix = scratchLocation;
    } else {
        scratchPrefix = scratchLocation + File.separator;
    }
    final String workingDir = scratchPrefix + processModel.getProcessId() + File.separator;

    final URI destination;
    try {
        final DataMovementProtocol dataMovementProtocol =
                OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
        final String loginUserName = OrchestratorUtils.getLoginUserName(processModel, gatewayId);
        String destinationPath = workingDir;
        final String overrideFilename = processInput.getOverrideFilename();
        if (overrideFilename != null) {
            // If an override filename is provided
            destinationPath = destinationPath + overrideFilename;
        }
        destination = new URI(dataMovementProtocol.name(),
                loginUserName,
                computeResource.getHostName(),
                OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
                destinationPath, null, null);
    } catch (URISyntaxException e) {
        throw new OrchestratorException("Error while constructing destination file URI", e);
    }

    // data staging sub task model
    final DataStagingTaskModel stagingDetails = new DataStagingTaskModel();
    stagingDetails.setType(DataStageType.INPUT);
    stagingDetails.setSource(processInput.getValue());
    stagingDetails.setProcessInput(processInput);
    stagingDetails.setDestination(destination.toString());
    stagingTask.setSubTaskModel(ThriftUtils.serializeThriftObject(stagingDetails));
    return stagingTask;
}
/**
 * Builds (without saving) an output data staging task.
 *
 * <p>With a process output the task stages that output's file from the working directory;
 * with a {@code null} process output it becomes an archive task whose source is the working
 * directory itself. The working directory is the scratch location followed by the id of
 * {@code parentProcess} when one is given, otherwise by the id of {@code processModel}.
 *
 * @param registryClient open registry client used to read the compute resource
 * @param processModel   the process the task is created for
 * @param processOutput  the output to stage, or {@code null} for an archive task
 * @param gatewayId      gateway the process belongs to
 * @param parentProcess  process whose working directory holds the output, or {@code null}
 * @return the populated task, with its task id still unset
 * @throws OrchestratorException if the source URI cannot be constructed, or rewrapping any
 *                               OrchestratorException raised while building the task
 */
private TaskModel getOutputDataStagingTask(RegistryService.Client registryClient, ProcessModel processModel,
                                           OutputDataObjectType processOutput, String gatewayId,
                                           ProcessModel parentProcess) throws TException, AiravataException, OrchestratorException {
    try {
        // task envelope
        final TaskModel stagingTask = new TaskModel();
        stagingTask.setParentProcessId(processModel.getProcessId());
        stagingTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
        stagingTask.setLastUpdateTime(stagingTask.getCreationTime());
        final TaskStatus createdStatus = new TaskStatus(TaskState.CREATED);
        createdStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
        stagingTask.setTaskStatuses(Arrays.asList(createdStatus));
        stagingTask.setTaskType(TaskTypes.DATA_STAGING);
        stagingTask.setMaxRetry(3);
        stagingTask.setCurrentRetry(0);

        final ComputeResourceDescription computeResource =
                registryClient.getComputeResource(processModel.getComputeResourceId());
        final String scratchLocation = OrchestratorUtils.getScratchLocation(processModel, gatewayId);
        final String ownerProcessId;
        if (parentProcess == null) {
            ownerProcessId = processModel.getProcessId();
        } else {
            ownerProcessId = parentProcess.getProcessId();
        }
        final String workingDir = scratchLocation + File.separator + ownerProcessId + File.separator;

        final DataStagingTaskModel stagingDetails = new DataStagingTaskModel();
        final DataMovementProtocol dataMovementProtocol =
                OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
        final URI source;
        try {
            final String loginUserName = OrchestratorUtils.getLoginUserName(processModel, gatewayId);
            final String sourcePath;
            if (processOutput == null) {
                // archive
                stagingDetails.setType(DataStageType.ARCHIVE_OUTPUT);
                sourcePath = workingDir;
            } else {
                stagingDetails.setType(DataStageType.OUPUT);
                stagingDetails.setProcessOutput(processOutput);
                sourcePath = workingDir + processOutput.getValue();
            }
            source = new URI(dataMovementProtocol.name(),
                    loginUserName,
                    computeResource.getHostName(),
                    OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
                    sourcePath, null, null);
        } catch (URISyntaxException e) {
            throw new OrchestratorException("Error while constructing source file URI", e);
        }
        stagingDetails.setSource(source.toString());
        // We don't know destination location at this time, data staging task will set this.
        // because destination is required field we set dummy destination
        stagingDetails.setDestination("dummy://temp/file/location");
        stagingTask.setSubTaskModel(ThriftUtils.serializeThriftObject(stagingDetails));
        return stagingTask;
    } catch (OrchestratorException e) {
        throw new OrchestratorException("Error occurred while retrieving data movement from app catalog", e);
    }
}
/**
 * Opens a new registry service client using the host and port from the server settings.
 *
 * <p>Callers in this class close the returned client with {@code ThriftUtils.close}.
 *
 * @return a newly created registry client
 * @throws RuntimeException if the settings cannot be read or the client cannot be created
 */
private RegistryService.Client getRegistryServiceClient() {
    final int registryPort;
    final String registryHost;
    try {
        registryPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
        registryHost = ServerSettings.getRegistryServerHost();
        return RegistryServiceClientFactory.createRegistryClient(registryHost, registryPort);
    } catch (ApplicationSettingsException | RegistryServiceException e) {
        throw new RuntimeException("Unable to create registry client...", e);
    }
}
}