blob: 4a76e1d07640ec5caa0fce5ac0739ff8446c7796 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.helix.impl.task.submission;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder;
import org.apache.airavata.helix.impl.task.submission.config.GroovyMapData;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.*;
import org.apache.airavata.model.workspace.GatewayUsageReportingCommand;
import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;
@TaskDef(name = "Default Job Submission")
public class DefaultJobSubmissionTask extends JobSubmissionTask {
private final static Logger logger = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
private final static CountMonitor defaultJSTaskCounter = new CountMonitor("default_js_task_counter");
private static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
@Override
public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
defaultJSTaskCounter.inc();
String jobId = null;
AgentAdaptor adaptor;
try {
adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
getTaskContext().getGatewayId(),
getTaskContext().getComputeResourceId(),
getTaskContext().getJobSubmissionProtocol(),
getTaskContext().getComputeResourceCredentialToken(),
getTaskContext().getComputeResourceLoginUserName());
} catch (Exception e) {
return onFail("Failed to fetch adaptor to connect to " + getTaskContext().getComputeResourceId(), true, e);
}
try {
List<JobModel> jobsOfTask = getTaskContext().getRegistryClient().getJobs("taskId", getTaskId());
if (jobsOfTask.size() > 0) {
logger.warn("A job is already available for task " + getTaskId());
return onSuccess("A job is already available for task " + getTaskId());
}
saveAndPublishProcessStatus(ProcessState.EXECUTING);
GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
JobModel jobModel = new JobModel();
jobModel.setProcessId(getProcessId());
jobModel.setWorkingDir(mapData.getWorkingDirectory());
jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setTaskId(getTaskId());
jobModel.setJobName(mapData.getJobName());
jobModel.setJobDescription("Sample description");
JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
jobModel.setJobDescription(submissionOutput.getDescription());
jobModel.setExitCode(submissionOutput.getExitCode());
jobModel.setStdErr(submissionOutput.getStdErr());
jobModel.setStdOut(submissionOutput.getStdOut());
jobId = submissionOutput.getJobId();
if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed()) {
jobModel.setJobId(DEFAULT_JOB_ID);
if (submissionOutput.isJobSubmissionFailed()) {
List<JobStatus> statusList = new ArrayList<>();
statusList.add(new JobStatus(JobState.FAILED));
statusList.get(0).setReason(submissionOutput.getFailureReason());
jobModel.setJobStatuses(statusList);
saveJobModel(jobModel);
logger.error("Job submission failed for job name " + jobModel.getJobName()
+ ". Exit code : " + submissionOutput.getExitCode() + ", Submission failed : "
+ submissionOutput.isJobSubmissionFailed());
logger.error("Standard error message : " + submissionOutput.getStdErr());
logger.error("Standard out message : " + submissionOutput.getStdOut());
return onFail("Job submission command didn't return a jobId. Reason " + submissionOutput.getFailureReason(),
false, null);
} else {
String msg;
saveJobModel(jobModel);
ErrorModel errorModel = new ErrorModel();
if (submissionOutput.getExitCode() != Integer.MIN_VALUE) {
msg = "Returned non zero exit code:" + submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() +
", with failure reason : " + submissionOutput.getFailureReason()
+ " Hence changing job state to Failed." ;
errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
} else {
msg = "Didn't return valid job submission exit code for JobName:" + jobModel.getJobName() +
", with failure reason : stdout ->" + submissionOutput.getStdOut() +
" stderr -> " + submissionOutput.getStdErr() + " Hence changing job state to Failed." ;
errorModel.setActualErrorMessage(msg);
}
logger.error(msg);
return onFail(msg, false, null);
}
} else if (jobId != null && !jobId.isEmpty()) {
logger.info("Received job id " + jobId + " from compute resource");
jobModel.setJobId(jobId);
saveJobModel(jobModel);
JobStatus jobStatus = new JobStatus();
jobStatus.setJobState(JobState.SUBMITTED);
jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName());
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Collections.singletonList(jobStatus));
saveAndPublishJobStatus(jobModel);
if (verifyJobSubmissionByJobId(adaptor, jobId)) {
jobStatus.setJobState(JobState.QUEUED);
jobStatus.setReason("Verification step succeeded");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Collections.singletonList(jobStatus));
saveAndPublishJobStatus(jobModel);
}
} else {
int verificationTryCount = 0;
while (verificationTryCount++ < 3) {
String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getTaskContext().getComputeResourceLoginUserName());
if (verifyJobId != null && !verifyJobId.isEmpty()) {
// JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
jobId = verifyJobId;
jobModel.setJobId(jobId);
saveJobModel(jobModel);
JobStatus jobStatus = new JobStatus();
jobStatus.setJobState(JobState.QUEUED);
jobStatus.setReason("Verification step succeeded");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Collections.singletonList(jobStatus));
saveAndPublishJobStatus(jobModel);
logger.info("Job id " + verifyJobId + " verification succeeded");
break;
}
logger.info("Verify step return invalid jobId, retry verification step in " + (verificationTryCount * 10) + " secs");
Thread.sleep(verificationTryCount * 10000);
}
}
if (jobId == null || jobId.isEmpty()) {
jobModel.setJobId(DEFAULT_JOB_ID);
saveJobModel(jobModel);
String msg = "expId:" + getExperimentId() + " Couldn't find " +
"remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
"doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
logger.error(msg);
return onFail("Couldn't find job id in both submitted and verified steps. " + msg, false, null);
} else {
// usage reporting as the last step of job submission task
try {
mapData.setJobId(jobId);
boolean reportingAvailable = getRegistryServiceClient()
.isGatewayUsageReportingAvailable(getGatewayId(), taskContext.getComputeResourceId());
if (reportingAvailable) {
GatewayUsageReportingCommand reportingCommand = getRegistryServiceClient()
.getGatewayReportingCommand(getGatewayId(), taskContext.getComputeResourceId());
String parsedCommand = mapData.loadFromString(reportingCommand.getCommand());
logger.debug("Parsed usage reporting command {}", parsedCommand);
Process commandSubmit = Runtime.getRuntime().exec(parsedCommand);
BufferedReader reader = new BufferedReader(new InputStreamReader(commandSubmit.getInputStream()));
StringBuffer output = new StringBuffer();
String line;
while ((line = reader.readLine()) != null) {
output.append(line);
output.append("\n");
}
logger.info("Usage reporting output " + output.toString());
commandSubmit.waitFor();
logger.info("Usage reporting completed");
} else {
logger.info("No usage reporting found for gateway {} and compute resource id {}",
getGatewayId(), taskContext.getComputeResourceId());
}
} catch (Exception e) {
logger.error("Usage reporting failed but continuing. ", e);
}
return onSuccess("Submitted job to compute resource");
}
} catch (Exception e) {
logger.error("Task failed due to unexpected issue. Trying to control damage", e);
if (jobId != null && !jobId.isEmpty()) {
logger.warn("Job " + jobId + " has already being submitted. Trying to cancel the job");
try {
boolean cancelled = cancelJob(adaptor, jobId);
if (cancelled) {
logger.info("Job " + jobId + " cancellation triggered");
} else {
logger.error("Failed to cancel job " + jobId + ". Please contact system admins");
}
} catch (Exception e1) {
logger.error("Error while cancelling the job " + jobId + ". Please contact system admins");
// ignore as we have nothing to do at this point
}
}
return onFail("Task failed due to unexpected issue", false, e);
}
}
private boolean verifyJobSubmissionByJobId(AgentAdaptor agentAdaptor, String jobID) {
JobStatus status = null;
try {
status = getJobStatus(agentAdaptor, jobID);
} catch (Exception e) {
logger.warn("Error while fetching the job status for id " + jobID);
}
return status != null && status.getJobState() != JobState.UNKNOWN;
}
private String verifyJobSubmission(AgentAdaptor agentAdaptor, String jobName, String userName) {
String jobId = null;
try {
jobId = getJobIdByJobName(agentAdaptor, jobName, userName);
} catch (Exception e) {
logger.warn("Error while verifying JobId from JobName " + jobName);
}
return jobId;
}
@Override
public void onCancel(TaskContext taskContext) {
}
}