blob: b18a0b93593808661a9d1ed5cef8c4e0fe9209a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.deploy.SparkSubmit;
import java.util.ArrayList;
import java.util.List;
public class SparkMain extends LauncherMain {
private static final String MASTER_OPTION = "--master";
private static final String MODE_OPTION = "--deploy-mode";
private static final String JOB_NAME_OPTION = "--name";
private static final String CLASS_NAME_OPTION = "--class";
private static final String VERBOSE_OPTION = "--verbose";
private static final String DELIM = " ";
public static void main(String[] args) throws Exception {
run(SparkMain.class, args);
}
@Override
protected void run(String[] args) throws Exception {
Configuration actionConf = loadActionConf();
setYarnTag(actionConf);
LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
List<String> sparkArgs = new ArrayList<String>();
sparkArgs.add(MASTER_OPTION);
sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_MASTER));
String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
if (sparkDeployMode != null) {
sparkArgs.add(MODE_OPTION);
sparkArgs.add(sparkDeployMode);
}
sparkArgs.add(JOB_NAME_OPTION);
sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
String className = actionConf.get(SparkActionExecutor.SPARK_CLASS);
if (className != null) {
sparkArgs.add(CLASS_NAME_OPTION);
sparkArgs.add(className);
}
String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
if (StringUtils.isNotEmpty(sparkOpts)) {
String[] sparkOptions = sparkOpts.split(DELIM);
for (String opt : sparkOptions) {
sparkArgs.add(opt);
}
}
if (!sparkArgs.contains(VERBOSE_OPTION)) {
sparkArgs.add(VERBOSE_OPTION);
}
String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
sparkArgs.add(jarPath);
for (String arg : args) {
sparkArgs.add(arg);
}
System.out.println("Spark Action Main class : " + SparkSubmit.class.getName());
System.out.println();
System.out.println("Oozie Spark action configuration");
System.out.println("=================================================================");
System.out.println();
for (String arg : sparkArgs) {
System.out.println(" " + arg);
}
System.out.println();
runSpark(sparkArgs.toArray(new String[sparkArgs.size()]));
}
private void runSpark(String[] args) throws Exception {
System.out.println("=================================================================");
System.out.println();
System.out.println(">>> Invoking Spark class now >>>");
System.out.println();
System.out.flush();
SparkSubmit.main(args);
}
}