| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.airavata.gfac.impl; |
| |
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.gfac.impl.task.DataStageTask;
import org.apache.airavata.gfac.impl.task.DataStreamingTask;
import org.apache.airavata.gfac.impl.task.EnvironmentSetupTask;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.data.movement.SecurityProtocol;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.*;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.registry.cpi.ExpCatChildDataType;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.*;
| |
| public class GFacEngineImpl implements GFacEngine { |
| |
| private static final Logger log = LoggerFactory.getLogger(GFacEngineImpl.class); |
| |
    /**
     * Default constructor. No eager initialization is performed here; catalogs, the
     * curator client and publishers are resolved via {@code Factory} when a process
     * context is populated.
     */
    public GFacEngineImpl() throws GFacException {

    }
| |
| @Override |
| public ProcessContext populateProcessContext(String processId, String gatewayId, String |
| tokenId) throws GFacException { |
| ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId); |
| try { |
| AppCatalog appCatalog = Factory.getDefaultAppCatalog(); |
| processContext.setAppCatalog(appCatalog); |
| ExperimentCatalog expCatalog = Factory.getDefaultExpCatalog(); |
| processContext.setExperimentCatalog(expCatalog); |
| processContext.setCuratorClient(Factory.getCuratorClient()); |
| processContext.setStatusPublisher(Factory.getStatusPublisher()); |
| |
| ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId); |
| processContext.setProcessModel(processModel); |
| |
| try { |
| checkRecoveryWithCancel(processContext); |
| } catch (Exception e) { |
| log.error("expId: {}, processId: {}, Error while checking process cancel data in zookeeper", |
| processContext.getExperimentId(), processContext.getProcessId()); |
| } |
| |
| GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId); |
| processContext.setGatewayResourceProfile(gatewayProfile); |
| ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().getComputeResourcePreference |
| (gatewayId, processModel.getComputeResourceId()); |
| //FIXME: Temporary revert, this needs a proper fix. |
| // String scratchLocation = Factory.getScratchLocation(processContext); |
| String scratchLocation = computeResourcePreference.getScratchLocation(); |
| scratchLocation = scratchLocation + File.separator + processId + File.separator; |
| processContext.setComputeResourcePreference(computeResourcePreference); |
| StoragePreference storagePreference = appCatalog.getGatewayProfile().getStoragePreference(gatewayId, processModel.getStorageResourceId()); |
| StorageResourceDescription storageResource = appCatalog.getStorageResource().getStorageResource(processModel.getStorageResourceId()); |
| if (storageResource != null){ |
| processContext.setStorageResource(storageResource); |
| }else { |
| // we need to fail the process which will fail the experiment |
| processContext.setProcessStatus(new ProcessStatus(ProcessState.FAILED)); |
| GFacUtils.saveAndPublishProcessStatus(processContext); |
| throw new GFacException("expId: " + processModel.getExperimentId() + ", processId: " + processId + |
| ":- Couldn't find storage resource for storage resource id :" + processModel.getStorageResourceId()); |
| } |
| if (storagePreference != null) { |
| processContext.setStoragePreference(storagePreference); |
| } else { |
| // we need to fail the process which will fail the experiment |
| processContext.setProcessStatus(new ProcessStatus(ProcessState.FAILED)); |
| GFacUtils.saveAndPublishProcessStatus(processContext); |
| throw new GFacException("expId: " + processModel.getExperimentId() + ", processId: " + processId + |
| ":- Couldn't find storage preference for storage resource id :" + processModel.getStorageResourceId()); |
| |
| } |
| |
| |
| /* StorageResourceDescription storageResource = appCatalog.getStorageResource().getStorageResource(processModel.getStorageResourceId()); |
| if (storageResource != null){ |
| processContext.setStorageResource(storageResource); |
| }*/ |
| processContext.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource |
| (processContext.getComputeResourcePreference().getComputeResourceId())); |
| processContext.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment() |
| .getApplicationDeployement(processModel.getApplicationDeploymentId())); |
| ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface() |
| .getApplicationInterface(processModel.getApplicationInterfaceId()); |
| processContext.setApplicationInterfaceDescription(applicationInterface); |
| String computeResourceId = processContext.getComputeResourceDescription().getComputeResourceId(); |
| String hostName = Factory.getDefaultAppCatalog().getComputeResource().getComputeResource(computeResourceId).getHostName(); |
| ServerInfo serverInfo = new ServerInfo(Factory.getLoginUserName(processContext), hostName); |
| processContext.setServerInfo(serverInfo); |
| List<OutputDataObjectType> applicationOutputs = applicationInterface.getApplicationOutputs(); |
| if (applicationOutputs != null && !applicationOutputs.isEmpty()) { |
| for (OutputDataObjectType outputDataObjectType : applicationOutputs) { |
| if (outputDataObjectType.getType().equals(DataType.STDOUT)) { |
| if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) { |
| outputDataObjectType.setValue(scratchLocation + applicationInterface.getApplicationName() + ".stdout"); |
| processContext.setStdoutLocation(scratchLocation + applicationInterface.getApplicationName() + ".stdout"); |
| } else { |
| processContext.setStdoutLocation(outputDataObjectType.getValue()); |
| } |
| } |
| if (outputDataObjectType.getType().equals(DataType.STDERR)) { |
| if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) { |
| String stderrLocation = scratchLocation + applicationInterface.getApplicationName() + ".stderr"; |
| outputDataObjectType.setValue(stderrLocation); |
| processContext.setStderrLocation(stderrLocation); |
| } else { |
| processContext.setStderrLocation(outputDataObjectType.getValue()); |
| } |
| } |
| } |
| } |
| expCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId); |
| processModel.setProcessOutputs(applicationOutputs); |
| |
| processContext.setSshKeyAuthentication(Factory.getComputerResourceSSHKeyAuthentication(processContext)); |
| if (processContext.getJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) { |
| // process monitor mode set in getResourceJobManager method, but unicore doesn't have resource job manager. |
| // hence we set process monitor mode here. |
| processContext.setMonitorMode(MonitorMode.FORK); |
| } else { |
| processContext.setResourceJobManager(getResourceJobManager(processContext)); |
| processContext.setJobSubmissionRemoteCluster(Factory.getJobSubmissionRemoteCluster(processContext)); |
| processContext.setDataMovementRemoteCluster(Factory.getDataMovementRemoteCluster(processContext)); |
| } |
| |
| String inputPath = ServerSettings.getLocalDataLocation(); |
| if (inputPath != null) { |
| processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : inputPath + "/") + |
| processContext.getProcessId()); |
| } |
| |
| List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId); |
| if (jobModels != null && !jobModels.isEmpty()) { |
| if (jobModels.size() > 1) { |
| log.warn("Process has more than one job model, take first one"); |
| } |
| processContext.setJobModel(((JobModel) jobModels.get(0))); |
| } |
| return processContext; |
| } catch (AppCatalogException e) { |
| String msg = "App catalog access exception "; |
| updateProcessFailure(processContext, msg); |
| throw new GFacException(msg, e); |
| } catch (RegistryException e) { |
| String msg = "Registry access exception"; |
| updateProcessFailure(processContext, msg); |
| throw new GFacException(msg, e); |
| } catch (AiravataException e) { |
| String msg = "Remote cluster initialization error"; |
| updateProcessFailure(processContext, msg); |
| throw new GFacException(msg, e); |
| } |
| |
| } |
| |
| private void checkRecoveryWithCancel(ProcessContext processContext) throws Exception { |
| CuratorFramework curatorClient = processContext.getCuratorClient(); |
| String experimentId = processContext.getExperimentId(); |
| String processId = processContext.getProcessId(); |
| String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZKPaths.makePath( |
| ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); |
| log.info("expId: {}, processId: {}, get process cancel data from zookeeper node {}", experimentId, processId, processCancelNodePath); |
| byte[] bytes = curatorClient.getData().forPath(processCancelNodePath); |
| if (bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) { |
| processContext.setRecoveryWithCancel(true); |
| } |
| } |
| |
| @Override |
| public void executeProcess(ProcessContext processContext) throws GFacException { |
| if (processContext.isInterrupted()) { |
| GFacUtils.handleProcessInterrupt(processContext); |
| return; |
| } |
| String taskDag = processContext.getTaskDag(); |
| List<String> taskIds = GFacUtils.parseTaskDag(taskDag); |
| processContext.setTaskExecutionOrder(taskIds); |
| executeTaskListFrom(processContext, taskIds.get(0)); |
| } |
| |
    /**
     * Executes the process's tasks in DAG order, beginning at {@code startingTaskId}
     * (earlier tasks are skipped). Each task type publishes its corresponding process
     * state before running, and interruption is checked at several checkpoints; a task
     * may also pause process execution (e.g. while waiting for job monitoring), in which
     * case this method returns without marking the process complete.
     *
     * @param processContext process whose task list is executed
     * @param startingTaskId id of the first task to execute (case-insensitive match)
     * @throws GFacException on unsupported task types or task execution failures
     */
    private void executeTaskListFrom(ProcessContext processContext, String startingTaskId) throws GFacException {
        // checkpoint
        if (processContext.isInterrupted() && processContext.getProcessState() != ProcessState.MONITORING) {
            GFacUtils.handleProcessInterrupt(processContext);
            return;
        }
        // NOTE(review): taskList is never read in this method — candidate for removal.
        List<TaskModel> taskList = processContext.getTaskList();
        Map<String, TaskModel> taskMap = processContext.getTaskMap();
        // Skip tasks that precede startingTaskId in the execution order.
        boolean fastForward = true;
        for (String taskId : processContext.getTaskExecutionOrder()) {
            if (fastForward) {
                if (taskId.equalsIgnoreCase(startingTaskId)) {
                    fastForward = false;
                } else {
                    continue;
                }
            }

            TaskModel taskModel = taskMap.get(taskId);
            processContext.setCurrentExecutingTaskModel(taskModel);
            TaskTypes taskType = taskModel.getTaskType();
            TaskContext taskContext = getTaskContext(processContext);
            taskContext.setTaskModel(taskModel);
            ProcessStatus status = null;
            switch (taskType) {
                case ENV_SETUP:
                    // Workspace/environment preparation on the compute resource.
                    status = new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
                    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                    processContext.setProcessStatus(status);
                    GFacUtils.saveAndPublishProcessStatus(processContext);
                    // checkpoint
                    if (processContext.isInterrupted()) {
                        GFacUtils.handleProcessInterrupt(processContext);
                        return;
                    }
                    configureWorkspace(taskContext, processContext.isRecovery());
                    // checkpoint
                    if (processContext.isInterrupted()) {
                        GFacUtils.handleProcessInterrupt(processContext);
                        return;
                    }
                    break;
                case DATA_STAGING:
                    try {
                        // checkpoint
                        if (processContext.isInterrupted()) {
                            GFacUtils.handleProcessInterrupt(processContext);
                            return;
                        }
                        // The sub-task model determines staging direction (input/output/archive).
                        DataStagingTaskModel subTaskModel = (DataStagingTaskModel) taskContext.getSubTaskModel();
                        DataStageType type = subTaskModel.getType();
                        switch (type) {
                            case INPUT:
                                status = new ProcessStatus(ProcessState.INPUT_DATA_STAGING);
                                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                                processContext.setProcessStatus(status);
                                GFacUtils.saveAndPublishProcessStatus(processContext);
                                taskContext.setProcessInput(subTaskModel.getProcessInput());
                                inputDataStaging(taskContext, processContext.isRecovery());
                                break;
                            case OUPUT:
                                // (enum constant spelling "OUPUT" comes from the thrift model)
                                status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
                                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                                processContext.setProcessStatus(status);
                                GFacUtils.saveAndPublishProcessStatus(processContext);
                                taskContext.setProcessOutput(subTaskModel.getProcessOutput());
                                outputDataStaging(taskContext, processContext.isRecovery(), false);
                                break;
                            case ARCHIVE_OUTPUT:
                                status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
                                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                                processContext.setProcessStatus(status);
                                GFacUtils.saveAndPublishProcessStatus(processContext);
                                outputDataStaging(taskContext, processContext.isRecovery(), true);
                                break;

                        }
                        // checkpoint
                        if (processContext.isInterrupted()) {
                            GFacUtils.handleProcessInterrupt(processContext);
                            return;
                        }
                    } catch (TException e) {
                        throw new GFacException(e);
                    }
                    break;

                case JOB_SUBMISSION:
                    // checkpoint
                    if (processContext.isInterrupted()) {
                        GFacUtils.handleProcessInterrupt(processContext);
                        return;
                    }
                    status = new ProcessStatus(ProcessState.EXECUTING);
                    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                    processContext.setProcessStatus(status);
                    GFacUtils.saveAndPublishProcessStatus(processContext);
                    executeJobSubmission(taskContext, processContext.isRecovery());
                    // Don't put any checkpoint in between JobSubmission and Monitoring tasks

                    // For outputs marked as streaming, kick off a streaming task once the
                    // job has reached an active/queued/submitted state.
                    JobStatus jobStatus = processContext.getJobModel().getJobStatus();
                    if (jobStatus != null && (jobStatus.getJobState() == JobState.SUBMITTED
                            || jobStatus.getJobState() == JobState.QUEUED || jobStatus.getJobState() == JobState.ACTIVE)) {

                        List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
                        if (processOutputs != null && !processOutputs.isEmpty()) {
                            for (OutputDataObjectType output : processOutputs) {
                                try {
                                    if (output.isOutputStreaming()) {
                                        // Build an ad-hoc OUTPUT_FETCHING task for this streamed output
                                        // and persist it under the process before executing it.
                                        TaskModel streamingTaskModel = new TaskModel();
                                        streamingTaskModel.setTaskType(TaskTypes.OUTPUT_FETCHING);
                                        streamingTaskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
                                        streamingTaskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
                                        streamingTaskModel.setParentProcessId(processContext.getProcessId());
                                        TaskContext streamingTaskContext = getTaskContext(processContext);
                                        DataStagingTaskModel submodel = new DataStagingTaskModel();
                                        submodel.setType(DataStageType.OUPUT);
                                        submodel.setProcessOutput(output);
                                        // NOTE(review): port 22 is hard-coded here — presumably SSH-based
                                        // movement is assumed; confirm for non-SSH data movement protocols.
                                        URI source = new URI(processContext.getDataMovementProtocol().name(),
                                                Factory.getLoginUserName(processContext),
                                                processContext.getComputeResourceDescription().getHostName(),
                                                22,
                                                processContext.getWorkingDir() + output.getValue(), null, null);
                                        submodel.setSource(source.getPath());
                                        submodel.setDestination("dummy://temp/file/location");
                                        streamingTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
                                        String streamTaskId = (String) processContext.getExperimentCatalog()
                                                .add(ExpCatChildDataType.TASK, streamingTaskModel, processContext.getProcessId());
                                        streamingTaskModel.setTaskId(streamTaskId);
                                        streamingTaskContext.setTaskModel(streamingTaskModel);
                                        executeDataStreaming(streamingTaskContext, processContext.isRecovery());
                                    }
                                } catch (URISyntaxException | TException | RegistryException e) {
                                    // Best-effort: a failed streaming setup does not fail the process.
                                    log.error("Error while streaming output " + output.getValue());
                                }
                            }
                        }
                    }
                    break;

                case MONITORING:
                    status = new ProcessStatus(ProcessState.MONITORING);
                    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                    processContext.setProcessStatus(status);
                    GFacUtils.saveAndPublishProcessStatus(processContext);
                    executeJobMonitoring(taskContext, processContext.isRecovery());
                    break;

                case ENV_CLEANUP:
                    // TODO implement environment clean up task logic
                    break;

                default:
                    throw new GFacException("Unsupported Task type");

            }


            if (processContext.isPauseTaskExecution()) {
                return; // If any task put processContext to wait, the same task must continue processContext execution.
            }

        }
        processContext.setComplete(true);
    }
