/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.workflow;

import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.converter.OozieProcessMapper;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.client.OozieClient;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Oozie workflow builder for Falcon process entities.
 */
public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
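
    /**
     * Builds schedule properties for each cluster the process is configured on,
     * keyed by cluster name. Clusters for which no schedule could be built
     * (for example, validity already ended) are omitted from the result.
     */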
    @Override
    public Map<String, Properties> newWorkflowSchedule(Process process, List<String> clusters) throws FalconException {
        Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
        for (String clusterName : clusters) {
            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
            Properties properties = newWorkflowSchedule(process, processCluster.getValidity().getStart(), clusterName,
                    CurrentUser.getUser());
            if (properties != null) {
                propertiesMap.put(clusterName, properties);
            }
        }
        return propertiesMap;
    }
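
    /**
     * Adds dataset metadata for an optional input feed (frequency, timezone,
     * initial instance, URI template and start/end instances) to the scheduling
     * properties, keyed by the input name.
     */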
    private void addOptionalInputProperties(Properties properties, Input in, String clusterName)
        throws FalconException {
        Feed feed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
        String inName = in.getName();
        properties.put(inName + ".frequency", String.valueOf(feed.getFrequency().getFrequency()));
        properties.put(inName + ".freq_timeunit", mapToCoordTimeUnit(feed.getFrequency().getTimeUnit()).name());
        properties.put(inName + ".timezone", feed.getTimezone().getID());
        properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
        properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
        properties.put(inName + ".done-flag", "notused");
        // Feed data path template, with '$' escaped as '%' so the ${...} variables
        // are not expanded during property substitution
        String locPath = FeedHelper.createStorage(clusterName, feed)
                .getUriTemplate(LocationType.DATA).replace('$', '%');
        properties.put(inName + ".uri-template", locPath);
        properties.put(inName + ".start-instance", in.getStart());
        properties.put(inName + ".end-instance", in.getEnd());
    }
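
    /**
     * Maps a Falcon frequency {@link TimeUnit} to the corresponding Oozie
     * coordinator {@link Timeunit}.
     */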
    private Timeunit mapToCoordTimeUnit(TimeUnit tu) {
        switch (tu) {
        case days:
            return Timeunit.DAY;
        case hours:
            return Timeunit.HOUR;
        case minutes:
            return Timeunit.MINUTE;
        case months:
            return Timeunit.MONTH;
        default:
            throw new IllegalArgumentException("Unhandled time unit " + tu);
        }
    }
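
    /**
     * Maps the process to its Oozie definitions under the cluster's staging
     * directory and returns the properties needed to schedule it, or null if
     * the start date is not before the cluster validity end or the mapping fails.
     */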
    @Override
    public Properties newWorkflowSchedule(Process process, Date startDate, String clusterName, String user)
        throws FalconException {
        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
        if (!startDate.before(processCluster.getValidity().getEnd())) { // start time >= validity end, nothing to schedule
            return null;
        }

        Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
        Path bundlePath = new Path(ClusterHelper.getLocation(cluster, "staging"), EntityUtil.getStagingPath(process));
        Process processClone = (Process) process.copy();
        EntityUtil.setStartDate(processClone, clusterName, startDate);

        OozieProcessMapper mapper = new OozieProcessMapper(processClone);
        if (!mapper.map(cluster, bundlePath)) {
            return null;
        }

        Properties properties = createAppProperties(clusterName, bundlePath, user);

        // Add the workflow lib path, if configured, normalized to be prefixed with ${nameNode}
        String libPath = process.getWorkflow().getLib();
        if (!StringUtils.isEmpty(libPath)) {
            String path = libPath.replace("${nameNode}", "");
            properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
        }

        if (process.getInputs() != null) {
            for (Input in : process.getInputs().getInputs()) {
                if (in.isOptional()) {
                    addOptionalInputProperties(properties, in, clusterName);
                }
            }
        }
        return properties;
    }
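
    /**
     * Computes the next instance start time for the process on the given cluster,
     * based on the cluster validity start, process frequency and timezone.
     */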
    @Override
    public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
        return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
                process.getFrequency(), process.getTimezone(), now);
    }
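
    /**
     * Returns the workflow names for this process, currently just the DEFAULT
     * workflow name.
     */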
    @Override
    public String[] getWorkflowNames(Process process) {
        return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString()};
    }
}