/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Base class for launcher implementations.
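 * <p>
 * A minimal usage sketch, assuming the concrete {@link SparkLauncher} subclass in this
 * package; the resource path and main class below are placeholders:
 * <pre>{@code
 * SparkAppHandle handle = new SparkLauncher()
 *   .setAppResource("/path/to/app.jar")
 *   .setMainClass("com.example.MyApp")
 *   .setMaster("local[2]")
 *   .startApplication();
 * }</pre>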
 *
 * @since Spark 2.3.0
 */
public abstract class AbstractLauncher<T extends AbstractLauncher<T>> {

  final SparkSubmitCommandBuilder builder;

  AbstractLauncher() {
    this.builder = new SparkSubmitCommandBuilder();
  }

  /**
   * Set a custom properties file with Spark configuration for the application.
   *
   * @param path Path to custom properties file to use.
   * @return This launcher.
   */
  public T setPropertiesFile(String path) {
    checkNotNull(path, "path");
    builder.setPropertiesFile(path);
    return self();
  }

  /**
   * Set a single configuration value for the application.
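   * <p>
   * A minimal sketch, where {@code launcher} stands for any concrete subclass instance and
   * the key carries the {@code spark.} prefix enforced below:
   * <pre>{@code
   * launcher.setConf("spark.executor.memory", "4g");
   * }</pre>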
   *
   * @param key Configuration key; must start with {@code spark.}.
   * @param value The value to use.
   * @return This launcher.
   */
  public T setConf(String key, String value) {
    checkNotNull(key, "key");
    checkNotNull(value, "value");
    checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
    builder.conf.put(key, value);
    return self();
  }

  /**
   * Set the application name.
   *
   * @param appName Application name.
   * @return This launcher.
   */
  public T setAppName(String appName) {
    checkNotNull(appName, "appName");
    builder.appName = appName;
    return self();
  }

  /**
   * Set the Spark master for the application.
   *
   * @param master Spark master URL, such as {@code local[*]}, {@code yarn}, or
   *               {@code spark://host:port}.
   * @return This launcher.
   */
  public T setMaster(String master) {
    checkNotNull(master, "master");
    builder.master = master;
    return self();
  }

  /**
   * Set the deploy mode for the application.
   *
   * @param mode Deploy mode, either {@code client} or {@code cluster}.
   * @return This launcher.
   */
  public T setDeployMode(String mode) {
    checkNotNull(mode, "mode");
    builder.deployMode = mode;
    return self();
  }

  /**
   * Set the main application resource. This should be the location of a jar file for Scala/Java
   * applications, or a Python script for PySpark applications.
   *
   * @param resource Path to the main application resource.
   * @return This launcher.
   */
  public T setAppResource(String resource) {
    checkNotNull(resource, "resource");
    builder.appResource = resource;
    return self();
  }

  /**
   * Sets the application class name for Java/Scala applications.
   *
   * @param mainClass Application's main class.
   * @return This launcher.
   */
  public T setMainClass(String mainClass) {
    checkNotNull(mainClass, "mainClass");
    builder.mainClass = mainClass;
    return self();
  }

  /**
   * Adds a no-value argument to the Spark invocation. If the argument is known, this method
   * validates whether the argument is indeed a no-value argument, and throws an exception
   * otherwise.
   * <p>
   * Use this method with caution. It is possible to create an invalid Spark command by passing
   * unknown arguments to this method, since those are allowed for forward compatibility.
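   * <p>
   * A minimal sketch using a known no-value option ({@code launcher} is any concrete
   * subclass instance):
   * <pre>{@code
   * launcher.addSparkArg("--verbose");
   * }</pre>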
   *
   * @since 1.5.0
   * @param arg Argument to add.
   * @return This launcher.
   */
  public T addSparkArg(String arg) {
    SparkSubmitOptionParser validator = new ArgumentValidator(false);
    validator.parse(Arrays.asList(arg));
    builder.userArgs.add(arg);
    return self();
  }

  /**
   * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
   * a known argument, the code validates that the argument actually expects a value, and throws
   * an exception otherwise.
   * <p>
   * It is safe to add arguments modified by other methods in this class (such as
   * {@link #setMaster(String)}); the last invocation will be the one to take effect.
   * <p>
   * Use this method with caution. It is possible to create an invalid Spark command by passing
   * unknown arguments to this method, since those are allowed for forward compatibility.
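   * <p>
   * A minimal sketch using a known value-taking option ({@code launcher} is any concrete
   * subclass instance):
   * <pre>{@code
   * launcher.addSparkArg("--driver-memory", "2g");
   * }</pre>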
   *
   * @since 1.5.0
   * @param name Name of argument to add.
   * @param value Value of the argument.
   * @return This launcher.
   */
  public T addSparkArg(String name, String value) {
    SparkSubmitOptionParser validator = new ArgumentValidator(true);
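    // Options that overlap with the dedicated setters above are routed through those setters,
    // so the "last invocation wins" rule holds no matter which API the caller used.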
    if (validator.MASTER.equals(name)) {
      setMaster(value);
    } else if (validator.PROPERTIES_FILE.equals(name)) {
      setPropertiesFile(value);
    } else if (validator.CONF.equals(name)) {
      String[] vals = value.split("=", 2);
      setConf(vals[0], vals[1]);
    } else if (validator.CLASS.equals(name)) {
      setMainClass(value);
    } else if (validator.JARS.equals(name)) {
      builder.jars.clear();
      for (String jar : value.split(",")) {
        addJar(jar);
      }
    } else if (validator.FILES.equals(name)) {
      builder.files.clear();
      for (String file : value.split(",")) {
        addFile(file);
      }
    } else if (validator.PY_FILES.equals(name)) {
      builder.pyFiles.clear();
      for (String file : value.split(",")) {
        addPyFile(file);
      }
    } else {
      validator.parse(Arrays.asList(name, value));
      builder.userArgs.add(name);
      builder.userArgs.add(value);
    }
    return self();
  }

  /**
   * Adds command line arguments for the application.
   *
   * @param args Arguments to pass to the application's main class.
   * @return This launcher.
   */
  public T addAppArgs(String... args) {
    for (String arg : args) {
      checkNotNull(arg, "arg");
      builder.appArgs.add(arg);
    }
    return self();
  }

  /**
   * Adds a jar file to be submitted with the application.
   *
   * @param jar Path to the jar file.
   * @return This launcher.
   */
  public T addJar(String jar) {
    checkNotNull(jar, "jar");
    builder.jars.add(jar);
    return self();
  }

  /**
   * Adds a file to be submitted with the application.
   *
   * @param file Path to the file.
   * @return This launcher.
   */
  public T addFile(String file) {
    checkNotNull(file, "file");
    builder.files.add(file);
    return self();
  }

  /**
   * Adds a Python file / zip / egg to be submitted with the application.
   *
   * @param file Path to the file.
   * @return This launcher.
   */
  public T addPyFile(String file) {
    checkNotNull(file, "file");
    builder.pyFiles.add(file);
    return self();
  }

  /**
   * Enables verbose reporting for SparkSubmit.
   *
   * @param verbose Whether to enable verbose output.
   * @return This launcher.
   */
  public T setVerbose(boolean verbose) {
    builder.verbose = verbose;
    return self();
  }

  /**
   * Starts a Spark application.
   *
   * <p>
   * This method returns a handle that provides information about the running application and can
   * be used to do basic interaction with it.
   * <p>
   * The returned handle assumes that the application will instantiate a single SparkContext
   * during its lifetime. Once that context reports a final state (one that indicates the
   * SparkContext has stopped), the handle will not perform new state transitions, so anything
   * that happens after that cannot be monitored. If the underlying application is launched as
   * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
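   * <p>
   * A minimal sketch with a state listener, again assuming the {@link SparkLauncher}
   * subclass and placeholder paths:
   * <pre>{@code
   * SparkAppHandle handle = new SparkLauncher()
   *   .setAppResource("/path/to/app.jar")
   *   .setMainClass("com.example.MyApp")
   *   .startApplication(new SparkAppHandle.Listener() {
   *     public void stateChanged(SparkAppHandle h) {
   *       System.out.println("state: " + h.getState());
   *     }
   *     public void infoChanged(SparkAppHandle h) { }
   *   });
   * }</pre>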
   *
   * @since 1.6.0
   * @param listeners Listeners to add to the handle before the app is launched.
   * @return A handle for the launched application.
   */
  public abstract SparkAppHandle startApplication(SparkAppHandle.Listener... listeners)
      throws IOException;

  abstract T self();

  private static class ArgumentValidator extends SparkSubmitOptionParser {

    private final boolean hasValue;

    ArgumentValidator(boolean hasValue) {
      this.hasValue = hasValue;
    }

    @Override
    protected boolean handle(String opt, String value) {
      if (value == null && hasValue) {
        throw new IllegalArgumentException(String.format("'%s' expects a value.", opt));
      }
      return true;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      // Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
      return true;
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {
      // No op.
    }

  }

}