/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.seatunnel.core.starter.spark; |
| |
| import org.apache.seatunnel.common.Constants; |
| import org.apache.seatunnel.common.config.Common; |
| import org.apache.seatunnel.common.config.DeployMode; |
| import org.apache.seatunnel.core.starter.Starter; |
| import org.apache.seatunnel.core.starter.config.ConfigBuilder; |
| import org.apache.seatunnel.core.starter.config.PluginType; |
| import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs; |
| import org.apache.seatunnel.core.starter.spark.config.StarterConstant; |
| import org.apache.seatunnel.core.starter.utils.CommandLineUtils; |
| import org.apache.seatunnel.core.starter.utils.CompressionUtils; |
| import org.apache.seatunnel.plugin.discovery.PluginIdentifier; |
| import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; |
| import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; |
| |
| import org.apache.seatunnel.shade.com.typesafe.config.Config; |
| import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; |
| import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions; |
| |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.URL; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
/**
 * A Starter that generates the spark-submit command for a SeaTunnel job on Spark.
 */
| public class SparkStarter implements Starter { |
| |
| /** |
| * original commandline args |
| */ |
| protected String[] args; |
| |
| /** |
| * args parsed from {@link #args} |
| */ |
| protected SparkCommandArgs commandArgs; |
| |
| /** |
| * the spark application name |
| */ |
| protected String appName; |
| |
| /** |
| * jars to include on the spark driver and executor classpaths |
| */ |
| protected List<Path> jars = new ArrayList<>(); |
| |
| /** |
| * files to be placed in the working directory of each spark executor |
| */ |
| protected List<Path> files = new ArrayList<>(); |
| |
| /** |
| * spark configuration properties |
| */ |
| protected Map<String, String> sparkConf; |
| |
| private SparkStarter(String[] args, SparkCommandArgs commandArgs) { |
| this.args = args; |
| this.commandArgs = commandArgs; |
| } |
| |
| @SuppressWarnings("checkstyle:RegexpSingleline") |
| public static void main(String[] args) throws IOException { |
| SparkStarter starter = getInstance(args); |
| List<String> command = starter.buildCommands(); |
| System.out.println(String.join(" ", command)); |
| } |
| |
| /** |
| * method to get SparkStarter instance, will return |
| * {@link ClusterModeSparkStarter} or |
| * {@link ClientModeSparkStarter} depending on deploy mode. |
| */ |
| static SparkStarter getInstance(String[] args) { |
| SparkCommandArgs commandArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), StarterConstant.SHELL_NAME, true); |
| DeployMode deployMode = commandArgs.getDeployMode(); |
| switch (deployMode) { |
| case CLUSTER: |
| return new ClusterModeSparkStarter(args, commandArgs); |
| case CLIENT: |
| return new ClientModeSparkStarter(args, commandArgs); |
| default: |
| throw new IllegalArgumentException("Deploy mode " + deployMode + " not supported"); |
| } |
| } |
| |
| @Override |
| public List<String> buildCommands() throws IOException { |
| setSparkConf(); |
| Common.setDeployMode(commandArgs.getDeployMode()); |
| Common.setStarter(true); |
| this.jars.addAll(Common.getPluginsJarDependencies()); |
| this.jars.addAll(getConnectorJarDependencies()); |
| this.appName = this.sparkConf.getOrDefault("spark.app.name", Constants.LOGO); |
| return buildFinal(); |
| } |
| |
| /** |
| * parse spark configurations from SeaTunnel config file |
| */ |
| private void setSparkConf() throws FileNotFoundException { |
| commandArgs.getVariables() |
| .stream() |
| .filter(Objects::nonNull) |
| .map(variable -> variable.split("=", 2)) |
| .filter(pair -> pair.length == 2) |
| .forEach(pair -> System.setProperty(pair[0], pair[1])); |
| this.sparkConf = getSparkConf(commandArgs.getConfigFile()); |
| String driverJavaOpts = this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", ""); |
| String executorJavaOpts = this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", ""); |
| if (!commandArgs.getVariables().isEmpty()) { |
| String properties = commandArgs.getVariables() |
| .stream() |
| .map(v -> "-D" + v) |
| .collect(Collectors.joining(" ")); |
| driverJavaOpts += " " + properties; |
| executorJavaOpts += " " + properties; |
| this.sparkConf.put("spark.driver.extraJavaOptions", driverJavaOpts.trim()); |
| this.sparkConf.put("spark.executor.extraJavaOptions", executorJavaOpts.trim()); |
| } |
| } |
| |
| /** |
| * Get spark configurations from SeaTunnel job config file. |
| */ |
| static Map<String, String> getSparkConf(String configFile) throws FileNotFoundException { |
| File file = new File(configFile); |
| if (!file.exists()) { |
| throw new FileNotFoundException("config file '" + file + "' does not exists!"); |
| } |
| Config appConfig = ConfigFactory.parseFile(file) |
| .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)) |
| .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true)); |
| |
| return appConfig.getConfig("env") |
| .entrySet() |
| .stream() |
| .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().unwrapped().toString())); |
| } |
| |
| /** |
| * return connector's jars, which located in 'connectors/spark/*'. |
| */ |
| private List<Path> getConnectorJarDependencies() { |
| Path pluginRootDir = Common.connectorJarDir("seatunnel"); |
| if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) { |
| return Collections.emptyList(); |
| } |
| Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig(); |
| Set<URL> pluginJars = new HashSet<>(); |
| SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery(); |
| SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); |
| pluginJars.addAll(seaTunnelSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE))); |
| pluginJars.addAll(seaTunnelSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SINK))); |
| return pluginJars.stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList()); |
| } |
| |
| /** |
| * build final spark-submit commands |
| */ |
| protected List<String> buildFinal() { |
| List<String> commands = new ArrayList<>(); |
| commands.add("${SPARK_HOME}/bin/spark-submit"); |
| appendOption(commands, "--class", SeatunnelSpark.class.getName()); |
| appendOption(commands, "--name", this.appName); |
| appendOption(commands, "--master", this.commandArgs.getMaster()); |
| appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getName()); |
| appendJars(commands, this.jars); |
| appendFiles(commands, this.files); |
| appendSparkConf(commands, this.sparkConf); |
| appendAppJar(commands); |
| appendArgs(commands, args); |
| return commands; |
| } |
| |
| /** |
| * append option to StringBuilder |
| */ |
| protected void appendOption(List<String> commands, String option, String value) { |
| commands.add(option); |
| commands.add("\"" + value.replace("\"", "\\\"") + "\""); |
| } |
| |
| /** |
| * append jars option to StringBuilder |
| */ |
| protected void appendJars(List<String> commands, List<Path> paths) { |
| appendPaths(commands, "--jars", paths); |
| } |
| |
| /** |
| * append files option to StringBuilder |
| */ |
| protected void appendFiles(List<String> commands, List<Path> paths) { |
| appendPaths(commands, "--files", paths); |
| } |
| |
| /** |
| * append comma-split paths option to StringBuilder |
| */ |
| protected void appendPaths(List<String> commands, String option, List<Path> paths) { |
| if (!paths.isEmpty()) { |
| String values = paths.stream() |
| .map(Path::toString) |
| .collect(Collectors.joining(",")); |
| appendOption(commands, option, values); |
| } |
| } |
| |
| /** |
| * append spark configurations to StringBuilder |
| */ |
| protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) { |
| for (Map.Entry<String, String> entry : sparkConf.entrySet()) { |
| String key = entry.getKey(); |
| String value = entry.getValue(); |
| appendOption(commands, "--conf", key + "=" + value); |
| } |
| } |
| |
| /** |
| * append original commandline args to StringBuilder |
| */ |
| protected void appendArgs(List<String> commands, String[] args) { |
| commands.addAll(Arrays.asList(args)); |
| } |
| |
| /** |
| * append appJar to StringBuilder |
| */ |
| protected void appendAppJar(List<String> commands) { |
| commands.add(Common.appLibDir().resolve("seatunnel-spark-starter.jar").toString()); |
| } |
| |
| @SuppressWarnings("checkstyle:Indentation") |
| private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) { |
| return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> { |
| List<? extends Config> configList = config.getConfigList(pluginType.getType()); |
| return configList.stream() |
| .map(pluginConfig -> PluginIdentifier.of("seatunnel", pluginType.getType(), |
| pluginConfig.getString("plugin_name"))); |
| }).collect(Collectors.toList()); |
| } |
| |
| /** |
| * a Starter for building spark-submit commands with client mode options |
| */ |
| private static class ClientModeSparkStarter extends SparkStarter { |
| |
| /** |
| * client mode specified spark options |
| */ |
| private enum ClientModeSparkConfigs { |
| |
| /** |
| * Memory for driver in client mode |
| */ |
| DriverMemory("--driver-memory", "spark.driver.memory"), |
| |
| /** |
| * Extra Java options to pass to the driver in client mode |
| */ |
| DriverJavaOptions("--driver-java-options", "spark.driver.extraJavaOptions"), |
| |
| /** |
| * Extra library path entries to pass to the driver in client mode |
| */ |
| DriverLibraryPath(" --driver-library-path", "spark.driver.extraLibraryPath"), |
| |
| /** |
| * Extra class path entries to pass to the driver in client mode |
| */ |
| DriverClassPath("--driver-class-path", "spark.driver.extraClassPath"); |
| |
| private final String optionName; |
| |
| private final String propertyName; |
| |
| private static final Map<String, ClientModeSparkConfigs> PROPERTY_NAME_MAP = new HashMap<>(); |
| |
| static { |
| for (ClientModeSparkConfigs config : values()) { |
| PROPERTY_NAME_MAP.put(config.propertyName, config); |
| } |
| } |
| |
| ClientModeSparkConfigs(String optionName, String propertyName) { |
| this.optionName = optionName; |
| this.propertyName = propertyName; |
| } |
| } |
| |
| private ClientModeSparkStarter(String[] args, SparkCommandArgs commandArgs) { |
| super(args, commandArgs); |
| } |
| |
| @Override |
| protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) { |
| for (ClientModeSparkConfigs config : ClientModeSparkConfigs.values()) { |
| String driverJavaOptions = this.sparkConf.get(config.propertyName); |
| if (StringUtils.isNotBlank(driverJavaOptions)) { |
| appendOption(commands, config.optionName, driverJavaOptions); |
| } |
| } |
| for (Map.Entry<String, String> entry : sparkConf.entrySet()) { |
| String key = entry.getKey(); |
| String value = entry.getValue(); |
| if (ClientModeSparkConfigs.PROPERTY_NAME_MAP.containsKey(key)) { |
| continue; |
| } |
| appendOption(commands, "--conf", key + "=" + value); |
| } |
| } |
| } |
| |
| /** |
| * a Starter for building spark-submit commands with cluster mode options |
| */ |
| private static class ClusterModeSparkStarter extends SparkStarter { |
| |
| private ClusterModeSparkStarter(String[] args, SparkCommandArgs commandArgs) { |
| super(args, commandArgs); |
| } |
| |
| @Override |
| public List<String> buildCommands() throws IOException { |
| Common.setDeployMode(commandArgs.getDeployMode()); |
| Common.setStarter(true); |
| Path pluginTarball = Common.pluginTarball(); |
| CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball); |
| this.files.add(pluginTarball); |
| this.files.add(Paths.get(commandArgs.getConfigFile())); |
| return super.buildCommands(); |
| } |
| } |
| } |