| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.launcher; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import static org.apache.spark.launcher.CommandBuilderUtils.*; |
| |
| /** |
| * Launcher for Spark applications. |
| * <p> |
| * Use this class to start Spark applications programmatically. The class uses a builder pattern |
| * to allow clients to configure the Spark application and launch it as a child process. |
| * </p> |
| */ |
| public class SparkLauncher extends AbstractLauncher<SparkLauncher> { |
| |
| /** The Spark master. */ |
| public static final String SPARK_MASTER = "spark.master"; |
| |
| /** The Spark deploy mode. */ |
| public static final String DEPLOY_MODE = "spark.submit.deployMode"; |
| |
| /** Configuration key for the driver memory. */ |
| public static final String DRIVER_MEMORY = "spark.driver.memory"; |
| /** Configuration key for the driver class path. */ |
| public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; |
| /** Configuration key for the driver VM options. */ |
| public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions"; |
| /** Configuration key for the driver native library path. */ |
| public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath"; |
| |
| /** Configuration key for the executor memory. */ |
| public static final String EXECUTOR_MEMORY = "spark.executor.memory"; |
| /** Configuration key for the executor class path. */ |
| public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; |
| /** Configuration key for the executor VM options. */ |
| public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions"; |
| /** Configuration key for the executor native library path. */ |
| public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath"; |
| /** Configuration key for the number of executor CPU cores. */ |
| public static final String EXECUTOR_CORES = "spark.executor.cores"; |
| |
| static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python"; |
| |
| static final String PYSPARK_PYTHON = "spark.pyspark.python"; |
| |
| static final String SPARKR_R_SHELL = "spark.r.shell.command"; |
| |
| /** Logger name to use when launching a child process. */ |
| public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName"; |
| |
| /** |
| * A special value for the resource that tells Spark to not try to process the app resource as a |
| * file. This is useful when the class being executed is added to the application using other |
| * means - for example, by adding jars using the package download feature. |
| */ |
| public static final String NO_RESOURCE = "spark-internal"; |
| |
| /** |
| * Maximum time (in ms) to wait for a child process to connect back to the launcher server |
| * when using @link{#start()}. |
| */ |
| public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout"; |
| |
| /** Used internally to create unique logger names. */ |
| private static final AtomicInteger COUNTER = new AtomicInteger(); |
| |
| /** Factory for creating OutputRedirector threads. **/ |
| static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d"); |
| |
| static final Map<String, String> launcherConfig = new HashMap<>(); |
| |
| /** |
| * Set a configuration value for the launcher library. These config values do not affect the |
| * launched application, but rather the behavior of the launcher library itself when managing |
| * applications. |
| * |
| * @since 1.6.0 |
| * @param name Config name. |
| * @param value Config value. |
| */ |
| public static void setConfig(String name, String value) { |
| launcherConfig.put(name, value); |
| } |
| |
| // Visible for testing. |
| File workingDir; |
| boolean redirectErrorStream; |
| ProcessBuilder.Redirect errorStream; |
| ProcessBuilder.Redirect outputStream; |
| |
| public SparkLauncher() { |
| this(null); |
| } |
| |
| /** |
| * Creates a launcher that will set the given environment variables in the child. |
| * |
| * @param env Environment variables to set. |
| */ |
| public SparkLauncher(Map<String, String> env) { |
| if (env != null) { |
| this.builder.childEnv.putAll(env); |
| } |
| } |
| |
| /** |
| * Set a custom JAVA_HOME for launching the Spark application. |
| * |
| * @param javaHome Path to the JAVA_HOME to use. |
| * @return This launcher. |
| */ |
| public SparkLauncher setJavaHome(String javaHome) { |
| checkNotNull(javaHome, "javaHome"); |
| builder.javaHome = javaHome; |
| return this; |
| } |
| |
| /** |
| * Set a custom Spark installation location for the application. |
| * |
| * @param sparkHome Path to the Spark installation to use. |
| * @return This launcher. |
| */ |
| public SparkLauncher setSparkHome(String sparkHome) { |
| checkNotNull(sparkHome, "sparkHome"); |
| builder.childEnv.put(ENV_SPARK_HOME, sparkHome); |
| return this; |
| } |
| |
| /** |
| * Sets the working directory of spark-submit. |
| * |
| * @param dir The directory to set as spark-submit's working directory. |
| * @return This launcher. |
| */ |
| public SparkLauncher directory(File dir) { |
| workingDir = dir; |
| return this; |
| } |
| |
| /** |
| * Specifies that stderr in spark-submit should be redirected to stdout. |
| * |
| * @return This launcher. |
| */ |
| public SparkLauncher redirectError() { |
| redirectErrorStream = true; |
| return this; |
| } |
| |
| /** |
| * Redirects error output to the specified Redirect. |
| * |
| * @param to The method of redirection. |
| * @return This launcher. |
| */ |
| public SparkLauncher redirectError(ProcessBuilder.Redirect to) { |
| errorStream = to; |
| return this; |
| } |
| |
| /** |
| * Redirects standard output to the specified Redirect. |
| * |
| * @param to The method of redirection. |
| * @return This launcher. |
| */ |
| public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) { |
| outputStream = to; |
| return this; |
| } |
| |
| /** |
| * Redirects error output to the specified File. |
| * |
| * @param errFile The file to which stderr is written. |
| * @return This launcher. |
| */ |
| public SparkLauncher redirectError(File errFile) { |
| errorStream = ProcessBuilder.Redirect.to(errFile); |
| return this; |
| } |
| |
| /** |
| * Redirects error output to the specified File. |
| * |
| * @param outFile The file to which stdout is written. |
| * @return This launcher. |
| */ |
| public SparkLauncher redirectOutput(File outFile) { |
| outputStream = ProcessBuilder.Redirect.to(outFile); |
| return this; |
| } |
| |
| /** |
| * Sets all output to be logged and redirected to a logger with the specified name. |
| * |
| * @param loggerName The name of the logger to log stdout and stderr. |
| * @return This launcher. |
| */ |
| public SparkLauncher redirectToLog(String loggerName) { |
| setConf(CHILD_PROCESS_LOGGER_NAME, loggerName); |
| return this; |
| } |
| |
| // The following methods just delegate to the parent class, but they are needed to keep |
| // binary compatibility with previous versions of this class. |
| |
| @Override |
| public SparkLauncher setPropertiesFile(String path) { |
| return super.setPropertiesFile(path); |
| } |
| |
| @Override |
| public SparkLauncher setConf(String key, String value) { |
| return super.setConf(key, value); |
| } |
| |
| @Override |
| public SparkLauncher setAppName(String appName) { |
| return super.setAppName(appName); |
| } |
| |
| @Override |
| public SparkLauncher setMaster(String master) { |
| return super.setMaster(master); |
| } |
| |
| @Override |
| public SparkLauncher setDeployMode(String mode) { |
| return super.setDeployMode(mode); |
| } |
| |
| @Override |
| public SparkLauncher setAppResource(String resource) { |
| return super.setAppResource(resource); |
| } |
| |
| @Override |
| public SparkLauncher setMainClass(String mainClass) { |
| return super.setMainClass(mainClass); |
| } |
| |
| @Override |
| public SparkLauncher addSparkArg(String arg) { |
| return super.addSparkArg(arg); |
| } |
| |
| @Override |
| public SparkLauncher addSparkArg(String name, String value) { |
| return super.addSparkArg(name, value); |
| } |
| |
| @Override |
| public SparkLauncher addAppArgs(String... args) { |
| return super.addAppArgs(args); |
| } |
| |
| @Override |
| public SparkLauncher addJar(String jar) { |
| return super.addJar(jar); |
| } |
| |
| @Override |
| public SparkLauncher addFile(String file) { |
| return super.addFile(file); |
| } |
| |
| @Override |
| public SparkLauncher addPyFile(String file) { |
| return super.addPyFile(file); |
| } |
| |
| @Override |
| public SparkLauncher setVerbose(boolean verbose) { |
| return super.setVerbose(verbose); |
| } |
| |
| /** |
| * Launches a sub-process that will start the configured Spark application. |
| * <p> |
| * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching |
| * Spark, since it provides better control of the child application. |
| * |
| * @return A process handle for the Spark app. |
| */ |
| public Process launch() throws IOException { |
| ProcessBuilder pb = createBuilder(); |
| |
| boolean outputToLog = outputStream == null; |
| boolean errorToLog = !redirectErrorStream && errorStream == null; |
| |
| String loggerName = getLoggerName(); |
| if (loggerName != null && outputToLog && errorToLog) { |
| pb.redirectErrorStream(true); |
| } |
| |
| Process childProc = pb.start(); |
| if (loggerName != null) { |
| InputStream logStream = outputToLog ? childProc.getInputStream() : childProc.getErrorStream(); |
| new OutputRedirector(logStream, loggerName, REDIRECTOR_FACTORY); |
| } |
| |
| return childProc; |
| } |
| |
| /** |
| * Starts a Spark application. |
| * |
| * <p> |
| * Applications launched by this launcher run as child processes. The child's stdout and stderr |
| * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection |
| * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be |
| * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that |
| * option is not set, the code will try to derive a name from the application's name or main |
| * class / script file. If those cannot be determined, an internal, unique name will be used. |
| * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more |
| * easily into the configuration of commonly-used logging systems. |
| * |
| * @since 1.6.0 |
| * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...) |
| * @param listeners Listeners to add to the handle before the app is launched. |
| * @return A handle for the launched application. |
| */ |
| @Override |
| public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException { |
| LauncherServer server = LauncherServer.getOrCreateServer(); |
| ChildProcAppHandle handle = new ChildProcAppHandle(server); |
| for (SparkAppHandle.Listener l : listeners) { |
| handle.addListener(l); |
| } |
| |
| String secret = server.registerHandle(handle); |
| |
| String loggerName = getLoggerName(); |
| ProcessBuilder pb = createBuilder(); |
| |
| boolean outputToLog = outputStream == null; |
| boolean errorToLog = !redirectErrorStream && errorStream == null; |
| |
| // Only setup stderr + stdout to logger redirection if user has not otherwise configured output |
| // redirection. |
| if (loggerName == null && (outputToLog || errorToLog)) { |
| String appName; |
| if (builder.appName != null) { |
| appName = builder.appName; |
| } else if (builder.mainClass != null) { |
| int dot = builder.mainClass.lastIndexOf("."); |
| if (dot >= 0 && dot < builder.mainClass.length() - 1) { |
| appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); |
| } else { |
| appName = builder.mainClass; |
| } |
| } else if (builder.appResource != null) { |
| appName = new File(builder.appResource).getName(); |
| } else { |
| appName = String.valueOf(COUNTER.incrementAndGet()); |
| } |
| String loggerPrefix = getClass().getPackage().getName(); |
| loggerName = String.format("%s.app.%s", loggerPrefix, appName); |
| } |
| |
| if (outputToLog && errorToLog) { |
| pb.redirectErrorStream(true); |
| } |
| |
| pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(server.getPort())); |
| pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, secret); |
| try { |
| Process child = pb.start(); |
| InputStream logStream = null; |
| if (loggerName != null) { |
| logStream = outputToLog ? child.getInputStream() : child.getErrorStream(); |
| } |
| handle.setChildProc(child, loggerName, logStream); |
| } catch (IOException ioe) { |
| handle.kill(); |
| throw ioe; |
| } |
| |
| return handle; |
| } |
| |
| private ProcessBuilder createBuilder() throws IOException { |
| List<String> cmd = new ArrayList<>(); |
| cmd.add(findSparkSubmit()); |
| cmd.addAll(builder.buildSparkSubmitArgs()); |
| |
| // Since the child process is a batch script, let's quote things so that special characters are |
| // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are |
| // weird. |
| if (isWindows()) { |
| List<String> winCmd = new ArrayList<>(); |
| for (String arg : cmd) { |
| winCmd.add(quoteForBatchScript(arg)); |
| } |
| cmd = winCmd; |
| } |
| |
| ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()])); |
| for (Map.Entry<String, String> e : builder.childEnv.entrySet()) { |
| pb.environment().put(e.getKey(), e.getValue()); |
| } |
| |
| if (workingDir != null) { |
| pb.directory(workingDir); |
| } |
| |
| // Only one of redirectError and redirectError(...) can be specified. |
| // Similarly, if redirectToLog is specified, no other redirections should be specified. |
| checkState(!redirectErrorStream || errorStream == null, |
| "Cannot specify both redirectError() and redirectError(...) "); |
| checkState(getLoggerName() == null || |
| ((!redirectErrorStream && errorStream == null) || outputStream == null), |
| "Cannot used redirectToLog() in conjunction with other redirection methods."); |
| |
| if (redirectErrorStream) { |
| pb.redirectErrorStream(true); |
| } |
| if (errorStream != null) { |
| pb.redirectError(errorStream); |
| } |
| if (outputStream != null) { |
| pb.redirectOutput(outputStream); |
| } |
| |
| return pb; |
| } |
| |
| @Override |
| SparkLauncher self() { |
| return this; |
| } |
| |
| // Visible for testing. |
| String findSparkSubmit() { |
| String script = isWindows() ? "spark-submit.cmd" : "spark-submit"; |
| return join(File.separator, builder.getSparkHome(), "bin", script); |
| } |
| |
| private String getLoggerName() throws IOException { |
| return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); |
| } |
| |
| } |