/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.oozie.process;

import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.coordinator.CONTROLS;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.DATAIN;
import org.apache.falcon.oozie.coordinator.DATAOUT;
import org.apache.falcon.oozie.coordinator.DATASETS;
import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
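
// A minimal usage sketch (hedged): assumes an already-parsed Process entity and a target
// Cluster definition; the staging path below is hypothetical.
//
//     List<Properties> coordProps = new ProcessExecutionCoordinatorBuilder(process)
//             .buildCoords(cluster, new Path("/falcon/staging/workflows/process"));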
/**
 * Builds the Oozie coordinator application for a process entity's execution lifecycle.
 */
public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<Process> {
    private static final int THIRTY_MINUTES = 30 * 60 * 1000;

    public ProcessExecutionCoordinatorBuilder(Process entity) {
        super(entity, LifeCycle.EXECUTION);
    }
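
    /**
     * Builds the execution coordinator for the process on the given cluster: initializes
     * coordinator attributes and controls, wires up input/output datasets and events,
     * builds the parent workflow, and marshals the coordinator app under the build path.
     */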
    @Override
    public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
        String coordName = getEntityName();
        Path coordPath = getBuildPath(buildPath);
        copySharedLibs(cluster, new Path(coordPath, "lib"));

        COORDINATORAPP coord = new COORDINATORAPP();

        // coordinator attributes
        initializeCoordAttributes(cluster, coord, coordName);

        // controls
        CONTROLS controls = initializeControls();
        coord.setControls(controls);

        // configuration
        Properties props = createCoordDefaultConfiguration(cluster, coordName);
        initializeInputPaths(cluster, coord, props);  // inputs
        initializeOutputPaths(cluster, coord, props); // outputs

        Workflow processWorkflow = entity.getWorkflow();
        propagateUserWorkflowProperties(processWorkflow, props);

        // create the parent workflow
        Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT)
                .build(cluster, coordPath);
        WORKFLOW wf = new WORKFLOW();
        wf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
        props.putAll(wfProps);
        wf.setConfiguration(getConfig(props));

        // set the coordinator action to invoke the parent workflow
        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
        action.setWorkflow(wf);
        coord.setAction(action);

        Path marshalPath = marshal(cluster, coord, coordPath);
        return Arrays.asList(getProperties(marshalPath, coordName));
    }

    @Override
    protected WorkflowExecutionContext.EntityOperations getOperation() {
        return WorkflowExecutionContext.EntityOperations.GENERATE;
    }
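
    /**
     * Sets the coordinator name, start/end times (taken from the process cluster's
     * validity), timezone and frequency.
     */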
    private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {
        coord.setName(coordName);
        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity,
                cluster.getName());
        coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
        coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
        coord.setTimezone(entity.getTimezone().getID());
        coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
    }
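
    /**
     * Builds coordinator controls: concurrency from the process parallelism, execution
     * order, a timeout (explicit, or defaulted to six times the frequency with a
     * 30-minute floor) and a throttle derived from the timeout/frequency ratio.
     */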
    private CONTROLS initializeControls() throws FalconException {
        CONTROLS controls = new CONTROLS();
        controls.setConcurrency(String.valueOf(entity.getParallel()));
        controls.setExecution(entity.getOrder().name());

        Frequency timeout = entity.getTimeout();
        long frequencyInMillis = ExpressionHelper.get().evaluate(entity.getFrequency().toString(), Long.class);
        long timeoutInMillis;
        if (timeout != null) {
            timeoutInMillis = ExpressionHelper.get().evaluate(timeout.toString(), Long.class);
        } else {
            // no explicit timeout: default to six times the frequency, floored at 30 minutes
            timeoutInMillis = frequencyInMillis * 6;
            if (timeoutInMillis < THIRTY_MINUTES) {
                timeoutInMillis = THIRTY_MINUTES;
            }
        }
        controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60))); // Oozie expects minutes

        // throttle: twice the number of instances that fit within the timeout window
        if (timeoutInMillis / frequencyInMillis * 2 > 0) {
            controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
        }
        return controls;
    }
    private void initializeInputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
        if (entity.getInputs() == null) {
            props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), "NONE");
            props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), IGNORE);
            props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), IGNORE);
            return;
        }

        List<String> inputFeeds = new ArrayList<String>();
        List<String> inputNames = new ArrayList<String>();
        List<String> inputPaths = new ArrayList<String>();
        List<String> inputFeedStorageTypes = new ArrayList<String>();
        for (Input input : entity.getInputs().getInputs()) {
            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
            Storage storage = FeedHelper.createStorage(cluster, feed);
            if (!input.isOptional()) {
                if (coord.getDatasets() == null) {
                    coord.setDatasets(new DATASETS());
                }
                if (coord.getInputEvents() == null) {
                    coord.setInputEvents(new INPUTEVENTS());
                }

                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
                coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);

                DATAIN datain = createDataIn(input);
                coord.getInputEvents().getDataIn().add(datain);
            }

            String inputExpr = null;
            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
                props.put(input.getName(), inputExpr);
            } else if (storage.getType() == Storage.TYPE.TABLE) {
                inputExpr = "${coord:dataIn('" + input.getName() + "')}";
                propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
            }

            inputFeeds.add(feed.getName());
            inputPaths.add(inputExpr);
            inputNames.add(input.getName());
            inputFeedStorageTypes.add(storage.getType().name());
        }
        propagateLateDataProperties(inputFeeds, inputNames, inputPaths, inputFeedStorageTypes, props);
    }
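
    /**
     * Publishes '#'-separated feed names, input names, paths and storage types so the
     * late data handler can compute usage per storage type.
     */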
    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputNames,
                                             List<String> inputPaths, List<String> inputFeedStorageTypes,
                                             Properties props) {
        // populate late data handler - should-record action
        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(inputFeeds, '#'));
        props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), StringUtils.join(inputNames, '#'));
        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), StringUtils.join(inputPaths, '#'));

        // the storage type of each corresponding feed is sent as a param to LateDataHandler,
        // which needs it to compute usage based on the storage type
        props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), StringUtils.join(inputFeedStorageTypes, '#'));
    }
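
    /**
     * Creates an Oozie synchronous dataset for the feed on the given cluster from the
     * storage URI template, the feed's frequency/timezone and the cluster validity start.
     */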
    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
                                      String datasetName, LocationType locationType) throws FalconException {
        SYNCDATASET syncdataset = new SYNCDATASET();
        syncdataset.setName(datasetName);
        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");

        String uriTemplate = storage.getUriTemplate(locationType);
        if (storage.getType() == Storage.TYPE.TABLE) {
            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires the hcat scheme
        }
        syncdataset.setUriTemplate(uriTemplate);

        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
        syncdataset.setTimezone(feed.getTimezone().getID());

        if (feed.getAvailabilityFlag() == null) {
            syncdataset.setDoneFlag("");
        } else {
            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
        }
        return syncdataset;
    }
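
    /** Builds the coordinator input event spanning the input's start to end instances. */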
    private DATAIN createDataIn(Input input) {
        DATAIN datain = new DATAIN();
        datain.setName(input.getName());
        datain.setDataset(input.getName());
        datain.setStartInstance(getELExpression(input.getStart()));
        datain.setEndInstance(getELExpression(input.getEnd()));
        return datain;
    }
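
    /** Wraps a non-null expression in Oozie EL syntax: expr becomes ${expr}. */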
    private String getELExpression(String expr) {
        if (expr != null) {
            expr = "${" + expr + "}";
        }
        return expr;
    }
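
    /**
     * Declares a dataset and an output event for every output, and exposes the resolved
     * output paths (filesystem) or catalog table properties (table) to the workflow.
     * When the process has no outputs, NONE/IGNORE markers are set instead.
     */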
    private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
        if (entity.getOutputs() == null) {
            props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "NONE");
            props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
            return;
        }

        if (coord.getDatasets() == null) {
            coord.setDatasets(new DATASETS());
        }
        if (coord.getOutputEvents() == null) {
            coord.setOutputEvents(new OUTPUTEVENTS());
        }

        List<String> outputFeeds = new ArrayList<String>();
        List<String> outputPaths = new ArrayList<String>();
        for (Output output : entity.getOutputs().getOutputs()) {
            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
            Storage storage = FeedHelper.createStorage(cluster, feed);

            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);

            DATAOUT dataout = createDataOut(output);
            coord.getOutputEvents().getDataOut().add(dataout);

            String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
            outputFeeds.add(feed.getName());
            outputPaths.add(outputExpr);

            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                props.put(output.getName(), outputExpr);
                propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
            } else if (storage.getType() == Storage.TYPE.TABLE) {
                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
            }
        }

        // output feed names and paths for the parent workflow
        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(outputFeeds, ','));
        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(outputPaths, ','));
    }
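
    /** Builds the coordinator output event for the output's configured instance. */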
    private DATAOUT createDataOut(Output output) {
        DATAOUT dataout = new DATAOUT();
        dataout.setName(output.getName());
        dataout.setDataset(output.getName());
        dataout.setInstance(getELExpression(output.getInstance()));
        return dataout;
    }
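
    /**
     * Registers additional output events and properties for the stats, meta and tmp
     * locations of a filesystem-backed output feed.
     */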
    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
                                               Storage storage, Properties props) throws FalconException {
        // stats, meta and tmp paths
        createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
        createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
        createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
    }

    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
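    /**
     * Adds a dataset and an output event for one auxiliary location type (stats/meta/tmp)
     * and exposes the resolved path to the workflow as the property name + "." + type.
     */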
    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
                                   COORDINATORAPP coord, Properties props, Storage storage) throws FalconException {
        String name = output.getName();
        String type = locType.name().toLowerCase();

        SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
        coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);

        DATAOUT dataout = new DATAOUT();
        dataout.setName(name + type);
        dataout.setDataset(name + type);
        dataout.setInstance(getELExpression(output.getInstance()));

        OUTPUTEVENTS outputEvents = coord.getOutputEvents();
        if (outputEvents == null) {
            outputEvents = new OUTPUTEVENTS();
            coord.setOutputEvents(outputEvents);
        }
        outputEvents.getDataOut().add(dataout);

        String outputExpr = "${coord:dataOut('" + name + type + "')}";
        props.put(name + "." + type, outputExpr);
    }
    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
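
    /** Exposes the user workflow's name, version and engine to the workflow configuration. */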
    private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) {
        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
                processWorkflow.getName(), entity.getName()));
        props.put("userWorkflowVersion", processWorkflow.getVersion());
        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
    }
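
    /**
     * Adds common catalog table properties plus per-engine (pig, hive, java) partition
     * filters for a table-backed input, under the "falcon_" + input name prefix.
     */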
    protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
        String prefix = "falcon_" + input.getName();
        propagateCommonCatalogTableProperties(tableStorage, props, prefix);

        props.put(prefix + "_partition_filter_pig",
                "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
        props.put(prefix + "_partition_filter_hive",
                "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
        props.put(prefix + "_partition_filter_java",
                "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
    }
}