/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.core.starter.spark;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.core.starter.config.PluginType;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.spark.config.StarterConstant;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* A Starter to generate the spark-submit command for a SeaTunnel job on Spark.
*/
public class SparkStarter implements Starter {
/**
* original command-line args
*/
protected String[] args;
/**
* args parsed from {@link #args}
*/
protected SparkCommandArgs commandArgs;
/**
* the spark application name
*/
protected String appName;
/**
* jars to include on the spark driver and executor classpaths
*/
protected List<Path> jars = new ArrayList<>();
/**
* files to be placed in the working directory of each spark executor
*/
protected List<Path> files = new ArrayList<>();
/**
* spark configuration properties
*/
protected Map<String, String> sparkConf;
private SparkStarter(String[] args, SparkCommandArgs commandArgs) {
this.args = args;
this.commandArgs = commandArgs;
}
@SuppressWarnings("checkstyle:RegexpSingleline")
public static void main(String[] args) throws IOException {
SparkStarter starter = getInstance(args);
List<String> command = starter.buildCommands();
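// print the assembled spark-submit command on a single line; the launcher
// shell script that invokes this starter is expected to execute the output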
System.out.println(String.join(" ", command));
}
/**
* method to get a SparkStarter instance, returning {@link ClusterModeSparkStarter}
* or {@link ClientModeSparkStarter} depending on the deploy mode.
*/
static SparkStarter getInstance(String[] args) {
SparkCommandArgs commandArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), StarterConstant.SHELL_NAME, true);
DeployMode deployMode = commandArgs.getDeployMode();
switch (deployMode) {
case CLUSTER:
return new ClusterModeSparkStarter(args, commandArgs);
case CLIENT:
return new ClientModeSparkStarter(args, commandArgs);
default:
throw new IllegalArgumentException("Deploy mode " + deployMode + " not supported");
}
}
@Override
public List<String> buildCommands() throws IOException {
setSparkConf();
Common.setDeployMode(commandArgs.getDeployMode());
Common.setStarter(true);
this.jars.addAll(Common.getPluginsJarDependencies());
this.jars.addAll(getConnectorJarDependencies());
this.appName = this.sparkConf.getOrDefault("spark.app.name", Constants.LOGO);
return buildFinal();
}
/**
* parse spark configurations from the SeaTunnel config file
*/
private void setSparkConf() throws FileNotFoundException {
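// expose user-supplied variables as JVM system properties so that ${key}
// placeholders in the HOCON job config can resolve against them (see getSparkConf)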
commandArgs.getVariables()
.stream()
.filter(Objects::nonNull)
.map(variable -> variable.split("=", 2))
.filter(pair -> pair.length == 2)
.forEach(pair -> System.setProperty(pair[0], pair[1]));
this.sparkConf = getSparkConf(commandArgs.getConfigFile());
String driverJavaOpts = this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
String executorJavaOpts = this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", "");
if (!commandArgs.getVariables().isEmpty()) {
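// also forward the variables as -D options so that the driver and executor
// JVMs, which do not inherit this process's system properties, can see them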
String properties = commandArgs.getVariables()
.stream()
.map(v -> "-D" + v)
.collect(Collectors.joining(" "));
driverJavaOpts += " " + properties;
executorJavaOpts += " " + properties;
this.sparkConf.put("spark.driver.extraJavaOptions", driverJavaOpts.trim());
this.sparkConf.put("spark.executor.extraJavaOptions", executorJavaOpts.trim());
}
}
/**
* Get spark configurations from the SeaTunnel job config file.
*/
static Map<String, String> getSparkConf(String configFile) throws FileNotFoundException {
File file = new File(configFile);
if (!file.exists()) {
throw new FileNotFoundException("config file '" + file + "' does not exist!");
}
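// parse the HOCON file, leaving placeholders unresolved at first and then
// resolving them against system properties (set from the CLI variables)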
Config appConfig = ConfigFactory.parseFile(file)
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
return appConfig.getConfig("env")
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().unwrapped().toString()));
}
/**
* return the connector jars, which are located under the connector jar directory (e.g. 'connectors/seatunnel/*').
*/
private List<Path> getConnectorJarDependencies() {
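// only the connector jars actually referenced by the job config are returned,
// discovered through the source/sink plugin discovery services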
Path connectorJarDir = Common.connectorJarDir("seatunnel");
if (!Files.exists(connectorJarDir) || !Files.isDirectory(connectorJarDir)) {
return Collections.emptyList();
}
Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
Set<URL> pluginJars = new HashSet<>();
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
pluginJars.addAll(seaTunnelSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
pluginJars.addAll(seaTunnelSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SINK)));
return pluginJars.stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
}
/**
* build the final spark-submit command
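*
* <p>An illustrative result (placeholders in capitals; exact values depend on the
* job config and CLI args):
* <pre>
* ${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeatunnelSpark"
* --name "APP_NAME" --master "yarn" --deploy-mode "client"
* --jars "CONNECTOR_AND_PLUGIN_JARS" --conf "spark.driver.memory=1g"
* APP_LIB_DIR/seatunnel-spark-starter.jar ORIGINAL_CLI_ARGS
* </pre>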
*/
protected List<String> buildFinal() {
List<String> commands = new ArrayList<>();
commands.add("${SPARK_HOME}/bin/spark-submit");
appendOption(commands, "--class", SeatunnelSpark.class.getName());
appendOption(commands, "--name", this.appName);
appendOption(commands, "--master", this.commandArgs.getMaster());
appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getName());
appendJars(commands, this.jars);
appendFiles(commands, this.files);
appendSparkConf(commands, this.sparkConf);
appendAppJar(commands);
appendArgs(commands, args);
return commands;
}
/**
* append an option and its double-quoted value to the command list
*/
protected void appendOption(List<String> commands, String option, String value) {
commands.add(option);
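// double-quote the value and escape embedded quotes so that the generated
// command line survives shell evaluation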
commands.add("\"" + value.replace("\"", "\\\"") + "\"");
}
/**
* append the --jars option to the command list
*/
protected void appendJars(List<String> commands, List<Path> paths) {
appendPaths(commands, "--jars", paths);
}
/**
* append the --files option to the command list
*/
protected void appendFiles(List<String> commands, List<Path> paths) {
appendPaths(commands, "--files", paths);
}
/**
* append a comma-separated paths option to the command list
*/
protected void appendPaths(List<String> commands, String option, List<Path> paths) {
if (!paths.isEmpty()) {
String values = paths.stream()
.map(Path::toString)
.collect(Collectors.joining(","));
appendOption(commands, option, values);
}
}
/**
* append spark configurations as --conf options to the command list
*/
protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
appendOption(commands, "--conf", key + "=" + value);
}
}
/**
* append the original command-line args to the command list
*/
protected void appendArgs(List<String> commands, String[] args) {
commands.addAll(Arrays.asList(args));
}
/**
* append the application jar path to the command list
*/
protected void appendAppJar(List<String> commands) {
commands.add(Common.appLibDir().resolve("seatunnel-spark-starter.jar").toString());
}
@SuppressWarnings("checkstyle:Indentation")
private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) {
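// collect one PluginIdentifier per "plugin_name" entry under each requested
// section of the job config (e.g. the "source" and "sink" blocks)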
return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
List<? extends Config> configList = config.getConfigList(pluginType.getType());
return configList.stream()
.map(pluginConfig -> PluginIdentifier.of("seatunnel", pluginType.getType(),
pluginConfig.getString("plugin_name")));
}).collect(Collectors.toList());
}
/**
* a Starter for building spark-submit commands with client mode options
*/
private static class ClientModeSparkStarter extends SparkStarter {
/**
* spark options that are specific to client mode
*/
private enum ClientModeSparkConfigs {
/**
* Memory for driver in client mode
*/
DriverMemory("--driver-memory", "spark.driver.memory"),
/**
* Extra Java options to pass to the driver in client mode
*/
DriverJavaOptions("--driver-java-options", "spark.driver.extraJavaOptions"),
/**
* Extra library path entries to pass to the driver in client mode
*/
DriverLibraryPath(" --driver-library-path", "spark.driver.extraLibraryPath"),
/**
* Extra class path entries to pass to the driver in client mode
*/
DriverClassPath("--driver-class-path", "spark.driver.extraClassPath");
private final String optionName;
private final String propertyName;
private static final Map<String, ClientModeSparkConfigs> PROPERTY_NAME_MAP = new HashMap<>();
static {
for (ClientModeSparkConfigs config : values()) {
PROPERTY_NAME_MAP.put(config.propertyName, config);
}
}
ClientModeSparkConfigs(String optionName, String propertyName) {
this.optionName = optionName;
this.propertyName = propertyName;
}
}
private ClientModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
super(args, commandArgs);
}
@Override
protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
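// in client mode the driver JVM is launched directly by spark-submit, so
// driver memory, java options, and paths must be passed as dedicated CLI
// options; passing them via --conf would be too late to take effect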
for (ClientModeSparkConfigs config : ClientModeSparkConfigs.values()) {
String propertyValue = this.sparkConf.get(config.propertyName);
if (StringUtils.isNotBlank(propertyValue)) {
appendOption(commands, config.optionName, propertyValue);
}
}
for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (ClientModeSparkConfigs.PROPERTY_NAME_MAP.containsKey(key)) {
continue;
}
appendOption(commands, "--conf", key + "=" + value);
}
}
}
/**
* a Starter for building spark-submit commands with cluster mode options
*/
private static class ClusterModeSparkStarter extends SparkStarter {
private ClusterModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
super(args, commandArgs);
}
@Override
public List<String> buildCommands() throws IOException {
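// in cluster mode the driver runs on a remote node, so ship the plugin
// directory (as a tar.gz archive) and the job config file with the
// application via the --files option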
Common.setDeployMode(commandArgs.getDeployMode());
Common.setStarter(true);
Path pluginTarball = Common.pluginTarball();
CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
this.files.add(pluginTarball);
this.files.add(Paths.get(commandArgs.getConfigFile()));
return super.buildCommands();
}
}
}