blob: 688e1f763c2058e5390373e53368aa4ebbc4b13d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.logging.Logger;
/**
* In-process launcher for Spark applications.
* <p>
* Use this class to start Spark applications programmatically. Applications launched using this
* class will run in the same process as the caller.
* <p>
* Because Spark only supports a single active instance of <code>SparkContext</code> per JVM, code
* that uses this class should be careful about which applications are launched. It's recommended
* that this launcher only be used to launch applications in cluster mode.
* <p>
* Also note that, when running applications in client mode, JVM-related configurations (like
* driver memory or configs which modify the driver's class path) do not take effect. Logging
* configuration is also inherited from the parent application.
*
* @since Spark 2.3.0
*/
public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
private static final Logger LOG = Logger.getLogger(InProcessLauncher.class.getName());
/**
* Starts a Spark application.
*
* @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
* @param listeners Listeners to add to the handle before the app is launched.
* @return A handle for the launched application.
*/
@Override
public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
if (builder.isClientMode(builder.getEffectiveConfig())) {
LOG.warning("It's not recommended to run client-mode applications using InProcessLauncher.");
}
Method main = findSparkSubmit();
LauncherServer server = LauncherServer.getOrCreateServer();
InProcessAppHandle handle = new InProcessAppHandle(server);
for (SparkAppHandle.Listener l : listeners) {
handle.addListener(l);
}
String secret = server.registerHandle(handle);
setConf(LauncherProtocol.CONF_LAUNCHER_PORT, String.valueOf(server.getPort()));
setConf(LauncherProtocol.CONF_LAUNCHER_SECRET, secret);
List<String> sparkArgs = builder.buildSparkSubmitArgs();
String[] argv = sparkArgs.toArray(new String[sparkArgs.size()]);
String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass,
"<unknown>");
handle.start(appName, main, argv);
return handle;
}
@Override
InProcessLauncher self() {
return this;
}
// Visible for testing.
Method findSparkSubmit() throws IOException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = getClass().getClassLoader();
}
Class<?> sparkSubmit;
// SPARK-22941: first try the new SparkSubmit interface that has better error handling,
// but fall back to the old interface in case someone is mixing & matching launcher and
// Spark versions.
try {
sparkSubmit = cl.loadClass("org.apache.spark.deploy.InProcessSparkSubmit");
} catch (Exception e1) {
try {
sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
} catch (Exception e2) {
throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.",
e2);
}
}
Method main;
try {
main = sparkSubmit.getMethod("main", String[].class);
} catch (Exception e) {
throw new IOException("Cannot find SparkSubmit main method.", e);
}
CommandBuilderUtils.checkState(Modifier.isStatic(main.getModifiers()),
"main method is not static.");
return main;
}
}