| |
| private void executeJobMonitoring(TaskContext taskContext, boolean recovery) throws GFacException { |
| ProcessContext processContext = taskContext.getParentProcessContext(); |
| TaskStatus taskStatus; |
| JobMonitor monitorService = null; |
| try { |
| taskStatus = new TaskStatus(TaskState.EXECUTING); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| |
| MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel()); |
| monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode()); |
| if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) { |
| monitorService.monitor(processContext.getJobModel().getJobId(), taskContext); |
| } else { |
| log.warn("Jobid: {}, already in monitoring map", processContext.getJobModel().getJobId()); |
| } |
| } catch (AiravataException | TException e) { |
| taskStatus = new TaskStatus(TaskState.FAILED); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskStatus.setReason("Couldn't handover jobId {} to monitor service, monitor service type {}"); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| |
| String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ") |
| .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId()) |
| .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ") |
| .append(taskStatus.getReason()).toString(); |
| ErrorModel errorModel = new ErrorModel(); |
| errorModel.setUserFriendlyMessage("Error while staging output data"); |
| errorModel.setActualErrorMessage(errorMsg); |
| GFacUtils.saveTaskError(taskContext, errorModel); |
| throw new GFacException(e); |
| } |
| if (processContext.isPauseTaskExecution()) { |
| // we won't update task status to complete, job monitor will update task status to complete after it complete monitoring for this job id. |
| return; |
| } |
| taskStatus = new TaskStatus(TaskState.COMPLETED); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskStatus.setReason("Successfully handed over job id to job monitor service."); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| } |
| |
| private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws GFacException { |
| TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| try { |
| JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel()); |
| JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(jobSubmissionTaskModel.getJobSubmissionProtocol()); |
| |
| ProcessContext processContext = taskContext.getParentProcessContext(); |
| taskStatus = executeTask(taskContext, jobSubmissionTask, recovery); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| checkFailures(taskContext, taskStatus, jobSubmissionTask); |
| return false; |
| } catch (TException e) { |
| throw new GFacException(e); |
| } |
| } |
| |
| private void executeDataStreaming(TaskContext taskContext, boolean recovery) throws GFacException { |
| TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| try { |
| DataStreamingTask dataStreamingTask = new DataStreamingTask(); |
| taskStatus = executeTask(taskContext, dataStreamingTask, recovery); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| } catch (Exception e) { |
| throw new GFacException(e); |
| } |
| } |
| |
| private boolean configureWorkspace(TaskContext taskContext, boolean recover) throws GFacException { |
| |
| try { |
| EnvironmentSetupTaskModel subTaskModel = (EnvironmentSetupTaskModel) taskContext.getSubTaskModel(); |
| Task envSetupTask = null; |
| if (subTaskModel.getProtocol() == SecurityProtocol.SSH_KEYS) { |
| envSetupTask = new EnvironmentSetupTask(); |
| } else { |
| throw new GFacException("Unsupported security protocol, Airavata doesn't support " + |
| subTaskModel.getProtocol().name() + " protocol yet."); |
| } |
| TaskStatus status = new TaskStatus(TaskState.EXECUTING); |
| status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(status); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| TaskStatus taskStatus = executeTask(taskContext, envSetupTask, recover); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| |
| if (taskStatus.getState() == TaskState.FAILED) { |
| log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " + |
| "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext |
| .getParentProcessContext().getProcessId(), taskContext.getTaskId(), envSetupTask.getType |
| ().name(), taskStatus.getReason()); |
| ProcessContext processContext = taskContext.getParentProcessContext(); |
| String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ") |
| .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId()) |
| .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Environment Setup failed. Reason: ") |
| .append(taskStatus.getReason()).toString(); |
| ErrorModel errorModel = new ErrorModel(); |
| errorModel.setUserFriendlyMessage("Error while environment setup"); |
| errorModel.setActualErrorMessage(errorMsg); |
| GFacUtils.saveTaskError(taskContext, errorModel); |
| throw new GFacException("Error while environment setup"); |
| } |
| } catch (TException e) { |
| throw new GFacException("Couldn't get environment setup task model", e); |
| } |
| return false; |
| } |
| |
    /**
     * Stages the process input referenced by the task's sub-task model onto the compute
     * resource. URI_COLLECTION inputs are split and staged one URI at a time; all other
     * input types are staged with a single data-movement task invocation.
     *
     * @param taskContext DATA_STAGING task context (INPUT direction)
     * @param recover     whether this execution is a recovery run
     * @return always {@code false}; failures raise exceptions via {@link #checkFailures}
     * @throws GFacException on staging failure
     * @throws TException    if the sub task model can't be deserialized
     */
    private boolean inputDataStaging(TaskContext taskContext, boolean recover) throws GFacException, TException {
        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
        taskContext.setTaskStatus(taskStatus);
        GFacUtils.saveAndPublishTaskStatus(taskContext);

        ProcessContext processContext = taskContext.getParentProcessContext();
        // handle URI_COLLECTION input data type
        Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
        if (taskContext.getProcessInput().getType() == DataType.URI_COLLECTION) {
            // Multiple URIs are packed into one value; stage each URI separately, temporarily
            // rewriting the process input value and sub-task source, then restore the original.
            String values = taskContext.getProcessInput().getValue();
            String[] multiple_inputs = values.split(GFacConstants.MULTIPLE_INPUTS_SPLITTER);
            DataStagingTaskModel subTaskModel = (DataStagingTaskModel) taskContext.getSubTaskModel();
            for (String input : multiple_inputs) {
                taskContext.getProcessInput().setValue(input);
                subTaskModel.setSource(input);
                // NOTE(review): the 'recover' parameter is not forwarded here (hard-coded
                // false) — confirm whether input staging is intentionally always run fresh.
                taskStatus = executeTask(taskContext, dMoveTask, false);
            }
            taskContext.getProcessInput().setValue(values);
        } else {
            taskStatus = executeTask(taskContext, dMoveTask, false);
        }
        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
        taskContext.setTaskStatus(taskStatus);
        GFacUtils.saveAndPublishTaskStatus(taskContext);
        checkFailures(taskContext, taskStatus, dMoveTask);
        return false;
    }
