blob: 937c0e6c2e47174c72a7e6dbb7da38e98d113fb5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.helix.impl.task.submission.config;
import groovy.text.GStringTemplateEngine;
import groovy.text.TemplateEngine;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appdeployment.CommandObject;
import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.parallelism.ApplicationParallelismType;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.task.JobSubmissionTaskModel;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class GroovyMapBuilder {
private final static Logger logger = LoggerFactory.getLogger(GroovyMapBuilder.class);
public static final String MULTIPLE_INPUTS_SPLITTER = ",";
private TaskContext taskContext;
public GroovyMapBuilder(TaskContext taskContext) {
this.taskContext = taskContext;
}
public GroovyMapData build() throws Exception {
GroovyMapData mapData = new GroovyMapData();
setMailAddresses(taskContext, mapData);
mapData.setInputDir(taskContext.getInputDir());
mapData.setOutputDir(taskContext.getOutputDir());
mapData.setExecutablePath(taskContext.getApplicationDeploymentDescription().getExecutablePath());
mapData.setStdoutFile(taskContext.getStdoutLocation());
mapData.setStderrFile(taskContext.getStderrLocation());
mapData.setScratchLocation(taskContext.getScratchLocation());
mapData.setGatewayId(taskContext.getGatewayId());
mapData.setGatewayUserName(taskContext.getProcessModel().getUserName());
mapData.setApplicationName(taskContext.getApplicationInterfaceDescription().getApplicationName());
mapData.setQueueSpecificMacros(taskContext.getQueueSpecificMacros());
mapData.setAccountString(taskContext.getAllocationProjectNumber());
mapData.setReservation(taskContext.getReservation());
mapData.setJobName("A" + String.valueOf(generateJobName()));
mapData.setWorkingDirectory(taskContext.getWorkingDir());
mapData.setTaskId(taskContext.getTaskId());
mapData.setExperimentDataDir(taskContext.getProcessModel().getExperimentDataDir());
SimpleDateFormat gmtDateFormat = new SimpleDateFormat("yyyy-MM-dd+HH:mmZ");
gmtDateFormat.setTimeZone(TimeZone.getTimeZone("EST"));
mapData.setCurrentTime(gmtDateFormat.format(new Date()));
//List<String> emails = taskContext.getUserProfile().getEmails();
//if (emails != null && emails.size() > 0) {
// mapData.setGatewayUserEmail(emails.get(0));
//}
List<String> inputValues = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), true);
inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), true));
mapData.setInputs(inputValues);
List<String> inputValuesAll = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), false);
inputValuesAll.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false));
mapData.setInputsAll(inputValuesAll);
mapData.setUserName(taskContext.getComputeResourceLoginUserName());
mapData.setShellName("/bin/bash");
String hostName = taskContext.getComputeResourceDescription().getHostName();
List<String> hostAliases = taskContext.getComputeResourceDescription().getHostAliases();
if (hostAliases != null && hostAliases.size() > 0) {
hostName = hostAliases.get(0);
}
mapData.setComputeHostName(hostName);
if (taskContext != null) {
try {
JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
if (jobSubmissionTaskModel.getWallTime() > 0) {
mapData.setMaxWallTime(maxWallTimeCalculator(jobSubmissionTaskModel.getWallTime()));
// TODO fix this
/*if (resourceJobManager != null) {
if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) {
groovyMap.add(Script.MAX_WALL_TIME,
GFacUtils.maxWallTimeCalculatorForLSF(jobSubmissionTaskModel.getWallTime()));
}
}*/
}
} catch (TException e) {
logger.error("Error while getting job submission sub task model", e);
}
}
// NOTE: Give precedence to data comes with experiment
// qos per queue
String qoS = getQoS(taskContext.getQualityOfService(), taskContext.getQueueName());
if (qoS != null) {
mapData.setQualityOfService(qoS);
}
ComputationalResourceSchedulingModel scheduling = taskContext.getProcessModel().getProcessResourceSchedule();
if (scheduling != null) {
int totalNodeCount = scheduling.getNodeCount();
int totalCPUCount = scheduling.getTotalCPUCount();
if (isValid(scheduling.getQueueName())) {
mapData.setQueueName(scheduling.getQueueName());
}
if (totalNodeCount > 0) {
mapData.setNodes(totalNodeCount);
}
if (totalCPUCount > 0) {
int ppn = totalCPUCount / totalNodeCount;
mapData.setProcessPerNode(ppn);
mapData.setCpuCount(totalCPUCount);
}
// max wall time may be set before this level if jobsubmission task has wall time configured to this job,
// if so we ignore scheduling configuration.
if (scheduling.getWallTimeLimit() > 0 && mapData.getMaxWallTime() == null) {
mapData.setMaxWallTime(maxWallTimeCalculator(scheduling.getWallTimeLimit()));
// TODO fix this
/*
if (resourceJobManager != null) {
if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) {
mapData.setMaxWallTime(maxWallTimeCalculatorForLSF(scheduling.getWallTimeLimit()));
}
}
*/
}
if (scheduling.getTotalPhysicalMemory() > 0) {
mapData.setUsedMem(scheduling.getTotalPhysicalMemory());
}
if (isValid(scheduling.getOverrideLoginUserName())) {
mapData.setUserName(scheduling.getOverrideLoginUserName());
}
if (isValid(scheduling.getOverrideAllocationProjectNumber())) {
mapData.setAccountString(scheduling.getOverrideAllocationProjectNumber());
}
if (isValid(scheduling.getStaticWorkingDir())) {
mapData.setWorkingDirectory(scheduling.getStaticWorkingDir());
}
} else {
logger.error("Task scheduling cannot be null at this point..");
}
ApplicationDeploymentDescription appDepDescription = taskContext.getApplicationDeploymentDescription();
List<SetEnvPaths> exportCommands = appDepDescription.getSetEnvironment();
if (exportCommands != null) {
List<String> exportCommandList = exportCommands.stream()
.sorted((e1, e2) -> e1.getEnvPathOrder() - e2.getEnvPathOrder())
.map(map -> map.getName() + "=" + map.getValue())
.collect(Collectors.toList());
mapData.setExports(exportCommandList);
}
List<CommandObject> moduleCmds = appDepDescription.getModuleLoadCmds();
if (moduleCmds != null) {
List<String> modulesCmdCollect = moduleCmds.stream()
.sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder())
.map(map -> map.getCommand())
.collect(Collectors.toList());
mapData.setModuleCommands(modulesCmdCollect);
}
List<CommandObject> preJobCommands = appDepDescription.getPreJobCommands();
if (preJobCommands != null) {
List<String> preJobCmdCollect = preJobCommands.stream()
.sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder())
.map(map -> parseCommands(map.getCommand(), mapData))
.collect(Collectors.toList());
mapData.setPreJobCommands(preJobCmdCollect);
}
List<CommandObject> postJobCommands = appDepDescription.getPostJobCommands();
if (postJobCommands != null) {
List<String> postJobCmdCollect = postJobCommands.stream()
.sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder())
.map(map -> parseCommands(map.getCommand(), mapData))
.collect(Collectors.toList());
mapData.setPostJobCommands(postJobCmdCollect);
}
ApplicationParallelismType parallelism = appDepDescription.getParallelism();
if (parallelism != null) {
if (parallelism != ApplicationParallelismType.SERIAL) {
Map<ApplicationParallelismType, String> parallelismPrefix = taskContext.getResourceJobManager().getParallelismPrefix();
if (parallelismPrefix != null){
String parallelismCommand = parallelismPrefix.get(parallelism);
if (parallelismCommand != null){
mapData.setJobSubmitterCommand(parallelismCommand);
}else {
throw new Exception("Parallelism prefix is not defined for given parallelism type " + parallelism + ".. Please define the parallelism prefix at App Catalog");
}
}
}
}
return mapData;
}
public static int generateJobName() {
Random random = new Random();
int i = random.nextInt(Integer.MAX_VALUE);
i = i + 99999999;
if (i < 0) {
i = i * (-1);
}
return i;
}
private static List<String> getProcessInputValues(List<InputDataObjectType> processInputs, boolean commandLineOnly) {
List<String> inputValues = new ArrayList<String>();
if (processInputs != null) {
// sort the inputs first and then build the command ListR
Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
@Override
public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
return inputDataObjectType.getInputOrder() - t1.getInputOrder();
}
};
Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
for (InputDataObjectType input : processInputs) {
sortedInputSet.add(input);
}
for (InputDataObjectType inputDataObjectType : sortedInputSet) {
if (commandLineOnly && !inputDataObjectType.isRequiredToAddedToCommandLine()) {
continue;
}
if (!inputDataObjectType.isIsRequired() &&
(inputDataObjectType.getValue() == null || "".equals(inputDataObjectType.getValue()))) {
// For URI/ Collection non required inputs, if the value is empty, ignore it. Fix for airavata-3276
continue;
}
if (inputDataObjectType.getApplicationArgument() != null
&& !inputDataObjectType.getApplicationArgument().equals("")) {
inputValues.add(inputDataObjectType.getApplicationArgument());
}
if (inputDataObjectType.getValue() != null
&& !inputDataObjectType.getValue().equals("")) {
if (inputDataObjectType.getType() == DataType.URI) {
if (inputDataObjectType.getOverrideFilename() != null) {
inputValues.add(inputDataObjectType.getOverrideFilename());
} else {
// set only the relative path
String filePath = inputDataObjectType.getValue();
filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
inputValues.add(filePath);
}
} else if (inputDataObjectType.getType() == DataType.URI_COLLECTION) {
String filePaths = inputDataObjectType.getValue();
String[] paths = filePaths.split(MULTIPLE_INPUTS_SPLITTER);
for (int i = 0; i < paths.length; i++) {
paths[i] = paths[i].substring(paths[i].lastIndexOf(File.separatorChar) + 1);
}
inputValues.add(String.join(" ", paths));
} else {
inputValues.add(inputDataObjectType.getValue());
}
}
}
}
return inputValues;
}
private static List<String> getProcessOutputValues(List<OutputDataObjectType> processOutputs, boolean commandLineOnly) {
List<String> inputValues = new ArrayList<>();
if (processOutputs != null) {
for (OutputDataObjectType output : processOutputs) {
if (output.getApplicationArgument() != null
&& !output.getApplicationArgument().equals("")) {
inputValues.add(output.getApplicationArgument());
}
if(commandLineOnly){
if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
if (output.getType() == DataType.URI) {
String filePath = output.getValue();
filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
inputValues.add(filePath);
}
}
}else{
if (output.getValue() != null && !output.getValue().equals("")) {
if (output.getType() == DataType.URI) {
String filePath = output.getValue();
filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
inputValues.add(filePath);
}
}
}
}
}
return inputValues;
}
static String getQoS(String qualityOfService, String preferredBatchQueue) {
if(preferredBatchQueue == null || preferredBatchQueue.isEmpty()
|| qualityOfService == null || qualityOfService.isEmpty()) return null;
final String qos = "qos";
Pattern pattern = Pattern.compile(preferredBatchQueue + "=(?<" + qos + ">[^,]*)");
Matcher matcher = pattern.matcher(qualityOfService);
if (matcher.find()) {
return matcher.group(qos);
}
return null;
}
public static String maxWallTimeCalculator(int maxWalltime) {
if (maxWalltime < 60) {
return "00:" + maxWalltime + ":00";
} else {
int minutes = maxWalltime % 60;
int hours = maxWalltime / 60;
return hours + ":" + minutes + ":00";
}
}
public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
if (maxWalltime < 60) {
return "00:" + maxWalltime;
} else {
int minutes = maxWalltime % 60;
int hours = maxWalltime / 60;
return hours + ":" + minutes;
}
}
private static boolean isValid(String str) {
return str != null && !str.isEmpty();
}
static String parseCommands(String value, GroovyMapData bindMap) {
TemplateEngine templateEngine = new GStringTemplateEngine();
try {
return templateEngine.createTemplate(value).make(bindMap.toImmutableMap()).toString();
} catch (ClassNotFoundException | IOException e) {
throw new IllegalArgumentException("Error while parsing command " + value
+ " , Invalid command or incomplete bind map");
}
}
private static void setMailAddresses(TaskContext taskContext, GroovyMapData groovyMap) throws
ApplicationSettingsException, TException, TaskOnFailException {
ProcessModel processModel = taskContext.getProcessModel();
String emailIds = null;
if (isEmailBasedJobMonitor(taskContext)) {
emailIds = ServerSettings.getEmailBasedMonitorAddress();
}
if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
if (emailIds != null && !emailIds.isEmpty()) {
emailIds += ("," + userJobNotifEmailIds);
} else {
emailIds = userJobNotifEmailIds;
}
}
if (processModel.isEnableEmailNotification()) {
List<String> emailList = processModel.getEmailAddresses();
if (emailList == null) {
throw new TaskOnFailException("At least one email should be provided as the email notification is turned on", false, null);
}
String elist = listToCsv(emailList, ',');
if (elist != null && !elist.isEmpty()) {
if (emailIds != null && !emailIds.isEmpty()) {
emailIds = emailIds + "," + elist;
} else {
emailIds = elist;
}
}
}
}
if (emailIds != null && !emailIds.isEmpty()) {
logger.info("Email list: " + emailIds);
groovyMap.setMailAddress(emailIds);
}
}
public static boolean isEmailBasedJobMonitor(TaskContext taskContext) throws TException, TaskOnFailException {
JobSubmissionProtocol jobSubmissionProtocol = taskContext.getPreferredJobSubmissionProtocol();
JobSubmissionInterface jobSubmissionInterface = taskContext.getPreferredJobSubmissionInterface();
if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) {
String jobSubmissionInterfaceId = jobSubmissionInterface.getJobSubmissionInterfaceId();
SSHJobSubmission sshJobSubmission = taskContext.getRegistryClient().getSSHJobSubmission(jobSubmissionInterfaceId);
MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
} else {
return false;
}
}
public static String listToCsv(List<String> listOfStrings, char separator) {
StringBuilder sb = new StringBuilder();
// all but last
for (int i = 0; i < listOfStrings.size() - 1; i++) {
sb.append(listOfStrings.get(i));
sb.append(separator);
}
// last string, no separator
if (listOfStrings.size() > 0) {
sb.append(listOfStrings.get(listOfStrings.size() - 1));
}
return sb.toString();
}
}