/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.oozie.process;

import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.oozie.spark.CONFIGURATION.Property;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.util.OozieUtils;
import org.apache.hadoop.fs.Path;
import javax.xml.bind.JAXBElement;
import java.util.List;

/**
* Builds the Oozie orchestration workflow for a process whose execution engine is Spark.
*/
public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
private static final String FALCON_PREFIX = "falcon_";

public SparkProcessWorkflowBuilder(Process entity) {
super(entity);
}
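
/**
* Unmarshals the spark-action template, fills it in with the process entity's Spark
* attributes (master, name, opts, class, jar and arguments) plus the feed locations,
* and marshals it back into the workflow action.
*/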
@Override
protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
ACTION action = unmarshalAction(ACTION_TEMPLATE);
JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement = OozieUtils.unMarshalSparkAction(action);
org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
String sparkMasterURL = entity.getSparkAttributes().getMaster();
Path sparkJarFilePath = new Path(entity.getSparkAttributes().getJar());
String sparkJobName = entity.getSparkAttributes().getName();
String sparkOpts = entity.getSparkAttributes().getSparkOpts();
String sparkClassName = entity.getSparkAttributes().getClazz();
String clusterEntitySparkMasterURL = getClusterEntitySparkMaster(cluster);
//Use the Spark master from the process entity if defined, otherwise fall back to the cluster entity's Spark master
sparkMasterURL = (sparkMasterURL == null) ? clusterEntitySparkMasterURL : sparkMasterURL;
if (StringUtils.isBlank(sparkMasterURL)) {
throw new FalconException("Spark master URL cannot be empty");
}
sparkAction.setMaster(sparkMasterURL);
sparkAction.setName(sparkJobName);
addPrepareDeleteOutputPath(sparkAction);
if (StringUtils.isNotEmpty(sparkOpts)) {
sparkAction.setSparkOpts(sparkOpts);
}
if (StringUtils.isNotEmpty(sparkClassName)) {
sparkAction.setClazz(sparkClassName);
}
List<String> argList = sparkAction.getArg();
List<String> sparkArgs = entity.getSparkAttributes().getArgs();
if (sparkArgs != null) {
argList.addAll(sparkArgs);
}
//Add outputs first and then inputs, prepending each at index 0, so the final order is inputs, then outputs, then the user's arguments.
addOutputFeedsAsArgument(argList, cluster);
addInputFeedsAsArgument(argList, cluster);
// In the Oozie spark action, the jar element takes either a Java jar file path or a Python file path.
validateSparkJarFilePath(sparkJarFilePath);
sparkAction.setJar(sparkJarFilePath.getName());
setSparkLibFileToWorkflowLib(sparkJarFilePath.toString(), entity);
propagateEntityProperties(sparkAction);
OozieUtils.marshalSparkAction(action, actionJaxbElement);
return action;
}
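
/**
* Adds the Spark application file to the workflow lib, appending if a lib is already set,
* so that it is shipped alongside the action.
*/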
private void setSparkLibFileToWorkflowLib(String sparkJarFilePath, Process entity) {
if (StringUtils.isEmpty(entity.getWorkflow().getLib())) {
entity.getWorkflow().setLib(sparkJarFilePath);
} else {
String workflowLib = entity.getWorkflow().getLib() + "," + sparkJarFilePath;
entity.getWorkflow().setLib(workflowLib);
}
}
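
/**
* Ensures the Spark application file is specified as an absolute path.
*/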
private void validateSparkJarFilePath(Path sparkJarFilePath) throws FalconException {
if (!sparkJarFilePath.isAbsolute()) {
throw new FalconException("Spark jar file path must be absolute:"+sparkJarFilePath);
}
}
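
/**
* Adds a prepare block to the spark action that deletes the configured output
* paths before the action runs.
*/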
private void addPrepareDeleteOutputPath(org.apache.falcon.oozie.spark.ACTION sparkAction) throws FalconException {
List<String> deleteOutputPathList = getPrepareDeleteOutputPathList();
if (deleteOutputPathList.isEmpty()) {
return;
}
org.apache.falcon.oozie.spark.PREPARE prepare = new org.apache.falcon.oozie.spark.PREPARE();
List<org.apache.falcon.oozie.spark.DELETE> deleteList = prepare.getDelete();
for (String deletePath : deleteOutputPathList) {
org.apache.falcon.oozie.spark.DELETE delete = new org.apache.falcon.oozie.spark.DELETE();
delete.setPath(deletePath);
deleteList.add(delete);
}
if (!deleteList.isEmpty()) {
sparkAction.setPrepare(prepare);
}
}
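
/**
* Copies the entity-level properties into the spark action's configuration,
* converting from the workflow JAXB property type to the spark JAXB property type.
*/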
private void propagateEntityProperties(org.apache.falcon.oozie.spark.ACTION sparkAction) {
CONFIGURATION conf = new CONFIGURATION();
super.propagateEntityProperties(conf, null);
List<Property> sparkConf = sparkAction.getConfiguration().getProperty();
for (CONFIGURATION.Property prop : conf.getProperty()) {
Property sparkProp = new Property();
sparkProp.setName(prop.getName());
sparkProp.setValue(prop.getValue());
sparkConf.add(sparkProp);
}
}
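
/**
* Prepends the process input feeds to the action arguments: the resolved feed path
* for filesystem storage, or the database/table/partition-filter EL variables for
* table (Hive) storage.
*/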
private void addInputFeedsAsArgument(List<String> argList, Cluster cluster) throws FalconException {
if (entity.getInputs() == null) {
return;
}
//Prepend each input at index 0, iterating in reverse so the declared input order is preserved.
List<Input> inputs = entity.getInputs().getInputs();
for (int i = inputs.size() - 1; i >= 0; i--) {
Input input = inputs.get(i);
Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
Storage storage = FeedHelper.createStorage(cluster, feed);
final String inputName = input.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(0, "${" + inputName + "}");
} else if (storage.getType() == Storage.TYPE.TABLE) {
argList.add(0, "${" + FALCON_PREFIX + inputName + "_database" + "}");
argList.add(0, "${" + FALCON_PREFIX + inputName + "_table" + "}");
argList.add(0, "${" + FALCON_PREFIX + inputName + "_partition_filter_hive" + "}");
}
}
}
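
/**
* Prepends the process output feeds to the action arguments: the resolved feed path
* for filesystem storage, or the database/table/partitions EL variables for
* table (Hive) storage.
*/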
private void addOutputFeedsAsArgument(List<String> argList, Cluster cluster) throws FalconException {
if (entity.getOutputs() == null) {
return;
}
//Prepend each output at index 0, iterating in reverse so the declared output order is preserved.
List<Output> outputs = entity.getOutputs().getOutputs();
for (int i = outputs.size() - 1; i >= 0; i--) {
Output output = outputs.get(i);
Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
Storage storage = FeedHelper.createStorage(cluster, feed);
final String outputName = output.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(0, "${" + outputName + "}");
} else if (storage.getType() == Storage.TYPE.TABLE) {
argList.add(0, "${" + FALCON_PREFIX + outputName + "_database" + "}");
argList.add(0, "${" + FALCON_PREFIX + outputName + "_table" + "}");
argList.add(0, "${" + FALCON_PREFIX + outputName + "_partitions_hive" + "}");
}
}
}
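
/**
* Returns the Spark master endpoint configured on the cluster entity, if any.
*/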
private String getClusterEntitySparkMaster(Cluster cluster) {
return ClusterHelper.getSparkMasterEndPoint(cluster);
}
}