| |
| private void checkFailures(TaskContext taskContext, TaskStatus taskStatus, Task task) throws GFacException { |
| if (taskStatus.getState() == TaskState.FAILED) { |
| log.error("expId: {}, processId: {}, taskId: {} type: {},:- " + task.getType().toString() + " failed, " + |
| "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext |
| .getParentProcessContext().getProcessId(), taskContext.getTaskId(), task.getType |
| ().name(), taskStatus.getReason()); |
| String errorMsg = new StringBuilder("expId: ").append(taskContext.getParentProcessContext().getExperimentId()).append(", processId: ") |
| .append(taskContext.getParentProcessContext().getProcessId()).append(", taskId: ").append(taskContext.getTaskId()) |
| .append(", type: ").append(taskContext.getTaskType().name()).append(" :- " + task.getType().toString() + " failed. Reason: ") |
| .append(taskStatus.getReason()).toString(); |
| ErrorModel errorModel = new ErrorModel(); |
| errorModel.setUserFriendlyMessage("Error while executing " + task.getType() + " task" ); |
| errorModel.setActualErrorMessage(errorMsg); |
| GFacUtils.saveTaskError(taskContext, errorModel); |
| throw new GFacException("Error: userFriendly msg :" + errorModel.getUserFriendlyMessage() + ", actual msg :" |
| + errorModel.getActualErrorMessage()); |
| } |
| } |
| |
| @Override |
| public void recoverProcess(ProcessContext processContext) throws GFacException { |
| processContext.setRecovery(true); |
| String taskDag = processContext.getProcessModel().getTaskDag(); |
| List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag); |
| processContext.setTaskExecutionOrder(taskExecutionOrder); |
| Map<String, TaskModel> taskMap = processContext.getTaskMap(); |
| String recoverTaskId = null; |
| String previousTaskId = null; |
| TaskModel taskModel = null; |
| for (String taskId : taskExecutionOrder) { |
| taskModel = taskMap.get(taskId); |
| TaskState state = taskModel.getTaskStatus().getState(); |
| if (state == TaskState.CREATED || state == TaskState.EXECUTING) { |
| recoverTaskId = taskId; |
| break; |
| } |
| previousTaskId = taskId; |
| } |
| final String rTaskId = recoverTaskId; |
| final String pTaskId = previousTaskId; |
| if (recoverTaskId != null) { |
| if (processContext.isRecoveryWithCancel()) { |
| cancelJobSubmission(processContext, rTaskId, pTaskId); |
| } |
| continueProcess(processContext, recoverTaskId); |
| } else { |
| log.error("expId: {}, processId: {}, Error while recovering process, couldn't find recovery task", |
| processContext.getExperimentId(), processContext.getProcessId()); |
| } |
| |
| |
| } |
| |
| private void cancelJobSubmission(ProcessContext processContext, String rTaskId, String pTaskId) { |
| new Thread(() -> { |
| try { |
| processContext.setCancel(true); |
| ProcessState processState = processContext.getProcessState(); |
| List<Object> jobModels = null; |
| switch (processState) { |
| case EXECUTING: |
| jobModels = processContext.getExperimentCatalog().get( |
| ExperimentCatalogModelType.JOB, Constants.FieldConstants.TaskConstants.TASK_ID, |
| rTaskId); |
| break; |
| case MONITORING: |
| if (pTaskId != null) { |
| jobModels = processContext.getExperimentCatalog().get( |
| ExperimentCatalogModelType.JOB, Constants.FieldConstants.TaskConstants.TASK_ID, |
| pTaskId); |
| } |
| } |
| |
| if (jobModels != null && !jobModels.isEmpty()) { |
| JobModel jobModel = (JobModel) jobModels.get(jobModels.size() - 1); |
| if (jobModel.getJobId() != null) { |
| processContext.setJobModel(jobModel); |
| log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(), |
| processContext.getProcessId(), jobModel.getJobId()); |
| cancelProcess(processContext); |
| log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(), |
| processContext.getProcessId(), jobModel.getJobId()); |
| } else { |
| log.error("expId: {}, processId: {}, Couldn't find jobId in jobModel, aborting process recovery", |
| processContext.getExperimentId(), processContext.getProcessId()); |
| } |
| } |
| } catch (GFacException e) { |
| log.error("expId: {}, processId: {}, Error while canceling process which is in recovery mode", |
| processContext.getExperimentId(), processContext.getProcessId()); |
| } catch (RegistryException e) { |
| log.error("expId: {}, processId: {}, Error while getting job model for taskId {}, " + |
| "couldn't cancel process which is in recovery mode", processContext.getExperimentId(), |
| processContext.getProcessId(), rTaskId); |
| } |
| }).start(); |
| } |
| |
| private JobModel getJobModel(ProcessContext processContext) { |
| try { |
| return GFacUtils.getJobModel(processContext); |
| } catch (RegistryException e) { |
| log.error("Error while retrieving jobId,", e); |
| return null; |
| } |
| } |
| |
| @Override |
| public void continueProcess(ProcessContext processContext, String taskId) throws GFacException { |
| executeTaskListFrom(processContext, taskId); |
| } |
| |
| /** |
| * @param processContext |
| * @param recovery |
| * @return <code>true</code> if you need to interrupt processing <code>false</code> otherwise. |
| * @throws GFacException |
| */ |
| private boolean postProcessing(ProcessContext processContext, boolean recovery) throws GFacException { |
| ProcessStatus status = new ProcessStatus(ProcessState.POST_PROCESSING); |
| status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| processContext.setProcessStatus(status); |
| GFacUtils.saveAndPublishProcessStatus(processContext); |
| // taskCtx = getEnvCleanupTaskContext(processContext); |
| if (processContext.isInterrupted()) { |
| GFacUtils.handleProcessInterrupt(processContext); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * @param taskContext |
| * @param recovery |
| * @return <code>true</code> if process execution interrupted , <code>false</code> otherwise. |
| * @throws GFacException |
| */ |
| private boolean outputDataStaging(TaskContext taskContext, boolean recovery, boolean isArchive) throws GFacException { |
| TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| |
| ProcessContext processContext = taskContext.getParentProcessContext(); |
| Task dMoveTask = null; |
| if (isArchive) { |
| dMoveTask = Factory.getArchiveTask(); |
| } else { |
| dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); |
| } |
| taskStatus = executeTask(taskContext, dMoveTask, recovery); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskContext.setTaskStatus(taskStatus); |
| GFacUtils.saveAndPublishTaskStatus(taskContext); |
| |
| if (taskStatus.getState() == TaskState.FAILED) { |
| log.error("expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " + |
| "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext |
| .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType |
| ().name(), taskStatus.getReason()); |
| |
| String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ") |
| .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId()) |
| .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Output staging failed. Reason: ") |
| .append(taskStatus.getReason()).toString(); |
| ErrorModel errorModel = new ErrorModel(); |
| errorModel.setUserFriendlyMessage("Error while staging output data"); |
| errorModel.setActualErrorMessage(errorMsg); |
| GFacUtils.saveTaskError(taskContext, errorModel); |
| } |
| return false; |
| } |
| |
| |
| @Override |
| public void cancelProcess(ProcessContext processContext) throws GFacException { |
| if (processContext != null) { |
| switch (processContext.getProcessState()) { |
| case MONITORING: case EXECUTING: |
| // get job submission task and invoke cancel |
| JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol()); |
| TaskContext taskCtx = getJobSubmissionTaskContext(processContext); |
| executeCancel(taskCtx, jobSubmissionTask); |
| break; |
| case COMPLETED: case FAILED: case CANCELED : case CANCELLING: |
| log.warn("Process cancel trigger for already {} process", processContext.getProcessState().name()); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| |
| private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws GFacException { |
| TaskStatus taskStatus = null; |
| if (recover) { |
| taskStatus = task.recover(taskCtx); |
| } else { |
| taskStatus = task.execute(taskCtx); |
| } |
| return taskStatus; |
| } |
| |
| private void executeCancel(TaskContext taskContext, JobSubmissionTask jSTask) throws GFacException { |
| try { |
| JobStatus oldJobStatus = jSTask.cancel(taskContext); |
| // If Job was in Queued state when cancel command runs, then there won't be any email from this job. |
| ProcessContext pc = taskContext.getParentProcessContext(); |
| JobMonitor monitorService = Factory.getMonitorService(pc.getMonitorMode()); |
| monitorService.canceledJob(pc.getJobModel().getJobId()); |
| |
| } catch (TaskException e) { |
| throw new GFacException("Error while cancelling job"); |
| } catch (AiravataException e) { |
| throw new GFacException("Error wile getting monitoring service"); |
| } |
| } |
| |
| private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException { |
| TaskContext taskCtx = new TaskContext(); |
| taskCtx.setParentProcessContext(processContext); |
| |
| TaskModel taskModel = new TaskModel(); |
| taskModel.setParentProcessId(processContext.getProcessId()); |
| taskModel.setCreationTime(new Date().getTime()); |
| taskModel.setLastUpdateTime(taskModel.getCreationTime()); |
| TaskStatus taskStatus = new TaskStatus(TaskState.CREATED); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskModel.setTaskStatus(taskStatus); |
| taskModel.setTaskType(TaskTypes.JOB_SUBMISSION); |
| taskCtx.setTaskModel(taskModel); |
| return taskCtx; |
| } |
| |
| private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput) |
| throws TException, TaskException { |
| TaskContext taskCtx = new TaskContext(); |
| taskCtx.setParentProcessContext(processContext); |
| // create new task model for this task |
| TaskModel taskModel = new TaskModel(); |
| taskModel.setParentProcessId(processContext.getProcessId()); |
| taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskModel.setLastUpdateTime(taskModel.getCreationTime()); |
| TaskStatus taskStatus = new TaskStatus(TaskState.CREATED); |
| taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); |
| taskModel.setTaskStatus(taskStatus); |
| taskModel.setTaskType(TaskTypes.DATA_STAGING); |
| // create data staging sub task model |
| String remoteOutputDir = processContext.getOutputDir(); |
| remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/"; |
| DataStagingTaskModel submodel = new DataStagingTaskModel(); |
| ServerInfo serverInfo = processContext.getServerInfo(); |
| URI source = null; |
| try { |
| source = new URI(processContext.getDataMovementProtocol().name(), serverInfo.getHost(), |
| serverInfo.getUserName(), serverInfo.getPort(), remoteOutputDir + processOutput.getValue(), null, null); |
| } catch (URISyntaxException e) { |
| throw new TaskException("Error while constructing source file URI"); |
| } |
| submodel.setSource(source.toString()); |
| // We don't know destination location at this time, data staging task will set this. |
| // because destination is required field we set dummy destination |
| submodel.setDestination("dummy://temp/file/location"); |
| taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); |
| taskCtx.setTaskModel(taskModel); |
| taskCtx.setProcessOutput(processOutput); |
| return taskCtx; |
| } |
| |
| /** |
| * Persist task model |
| */ |
| private void saveTaskModel(TaskContext taskContext) throws GFacException { |
| try { |
| TaskModel taskModel = taskContext.getTaskModel(); |
| taskContext.getParentProcessContext().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel, |
| taskModel.getParentProcessId()); |
| } catch (RegistryException e) { |
| throw new GFacException("Error while saving task model", e); |
| } |
| } |
| |
| private TaskContext getTaskContext(ProcessContext processContext) { |
| TaskContext taskCtx = new TaskContext(); |
| taskCtx.setParentProcessContext(processContext); |
| return taskCtx; |
| } |
| |
| |
| /** |
| * Sort input data type by input order. |
| */ |
| private void sortByInputOrder(List<InputDataObjectType> processInputs) { |
| Collections.sort(processInputs, new Comparator<InputDataObjectType>() { |
| @Override |
| public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) { |
| return inputDT_1.getInputOrder() - inputDT_2.getInputOrder(); |
| } |
| }); |
| } |
| |
| private void updateProcessFailure(ProcessContext pc, String reason){ |
| ProcessStatus status = new ProcessStatus(ProcessState.FAILED); |
| status.setReason(reason); |
| pc.setProcessStatus(status); |
| try { |
| GFacUtils.saveAndPublishProcessStatus(pc); |
| } catch (GFacException e) { |
| log.error("Error while save and publishing process failed status event"); |
| } |
| } |
| |
| public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException, GFacException { |
| List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource() |
| .getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces(); |
| |
| ResourceJobManager resourceJobManager = null; |
| JobSubmissionInterface jsInterface = null; |
| for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) { |
| if (jobSubmissionInterface.getJobSubmissionProtocol() == processCtx.getJobSubmissionProtocol()) { |
| jsInterface = jobSubmissionInterface; |
| break; |
| } |
| } |
| if (jsInterface == null) { |
| throw new GFacException("Job Submission interface cannot be empty at this point"); |
| } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { |
| SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission |
| (jsInterface.getJobSubmissionInterfaceId()); |
| processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process |
| // context method. |
| resourceJobManager = sshJobSubmission.getResourceJobManager(); |
| } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) { |
| LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission |
| (jsInterface.getJobSubmissionInterfaceId()); |
| resourceJobManager = localSubmission.getResourceJobManager(); |
| } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH_FORK) { |
| SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission |
| (jsInterface.getJobSubmissionInterfaceId()); |
| processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process |
| resourceJobManager = sshJobSubmission.getResourceJobManager(); |
| } else { |
| throw new GFacException("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol() |
| .name()); |
| } |
| |
| if (resourceJobManager == null) { |
| throw new GFacException("Resource Job Manager is empty."); |
| } |
| return resourceJobManager; |
| } |
| } |