blob: 77f4d72a9caa2ca3201f822ee383cc12f0cc2c54 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.helix.impl.task.submission;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.AgentException;
import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.submission.config.GroovyMapData;
import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobStatus;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.security.SecureRandom;
import java.util.*;
public abstract class JobSubmissionTask extends AiravataTask {
private final static Logger logger = LoggerFactory.getLogger(JobSubmissionTask.class);
@Override
public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
}
@SuppressWarnings("WeakerAccess")
protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, GroovyMapData groovyMapData, String workingDirectory) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
addMonitoringCommands(groovyMapData);
String scriptAsString = groovyMapData.loadFromFile(jobManagerConfiguration.getJobDescriptionTemplateName());
logger.info("Generated job submission script : " + scriptAsString);
int number = new SecureRandom().nextInt();
number = (number < 0 ? -number : number);
File tempJobFile = new File(getLocalDataDir(), "job_" + Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
FileUtils.writeStringToFile(tempJobFile, scriptAsString);
logger.info("Job submission file for process " + getProcessId() + " was created at : " + tempJobFile.getAbsolutePath());
logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " to remote path " + workingDirectory +
" of compute resource " + getTaskContext().getComputeResourceId());
agentAdaptor.uploadFile(tempJobFile.getAbsolutePath(), workingDirectory);
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath());
logger.info("Submit command for process id " + getProcessId() + " : " + submitCommand.getRawCommand());
logger.debug("Working directory for process id " + getProcessId() + " : " + workingDirectory);
CommandOutput commandOutput = submitCommandWithRecording(submitCommand, agentAdaptor, groovyMapData, workingDirectory);
logger.info("Job " + groovyMapData.getJobName() + " submitted to compute resource");
logger.info("Submission stdout: " + commandOutput.getStdOut() + ", stderr: " + commandOutput.getStdError());
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
jsoutput.setDescription(scriptAsString);
jsoutput.setJobId(jobManagerConfiguration.getParser().parseJobSubmission(commandOutput.getStdOut()));
if (jsoutput.getJobId() == null) {
if (jobManagerConfiguration.getParser().isJobSubmissionFailed(commandOutput.getStdOut())) {
jsoutput.setJobSubmissionFailed(true);
jsoutput.setFailureReason("stdout : " + commandOutput.getStdOut() +
"\n stderr : " + commandOutput.getStdError());
}
}
jsoutput.setExitCode(commandOutput.getExitCode());
if (jsoutput.getExitCode() != 0) {
jsoutput.setJobSubmissionFailed(true);
jsoutput.setFailureReason("stdout : " + commandOutput.getStdOut() +
"\n stderr : " + commandOutput.getStdError());
}
jsoutput.setStdOut(commandOutput.getStdOut());
jsoutput.setStdErr(commandOutput.getStdError());
return jsoutput;
}
/**
* This will write the standard output of the command to a file inside the working directory of the process and
* if the agent does not receive the output through first invocation, it retries by looking into the output file.
*
* @param submitCommand command to submit
* @param agentAdaptor agent adaptor to communicate with compute resource
* @param groovyMapData metadata object of the job
* @param workingDirectory working directory for the process
* @return {@link CommandOutput} of the submitted command
* @throws AgentException if agent failed to communicate with the compute host
*/
private CommandOutput submitCommandWithRecording(RawCommandInfo submitCommand, AgentAdaptor agentAdaptor,
GroovyMapData groovyMapData, String workingDirectory) throws AgentException {
String modifiedCommand = submitCommand.getCommand() + " | tee " + getJobCommandRecordingFile(groovyMapData);
logger.info("Modified the submit command to support recording : " + modifiedCommand);
CommandOutput commandOutput = agentAdaptor.executeCommand(modifiedCommand, workingDirectory);
if (commandOutput.getStdOut() == null || "".equals(commandOutput.getStdOut())) {
logger.warn("command submission returned empty response so reading recording file at " + getJobCommandRecordingFile(groovyMapData));
CommandOutput recordingFileReadCommandOutput = agentAdaptor.executeCommand("cat " + getJobCommandRecordingFile(groovyMapData),
groovyMapData.getWorkingDirectory());
if (recordingFileReadCommandOutput.getStdOut() != null && !"".equals(recordingFileReadCommandOutput.getStdOut())) {
logger.info("Received non empty output form recording file : " + recordingFileReadCommandOutput.getStdOut());
return recordingFileReadCommandOutput;
} else {
return commandOutput;
}
} else {
return commandOutput;
}
}
private String getJobCommandRecordingFile(GroovyMapData mapData) {
return (mapData.getWorkingDirectory().endsWith(File.separator) ?
mapData.getWorkingDirectory() : mapData.getWorkingDirectory() + File.separator) +
mapData.getJobName();
}
@SuppressWarnings("WeakerAccess")
public File getLocalDataDir() {
String outputPath = ServerSettings.getLocalDataLocation();
outputPath = (outputPath.endsWith(File.separator) ? outputPath : outputPath + File.separator);
return new File(outputPath + getProcessId());
}
@SuppressWarnings("WeakerAccess")
public boolean cancelJob(AgentAdaptor agentAdaptor, String jobId) throws Exception {
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getCancelCommand(jobId).getRawCommand(), null);
return commandOutput.getExitCode() == 0;
}
@SuppressWarnings("WeakerAccess")
public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobId) throws Exception {
ResourceJobManager resourceJobManager = JobFactory.getResourceJobManager(
getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface());
if (resourceJobManager == null) {
throw new Exception("Resource job manager can not be null for protocol " + getTaskContext().getJobSubmissionProtocol() + " and job id " + jobId);
}
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(resourceJobManager);
CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobId).getRawCommand(), null);
return jobManagerConfiguration.getParser().parseJobStatus(jobId, commandOutput.getStdOut());
}
@SuppressWarnings("WeakerAccess")
public String getJobIdByJobName(AgentAdaptor agentAdaptor, String jobName, String userName) throws Exception {
ResourceJobManager resourceJobManager = JobFactory.getResourceJobManager(
getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface());
if (resourceJobManager == null) {
throw new Exception("Resource job manager can not be null for protocol " + getTaskContext().getJobSubmissionProtocol()
+ " and job name " + jobName + " and user " + userName);
}
JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(resourceJobManager);
RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
CommandOutput commandOutput = agentAdaptor.executeCommand(jobIdMonitorCommand.getRawCommand(), null);
return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut());
}
@SuppressWarnings("WeakerAccess")
public void saveJobModel(JobModel jobModel) throws TException {
getRegistryServiceClient().addJob(jobModel, getProcessId());
}
@SuppressWarnings("WeakerAccess")
public void saveAndPublishJobStatus(JobModel jobModel) throws Exception {
try {
// first we save job jobModel to the registry for sa and then save the job status.
JobStatus jobStatus;
if (jobModel.getJobStatuses() != null && jobModel.getJobStatuses().size() > 0) {
jobStatus = jobModel.getJobStatuses().get(0);
} else {
logger.error("Job statuses can not be empty");
return;
}
List<JobStatus> statuses = new ArrayList<>();
statuses.add(jobStatus);
jobModel.setJobStatuses(statuses);
if (jobStatus.getTimeOfStateChange() == 0 || jobStatus.getTimeOfStateChange() > 0 ) {
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
} else {
jobStatus.setTimeOfStateChange(jobStatus.getTimeOfStateChange());
}
getRegistryServiceClient().addJobStatus(jobStatus, jobModel.getTaskId(), jobModel.getJobId());
/*JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
getProcessId(), getProcessModel().getExperimentId(), getGatewayId());
JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
(MessageType.JOB.name()), getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
getStatusPublisher().publish(msgCtx);*/
} catch (Exception e) {
throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
}
}
private void addMonitoringCommands(GroovyMapData mapData) throws ApplicationSettingsException {
if (Boolean.parseBoolean(ServerSettings.getSetting("enable.realtime.monitor"))) {
if (mapData.getPreJobCommands() == null) {
mapData.setPreJobCommands(new ArrayList<>());
}
mapData.getPreJobCommands().add(0, "curl -X POST -H \"Content-Type: application/vnd.kafka.json.v2+json\" " +
"-H \"Accept: application/vnd.kafka.v2+json\" " +
"--data '{\"records\":[{\"value\":{\"jobName\":\"" + mapData.getJobName() + "\", \"status\":\"RUNNING\", \"task\":\"" + mapData.getTaskId() + "\"}}]}' \"" +
ServerSettings.getSetting("job.status.publish.endpoint") + "\" > /dev/null || true");
if (mapData.getPostJobCommands() == null) {
mapData.setPostJobCommands(new ArrayList<>());
}
mapData.getPostJobCommands().add("curl -X POST -H \"Content-Type: application/vnd.kafka.json.v2+json\" " +
"-H \"Accept: application/vnd.kafka.v2+json\" " +
"--data '{\"records\":[{\"value\":{\"jobName\":\"" + mapData.getJobName() + "\", \"status\":\"COMPLETED\", \"task\":\"" + mapData.getTaskId() + "\"}}]}' \"" +
ServerSettings.getSetting("job.status.publish.endpoint") + "\" > /dev/null || true");
}
}
}