blob: 6e6705e4f8145fa62b0a1d881c9feb3398e7178d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang3.StringUtils;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.FILES_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LIBJARS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.MULTIPLE_VALUE_SEPARATOR;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.RESUME_PATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
/**
* Base class for command line options that refer to a JAR file program.
*/
public abstract class ProgramOptions extends CommandLineOptions {
private final String jarFilePath;
private final String entryPointClass;
private final List<URL> classpaths;
private final String[] programArgs;
private final int parallelism;
private final boolean stdoutLogging;
private final boolean detachedMode;
private final SavepointRestoreSettings savepointSettings;
private final List<URI> libjars;
private final List<URI> files;
protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
String[] args = line.hasOption(ARGS_OPTION.getOpt()) ?
line.getOptionValues(ARGS_OPTION.getOpt()) :
line.getArgs();
if (line.hasOption(JAR_OPTION.getOpt())) {
this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
}
else if (args.length > 0) {
jarFilePath = args[0];
args = Arrays.copyOfRange(args, 1, args.length);
}
else {
jarFilePath = null;
}
this.programArgs = args;
List<URL> classpaths = new ArrayList<URL>();
if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
try {
classpaths.add(new URL(path));
} catch (MalformedURLException e) {
throw new CliArgsException("Bad syntax for classpath: " + path);
}
}
}
this.classpaths = classpaths;
this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
line.getOptionValue(CLASS_OPTION.getOpt()) : null;
if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
try {
parallelism = Integer.parseInt(parString);
if (parallelism <= 0) {
throw new NumberFormatException();
}
}
catch (NumberFormatException e) {
throw new CliArgsException("The parallelism must be a positive number: " + parString);
}
}
else {
parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
}
stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
YARN_DETACHED_OPTION.getOpt());
if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt()) && line.hasOption(RESUME_PATH_OPTION.getOpt())) {
throw new CliArgsException("Please only offer either savepoint path or resume path.");
}
if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState = line.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
this.savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
} else if (line.hasOption(RESUME_PATH_OPTION.getOpt())){
String checkpointPath = line.getOptionValue(RESUME_PATH_OPTION.getOpt());
boolean allowNonRestoredState = line.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
this.savepointSettings = SavepointRestoreSettings.forResumePath(checkpointPath, allowNonRestoredState);
} else {
this.savepointSettings = SavepointRestoreSettings.none();
}
libjars = extractMultipleURIOption(LIBJARS_OPTION.getLongOpt(), line);
files = extractMultipleURIOption(FILES_OPTION.getLongOpt(), line);
}
public String getJarFilePath() {
return jarFilePath;
}
public String getEntryPointClassName() {
return entryPointClass;
}
public List<URL> getClasspaths() {
return classpaths;
}
public String[] getProgramArgs() {
return programArgs;
}
public int getParallelism() {
return parallelism;
}
public boolean getStdoutLogging() {
return stdoutLogging;
}
public boolean getDetachedMode() {
return detachedMode;
}
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}
public List<URI> getLibjars() {
return libjars;
}
public List<URI> getFiles() {
return files;
}
private List<URI> extractMultipleURIOption(String opt, CommandLine line) throws CliArgsException {
final List<URI> uris = new ArrayList<>();
if (line.hasOption(opt)) {
for (String items : line.getOptionValues(opt)) {
if (!StringUtils.isEmpty(items)) {
for (String item : items.split(MULTIPLE_VALUE_SEPARATOR)) {
try {
URI uri = new URI(item);
uris.add(uri.isAbsolute() ? uri : new URI("file://" + item));
} catch (URISyntaxException e) {
throw new CliArgsException("Bad syntax for URI: " + item);
}
}
}
}
}
return uris;
}
}