blob: c7533d822e266f227bee518359aeb9e5a6c5d432 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import javax.annotation.Nullable;
import java.util.Collection;
/**
* A simple command line parser (based on Apache Commons CLI) that extracts command
* line options.
*/
public class CliFrontendParser {
static final Option HELP_OPTION = new Option("h", "help", false,
"Show the help message for the CLI Frontend or the action.");
static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
public static final Option CLASS_OPTION = new Option("c", "class", true,
"Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
"JAR file does not specify the class in its manifest.");
static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
"classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
"accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple " +
"times for specifying more than one URL. The protocol must be supported by the " +
"{@link java.net.URLClassLoader}.");
public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
"The parallelism with which to run the program. Optional flag to override the default value " +
"specified in the configuration.");
static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
"suppress logging output to standard out.");
public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");
/**
* @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
*/
@Deprecated
public static final Option YARN_DETACHED_OPTION = new Option("yd", "yarndetached", false, "If present, runs " +
"the job in detached mode (deprecated; use non-YARN specific option instead)");
public static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
public static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
"Address of the JobManager (master) to which to connect. " +
"Use this flag to connect to a different JobManager than the one specified in the configuration.");
static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");
static final Option RESUME_PATH_OPTION = new Option("r", "resume", true,
"Path to a checkpoint dir to restore the job from latest externalized checkpoint. " +
"If " + CheckpointingOptions.CHCKPOINTS_CREATE_SUBDIRS + " is set to true as default, please give checkpoint directory at job-id level (for example hdfs:///user-defined-dir/job-id); " +
"if " + CheckpointingOptions.CHCKPOINTS_CREATE_SUBDIRS + " is set to false, please just give the user defined checkpoint directory (for example hdfs:///user-defined-dir). " +
"If no valid checkpoint found, job would continue running from scratch.");
static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
"Allow to skip savepoint state that cannot be restored. " +
"You need to allow this if you removed an operator from your " +
"program that was part of the program when the savepoint was triggered.");
static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true,
"Path of savepoint to dispose.");
// list specific options
static final Option RUNNING_OPTION = new Option("r", "running", false,
"Show only running programs and their JobIDs");
static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false,
"Show only scheduled programs and their JobIDs");
static final Option ZOOKEEPER_NAMESPACE_OPTION = new Option("z", "zookeeperNamespace", true,
"Namespace to create the Zookeeper sub-paths for high availability mode");
static final Option CANCEL_WITH_SAVEPOINT_OPTION = new Option(
"s", "withSavepoint", true, "Trigger savepoint and cancel job. The target " +
"directory is optional. If no directory is specified, the configured default " +
"directory (" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");
static final Option MODIFY_PARALLELISM_OPTION = new Option("p", "parallelism", true, "New parallelism for the specified job.");
static final String MULTIPLE_VALUE_SEPARATOR = ",";
static final Option LIBJARS_OPTION = new Option(null, "libjars", true,
"Attach custom library jars for job. Directory could not be supported. " +
"Use '" + MULTIPLE_VALUE_SEPARATOR + "' to separate multiple jars. " +
"The jars could be in local file system or distributed file system. " +
"Use URI schema to specify which file system the jar belongs. " +
"If schema is missing, would try to get the jars in local file system. " +
"(eg: --libjars file:///tmp/dependency1.jar,hdfs:///$namenode_address/tmp/dependency2.jar)");
static final Option FILES_OPTION = new Option(null, "files", true,
"Attach custom files for job. Directory could not be supported. " +
"Use '" + MULTIPLE_VALUE_SEPARATOR + "' to separate multiple files. " +
"The files could be in local file system or distributed file system. " +
"Use URI schema to specify which file system the file belongs. " +
"If schema is missing, would try to get the file in local file system. " +
"Use '#' after the file path to specify retrieval key in runtime. " +
"(eg: --file file:///tmp/a.txt#file_key,hdfs:///$namenode_address/tmp/b.txt)");
static {
HELP_OPTION.setRequired(false);
JAR_OPTION.setRequired(false);
JAR_OPTION.setArgName("jarfile");
CLASS_OPTION.setRequired(false);
CLASS_OPTION.setArgName("classname");
CLASSPATH_OPTION.setRequired(false);
CLASSPATH_OPTION.setArgName("url");
ADDRESS_OPTION.setRequired(false);
ADDRESS_OPTION.setArgName("host:port");
PARALLELISM_OPTION.setRequired(false);
PARALLELISM_OPTION.setArgName("parallelism");
LOGGING_OPTION.setRequired(false);
DETACHED_OPTION.setRequired(false);
YARN_DETACHED_OPTION.setRequired(false);
ARGS_OPTION.setRequired(false);
ARGS_OPTION.setArgName("programArgs");
ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
RUNNING_OPTION.setRequired(false);
SCHEDULED_OPTION.setRequired(false);
SAVEPOINT_PATH_OPTION.setRequired(false);
SAVEPOINT_PATH_OPTION.setArgName("savepointPath");
RESUME_PATH_OPTION.setRequired(false);
RESUME_PATH_OPTION.setArgName("resumePath");
SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);
ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");
CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
MODIFY_PARALLELISM_OPTION.setRequired(false);
MODIFY_PARALLELISM_OPTION.setArgName("newParallelism");
LIBJARS_OPTION.setRequired(false);
LIBJARS_OPTION.setArgName("libraryJars");
FILES_OPTION.setRequired(false);
FILES_OPTION.setArgName("files");
}
private static final Options RUN_OPTIONS = getRunCommandOptions();
private static Options buildGeneralOptions(Options options) {
options.addOption(HELP_OPTION);
// backwards compatibility: ignore verbose flag (-v)
options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
return options;
}
private static Options getProgramSpecificOptions(Options options) {
options.addOption(JAR_OPTION);
options.addOption(CLASS_OPTION);
options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(YARN_DETACHED_OPTION);
options.addOption(LIBJARS_OPTION);
options.addOption(FILES_OPTION);
return options;
}
private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(LIBJARS_OPTION);
options.addOption(FILES_OPTION);
return options;
}
public static Options getRunCommandOptions() {
Options options = buildGeneralOptions(new Options());
options = getProgramSpecificOptions(options);
options.addOption(SAVEPOINT_PATH_OPTION);
options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
return options.addOption(RESUME_PATH_OPTION);
}
static Options getInfoCommandOptions() {
Options options = buildGeneralOptions(new Options());
return getProgramSpecificOptions(options);
}
static Options getListCommandOptions() {
Options options = buildGeneralOptions(new Options());
options.addOption(RUNNING_OPTION);
return options.addOption(SCHEDULED_OPTION);
}
static Options getCancelCommandOptions() {
Options options = buildGeneralOptions(new Options());
return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
}
static Options getStopCommandOptions() {
return buildGeneralOptions(new Options());
}
static Options getSavepointCommandOptions() {
Options options = buildGeneralOptions(new Options());
options.addOption(SAVEPOINT_DISPOSE_OPTION);
return options.addOption(JAR_OPTION);
}
static Options getModifyOptions() {
final Options options = buildGeneralOptions(new Options());
options.addOption(MODIFY_PARALLELISM_OPTION);
return options;
}
// --------------------------------------------------------------------------------------------
// Help
// --------------------------------------------------------------------------------------------
private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
o.addOption(SAVEPOINT_PATH_OPTION);
o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
return o.addOption(RESUME_PATH_OPTION);
}
private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(PARALLELISM_OPTION);
return options;
}
private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(RUNNING_OPTION);
return options.addOption(SCHEDULED_OPTION);
}
private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
}
private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
return options;
}
private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(SAVEPOINT_DISPOSE_OPTION);
options.addOption(JAR_OPTION);
return options;
}
/**
* Prints the help for the client.
*/
public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines) {
System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
System.out.println();
System.out.println("The following actions are available:");
printHelpForRun(customCommandLines);
printHelpForInfo();
printHelpForList(customCommandLines);
printHelpForStop(customCommandLines);
printHelpForCancel(customCommandLines);
printHelpForSavepoint(customCommandLines);
printHelpForModify(customCommandLines);
System.out.println();
}
public static void printHelpForRun(Collection<CustomCommandLine<?>> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"run\" compiles and runs a program.");
System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"run\" action options:");
formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, true);
System.out.println();
}
public static void printHelpForInfo() {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON).");
System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"info\" action options:");
formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
System.out.println();
}
public static void printHelpForList(Collection<CustomCommandLine<?>> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"list\" lists running and scheduled programs.");
System.out.println("\n Syntax: list [OPTIONS]");
formatter.setSyntaxPrefix(" \"list\" action options:");
formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
public static void printHelpForStop(Collection<CustomCommandLine<?>> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"stop\" stops a running program (streaming jobs only).");
System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"stop\" action options:");
formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"cancel\" cancels a running program.");
System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"cancel\" action options:");
formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]");
formatter.setSyntaxPrefix(" \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
public static void printHelpForModify(Collection<CustomCommandLine<?>> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"modify\" modifies a running job (e.g. change of parallelism).");
System.out.println("\n Syntax: modify <Job ID> [OPTIONS]");
formatter.setSyntaxPrefix(" \"modify\" action options:");
formatter.printHelp(" ", getModifyOptions());
printCustomCliOptions(customCommandLines, formatter, false);
System.out.println();
}
/**
* Prints custom cli options.
* @param formatter The formatter to use for printing
* @param runOptions True if the run options should be printed, False to print only general options
*/
private static void printCustomCliOptions(
Collection<CustomCommandLine<?>> customCommandLines,
HelpFormatter formatter,
boolean runOptions) {
// prints options from all available command-line classes
for (CustomCommandLine cli: customCommandLines) {
formatter.setSyntaxPrefix(" Options for " + cli.getId() + " mode:");
Options customOpts = new Options();
cli.addGeneralOptions(customOpts);
if (runOptions) {
cli.addRunOptions(customOpts);
}
formatter.printHelp(" ", customOpts);
System.out.println();
}
}
// --------------------------------------------------------------------------------------------
// Line Parsing
// --------------------------------------------------------------------------------------------
public static RunOptions parseRunCommand(String[] args) throws CliArgsException {
try {
DefaultParser parser = new DefaultParser();
CommandLine line = parser.parse(RUN_OPTIONS, args, true);
return new RunOptions(line);
}
catch (ParseException e) {
throw new CliArgsException(e.getMessage());
}
}
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions) throws CliArgsException {
final DefaultParser parser = new DefaultParser();
try {
return parser.parse(options, args, stopAtNonOptions);
} catch (ParseException e) {
throw new CliArgsException(e.getMessage());
}
}
/**
* Merges the given {@link Options} into a new Options object.
*
* @param optionsA options to merge, can be null if none
* @param optionsB options to merge, can be null if none
* @return
*/
public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
final Options resultOptions = new Options();
if (optionsA != null) {
for (Option option : optionsA.getOptions()) {
resultOptions.addOption(option);
}
}
if (optionsB != null) {
for (Option option : optionsB.getOptions()) {
resultOptions.addOption(option);
}
}
return resultOptions;
}
}