blob: dfd951f21d8a4c8b23817db15ea01220ca424317 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.client.cli.runjob;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.AbstractCli;
import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
import org.apache.hadoop.yarn.submarine.client.cli.CliUtils;
import org.apache.hadoop.yarn.submarine.client.cli.Command;
import org.apache.hadoop.yarn.submarine.client.cli.param.ParametersHolder;
import org.apache.hadoop.yarn.submarine.client.cli.param.runjob.RunJobParameters;
import org.apache.hadoop.yarn.submarine.client.cli.param.runjob.RunJobParameters.UnderscoreConverterPropertyUtils;
import org.apache.hadoop.yarn.submarine.client.cli.param.yaml.YamlConfigFile;
import org.apache.hadoop.yarn.submarine.client.cli.param.yaml.YamlParseException;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* This purpose of this class is to handle / parse CLI arguments related to
* the run job Submarine command.
*/
public class RunJobCli extends AbstractCli {
private static final Logger LOG =
LoggerFactory.getLogger(RunJobCli.class);
private static final String TENSORFLOW = "TensorFlow";
private static final String PYTORCH = "PyTorch";
private static final String PS = "PS";
private static final String WORKER = "worker";
private static final String TENSORBOARD = "TensorBoard";
private static final String CAN_BE_USED_WITH_TF_PYTORCH =
String.format("Can be used with %s or %s frameworks.",
TENSORFLOW, PYTORCH);
private static final String TENSORFLOW_ONLY =
String.format("Can only be used with %s framework.", TENSORFLOW);
public static final String YAML_PARSE_FAILED = "Failed to parse " +
"YAML config";
private static final String LOCAL_OR_ANY_FS_DIRECTORY = "Could be a local " +
"directory or any other directory on the file system.";
private Options options;
private JobSubmitter jobSubmitter;
private JobMonitor jobMonitor;
private ParametersHolder parametersHolder;
public RunJobCli(ClientContext cliContext) {
this(cliContext, cliContext.getRuntimeFactory().getJobSubmitterInstance(),
cliContext.getRuntimeFactory().getJobMonitorInstance());
}
@VisibleForTesting
public RunJobCli(ClientContext cliContext, JobSubmitter jobSubmitter,
JobMonitor jobMonitor) {
super(cliContext);
this.options = generateOptions();
this.jobSubmitter = jobSubmitter;
this.jobMonitor = jobMonitor;
}
public void printUsages() {
new HelpFormatter().printHelp("job run", options);
}
private Options generateOptions() {
Options options = new Options();
options.addOption(CliConstants.YAML_CONFIG, true,
"Config file (in YAML format)");
options.addOption(CliConstants.FRAMEWORK, true,
String.format("Framework to use. Valid values are: %s! " +
"The default framework is Tensorflow.",
Framework.getValues()));
options.addOption(CliConstants.NAME, true, "Name of the job");
options.addOption(CliConstants.INPUT_PATH, true,
"Input of the job. " + LOCAL_OR_ANY_FS_DIRECTORY);
options.addOption(CliConstants.CHECKPOINT_PATH, true,
"Training output directory of the job. " + LOCAL_OR_ANY_FS_DIRECTORY +
"This typically includes checkpoint files and exported model");
options.addOption(CliConstants.SAVED_MODEL_PATH, true,
"Model exported path (saved model) of the job, which is needed when " +
"exported model is not placed under ${checkpoint_path}. " +
LOCAL_OR_ANY_FS_DIRECTORY + "This will be used to serve");
options.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag");
options.addOption(CliConstants.PS_DOCKER_IMAGE, true,
getDockerImageMessage(PS));
options.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
getDockerImageMessage(WORKER));
options.addOption(CliConstants.QUEUE, true,
"Name of queue to run the job. By default, the default queue is used");
addWorkerOptions(options);
addPSOptions(options);
addTensorboardOptions(options);
options.addOption(CliConstants.ENV, true,
"Common environment variable passed to worker / PS");
options.addOption(CliConstants.VERBOSE, false,
"Print verbose log for troubleshooting");
options.addOption(CliConstants.WAIT_JOB_FINISH, false,
"Specified when user wants to wait for jobs to finish");
options.addOption(CliConstants.QUICKLINK, true, "Specify quicklink so YARN "
+ "web UI shows link to the given role instance and port. " +
"When --tensorboard is specified, quicklink to the " +
TENSORBOARD + " instance will be added automatically. " +
"The format of quick link is: "
+ "Quick_link_label=http(or https)://role-name:port. " +
"For example, if users want to link to the first worker's 7070 port, " +
"and text of quicklink is Notebook_UI, " +
"users need to specify --quicklink Notebook_UI=https://master-0:7070");
options.addOption(CliConstants.LOCALIZATION, true, "Specify"
+ " localization to make remote/local file/directory available to"
+ " all container(Docker)."
+ " Argument format is: \"RemoteUri:LocalFilePath[:rw] \" "
+ "(ro permission is not supported yet)."
+ " The RemoteUri can be a local file or directory on the filesystem."
+ " Alternatively, the following remote file systems / "
+ "transmit mechanisms can be used: "
+ " HDFS, S3 or abfs, HTTP, etc."
+ " The LocalFilePath can be absolute or relative."
+ " If it is a relative path, it will be"
+ " under container's implied working directory"
+ " but sub-directory is not supported yet."
+ " This option can be set multiple times."
+ " Examples are \n"
+ "-localization \"hdfs:///user/yarn/mydir2:/opt/data\"\n"
+ "-localization \"s3a:///a/b/myfile1:./\"\n"
+ "-localization \"https:///a/b/myfile2:./myfile\"\n"
+ "-localization \"/user/yarn/mydir3:/opt/mydir3\"\n"
+ "-localization \"./mydir1:.\"\n");
options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " +
"job under a secured environment");
options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " +
"by the job under a secured environment");
options.addOption(CliConstants.DISTRIBUTE_KEYTAB, false, "Distribute " +
"local keytab to cluster machines for service authentication. " +
"If not specified, pre-distributed keytab of which path specified by" +
" parameter" + CliConstants.KEYTAB + " on cluster machines will be " +
"used");
options.addOption("h", "help", false, "Print help");
options.addOption("insecure", false, "Cluster is not Kerberos enabled.");
options.addOption("conf", true,
"User specified configuration, as key=val pairs.");
return options;
}
private void addWorkerOptions(Options options) {
options.addOption(CliConstants.N_WORKERS, true,
getNumberOfServiceMessage(WORKER, 1) +
CAN_BE_USED_WITH_TF_PYTORCH);
options.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
getDockerImageMessage(WORKER) +
CAN_BE_USED_WITH_TF_PYTORCH);
options.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
getLaunchCommandMessage(WORKER) +
CAN_BE_USED_WITH_TF_PYTORCH);
options.addOption(CliConstants.WORKER_RES, true,
getServiceResourceMessage(WORKER) +
CAN_BE_USED_WITH_TF_PYTORCH);
}
private void addPSOptions(Options options) {
options.addOption(CliConstants.N_PS, true,
getNumberOfServiceMessage("PS", 0) +
TENSORFLOW_ONLY);
options.addOption(CliConstants.PS_DOCKER_IMAGE, true,
getDockerImageMessage(PS) +
TENSORFLOW_ONLY);
options.addOption(CliConstants.PS_LAUNCH_CMD, true,
getLaunchCommandMessage("PS") +
TENSORFLOW_ONLY);
options.addOption(CliConstants.PS_RES, true,
getServiceResourceMessage("PS") +
TENSORFLOW_ONLY);
}
private void addTensorboardOptions(Options options) {
options.addOption(CliConstants.TENSORBOARD, false,
"Should we run TensorBoard for this job? " +
"By default, TensorBoard is disabled." +
TENSORFLOW_ONLY);
options.addOption(CliConstants.TENSORBOARD_RESOURCES, true,
"Specifies resources of Tensorboard. The default resource is: "
+ CliConstants.TENSORBOARD_DEFAULT_RESOURCES + "." +
TENSORFLOW_ONLY);
options.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true,
getDockerImageMessage(TENSORBOARD));
}
private String getLaunchCommandMessage(String service) {
return String.format("Launch command of the %s, arguments will be "
+ "directly used to launch the %s", service, service);
}
private String getServiceResourceMessage(String serviceType) {
return String.format("Resource of each %s process, for example: "
+ "memory-mb=2048,vcores=2,yarn.io/gpu=2", serviceType);
}
private String getNumberOfServiceMessage(String serviceType,
int defaultValue) {
return String.format("Number of %s processes for the job. " +
"The default value is %d.", serviceType, defaultValue);
}
private String getDockerImageMessage(String serviceType) {
return String.format("Specifies docker image for the %s process. " +
"When not specified, %s uses --%s as a default value.",
serviceType, serviceType, CliConstants.DOCKER_IMAGE);
}
private void parseCommandLineAndGetRunJobParameters(String[] args)
throws ParseException, IOException, YarnException {
try {
GnuParser parser = new GnuParser();
CommandLine cli = parser.parse(options, args);
parametersHolder = createParametersHolder(cli);
parametersHolder.updateParameters(clientContext);
} catch (ParseException e) {
LOG.error("Exception in parse: {}", e.getMessage());
printUsages();
throw e;
}
}
private ParametersHolder createParametersHolder(CommandLine cli)
throws ParseException, YarnException {
String yamlConfigFile =
cli.getOptionValue(CliConstants.YAML_CONFIG);
if (yamlConfigFile != null) {
YamlConfigFile yamlConfig = readYamlConfigFile(yamlConfigFile);
checkYamlConfig(yamlConfigFile, yamlConfig);
LOG.info("Using YAML configuration!");
return ParametersHolder.createWithCmdLineAndYaml(cli, yamlConfig,
Command.RUN_JOB);
} else {
LOG.info("Using CLI configuration!");
return ParametersHolder.createWithCmdLine(cli, Command.RUN_JOB);
}
}
private void checkYamlConfig(String yamlConfigFile,
YamlConfigFile yamlConfig) {
if (yamlConfig == null) {
throw new YamlParseException(String.format(
YAML_PARSE_FAILED + ", file is empty: %s", yamlConfigFile));
} else if (yamlConfig.getConfigs() == null) {
throw new YamlParseException(String.format(YAML_PARSE_FAILED +
", config section should be defined, but it cannot be found in " +
"YAML file '%s'!", yamlConfigFile));
}
}
private YamlConfigFile readYamlConfigFile(String filename) {
Constructor constructor = new Constructor(YamlConfigFile.class);
constructor.setPropertyUtils(new UnderscoreConverterPropertyUtils());
try {
LOG.info("Reading YAML configuration from file: {}", filename);
Yaml yaml = new Yaml(constructor);
return yaml.loadAs(FileUtils.openInputStream(new File(filename)),
YamlConfigFile.class);
} catch (FileNotFoundException e) {
logExceptionOfYamlParse(filename, e);
throw new YamlParseException(YAML_PARSE_FAILED +
", file does not exist!");
} catch (Exception e) {
logExceptionOfYamlParse(filename, e);
throw new YamlParseException(
String.format(YAML_PARSE_FAILED + ", details: %s", e.getMessage()));
}
}
private void logExceptionOfYamlParse(String filename, Exception e) {
LOG.error(String.format("Exception while parsing YAML file %s", filename),
e);
}
private void storeJobInformation(RunJobParameters parameters,
ApplicationId applicationId, String[] args) throws IOException {
String jobName = parameters.getName();
Map<String, String> jobInfo = new HashMap<>();
jobInfo.put(StorageKeyConstants.JOB_NAME, jobName);
jobInfo.put(StorageKeyConstants.APPLICATION_ID, applicationId.toString());
if (parameters.getCheckpointPath() != null) {
jobInfo.put(StorageKeyConstants.CHECKPOINT_PATH,
parameters.getCheckpointPath());
}
if (parameters.getInputPath() != null) {
jobInfo.put(StorageKeyConstants.INPUT_PATH,
parameters.getInputPath());
}
if (parameters.getSavedModelPath() != null) {
jobInfo.put(StorageKeyConstants.SAVED_MODEL_PATH,
parameters.getSavedModelPath());
}
String joinedArgs = String.join(" ", args);
jobInfo.put(StorageKeyConstants.JOB_RUN_ARGS, joinedArgs);
clientContext.getRuntimeFactory().getSubmarineStorage().addNewJob(jobName,
jobInfo);
}
@Override
public int run(String[] args)
throws ParseException, IOException, YarnException, SubmarineException {
if (CliUtils.argsForHelp(args)) {
printUsages();
return 0;
}
parseCommandLineAndGetRunJobParameters(args);
ApplicationId applicationId = jobSubmitter.submitJob(parametersHolder);
RunJobParameters parameters =
(RunJobParameters) parametersHolder.getParameters();
storeJobInformation(parameters, applicationId, args);
if (parameters.isWaitJobFinish()) {
this.jobMonitor.waitTrainingFinal(parameters.getName());
}
return 0;
}
@VisibleForTesting
public JobSubmitter getJobSubmitter() {
return jobSubmitter;
}
@VisibleForTesting
public RunJobParameters getRunJobParameters() {
return (RunJobParameters) parametersHolder.getParameters();
}
}