blob: b9777526815bc387d37221f9b11ed56bd78d68a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity.parser;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.process.Properties;
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.ACL;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.SparkAttributes;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.HadoopQueueUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
/**
* Concrete Parser which has XML parsing and validation logic for Process XML.
*/
public class ProcessEntityParser extends EntityParser<Process> {
private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class);
public ProcessEntityParser() {
super(EntityType.PROCESS);
}
@Override
public void validate(Process process) throws FalconException {
validate(process, true);
}
public void validate(Process process, boolean checkDependentFeeds) throws FalconException {
if (process.getTimezone() == null) {
process.setTimezone(TimeZone.getTimeZone("UTC"));
}
validateACL(process);
// check if dependent entities exists
Set<String> clusters = new HashSet<String>();
for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
String clusterName = cluster.getName();
if (!clusters.add(cluster.getName())) {
throw new ValidationException("Cluster: " + cluster.getName()
+ " is defined more than once for process: " + process.getName());
}
validateEntityExists(EntityType.CLUSTER, clusterName);
// Optinal end_date
if (cluster.getValidity().getEnd() == null) {
cluster.getValidity().setEnd(DateUtil.NEVER);
}
// set Cluster version
int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
throw new ValidationException("Process should not set cluster to a version that does not exist");
} else {
cluster.setVersion(clusterVersion);
}
validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
validateHDFSPaths(process, clusterName);
validateProperties(process);
if (checkDependentFeeds) {
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
validateEntityExists(EntityType.FEED, input.getFeed());
Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
CrossEntityValidations.validateInstanceRange(process, input, feed);
validateInputPartition(input, feed);
validateOptionalInputsForTableStorage(feed, input);
}
}
if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
validateEntityExists(EntityType.FEED, output.getFeed());
Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateInstance(process, output, feed);
}
}
}
}
validateDatasetName(process.getInputs(), process.getOutputs());
validateLateInputs(process);
validateProcessSLA(process);
validateHadoopQueue(process);
validateProcessEntity(process);
}
private void validateProcessSLA(Process process) throws FalconException {
if (process.getSla() != null) {
ExpressionHelper evaluator = ExpressionHelper.get();
ExpressionHelper.setReferenceDate(new Date());
Frequency shouldStartExpression = process.getSla().getShouldStartIn();
Frequency shouldEndExpression = process.getSla().getShouldEndIn();
Frequency timeoutExpression = process.getTimeout();
if (shouldStartExpression != null){
Date shouldStart = new Date(evaluator.evaluate(shouldStartExpression.toString(), Long.class));
if (shouldEndExpression != null) {
Date shouldEnd = new Date(evaluator.evaluate(shouldEndExpression.toString(), Long.class));
if (shouldStart.after(shouldEnd)) {
throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
+ "is greater than shouldEndIn: "
+ shouldEndExpression);
}
}
if (timeoutExpression != null) {
Date timeout = new Date(evaluator.evaluate(timeoutExpression.toString(), Long.class));
if (timeout.before(shouldStart)) {
throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
+ " is greater than timeout: " + process.getTimeout());
}
}
}
}
}
/**
* Validate if the user submitting this entity has access to the specific dirs on HDFS.
*
* @param process process
* @param clusterName cluster the process is materialized on
* @throws FalconException
*/
private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
org.apache.falcon.entity.v0.cluster.Cluster cluster =
ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
if (!EntityUtil.responsibleFor(cluster.getColo())) {
return;
}
String workflowPath = process.getWorkflow().getPath();
String libPath = process.getWorkflow().getLib();
String nameNode = getNameNode(cluster);
try {
Configuration configuration = ClusterHelper.getConfiguration(cluster);
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
if (!fs.exists(new Path(workflowPath))) {
throw new ValidationException(
"Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
}
if (StringUtils.isNotBlank(libPath)) {
String[] libPaths = libPath.split(EntityUtil.WF_LIB_SEPARATOR);
for (String path : libPaths) {
if (!fs.exists(new Path(path))) {
throw new ValidationException("Lib path: " + path + " does not exists in HDFS: " + nameNode);
}
}
}
} catch (IOException e) {
throw new FalconException("Error validating workflow path " + workflowPath, e);
}
}
private String getNameNode(Cluster cluster) throws ValidationException {
// cluster should never be null as it is validated while submitting feeds.
if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
throw new ValidationException(
"Cannot get valid nameNode scheme from write interface of cluster: " + cluster.getName());
}
return ClusterHelper.getStorageUrl(cluster);
}
private void validateProcessValidity(Date start, Date end) throws FalconException {
try {
if (!start.before(end)) {
throw new ValidationException(
"Process start time: " + start + " should be before process end time: " + end);
}
} catch (ValidationException e) {
throw new ValidationException(e);
} catch (Exception e) {
throw new FalconException(e);
}
}
private void validateInputPartition(Input input, Feed feed) throws FalconException {
if (input.getPartition() == null) {
return;
}
final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
CrossEntityValidations.validateInputPartition(input, feed);
} else if (baseFeedStorageType == Storage.TYPE.TABLE) {
throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
}
}
private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
Set<String> datasetNames = new HashSet<String>();
if (inputs != null) {
for (Input input : inputs.getInputs()) {
if (!datasetNames.add(input.getName())) {
throw new ValidationException("Input name: " + input.getName() + " is already used");
}
}
}
if (outputs != null) {
for (Output output : outputs.getOutputs()) {
if (!datasetNames.add(output.getName())) {
throw new ValidationException("Output name: " + output.getName() + " is already used");
}
}
}
}
private void validateLateInputs(Process process) throws ValidationException {
if (process.getLateProcess() == null) {
return;
}
Map<String, String> feeds = new HashMap<String, String>();
if (process.getInputs() != null) {
for (Input in : process.getInputs().getInputs()) {
feeds.put(in.getName(), in.getFeed());
}
}
for (LateInput lp : process.getLateProcess().getLateInputs()) {
if (!feeds.keySet().contains(lp.getInput())) {
throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
}
try {
Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
if (feed.getLateArrival() == null) {
throw new ValidationException(
"Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
}
} catch (FalconException e) {
throw new ValidationException(e);
}
}
}
private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException {
if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
throw new ValidationException("Optional Input is not supported for feeds with table storage! "
+ input.getName());
}
}
/**
* Validate ACL if authorization is enabled.
*
* @param process process entity
* @throws ValidationException
*/
protected void validateACL(Process process) throws FalconException {
if (isAuthorizationDisabled) {
return;
}
// Validate the entity owner is logged-in, authenticated user if authorization is enabled
ACL processACL = process.getACL();
if (processACL == null) {
throw new ValidationException("Process ACL cannot be empty for: " + process.getName());
}
validateACLOwnerAndGroup(processACL);
try {
authorize(process.getName(), processACL);
} catch (AuthorizationException e) {
throw new ValidationException(e);
}
}
protected void validateProperties(Process process) throws ValidationException {
Properties properties = process.getProperties();
if (properties == null) {
return; // Cluster has no properties to validate.
}
List<Property> propertyList = process.getProperties().getProperties();
HashSet<String> propertyKeys = new HashSet<String>();
for (Property prop : propertyList) {
if (StringUtils.isBlank(prop.getName())) {
throw new ValidationException("Property name and value cannot be empty for Process : "
+ process.getName());
}
if (!propertyKeys.add(prop.getName())) {
throw new ValidationException("Multiple properties with same name found for Process : "
+ process.getName());
}
}
}
private void validateHadoopQueue(Process process) throws FalconException {
// get queue name specified in the process entity
String processQueueName = null;
java.util.Properties props = EntityUtil.getEntityProperties(process);
if ((props != null) && (props.containsKey(EntityUtil.MR_QUEUE_NAME))) {
processQueueName = props.getProperty(EntityUtil.MR_QUEUE_NAME);
} else {
return;
}
// iterate through each cluster in process entity to check if the cluster has the process entity queue
for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
String clusterName = cluster.getName();
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
if (rmURL == null) {
rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
}
if (rmURL != null) {
LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
if (queueNames.contains(processQueueName)) {
LOG.info("Validated presence of queue {} specified in process "
+ "entity for cluster {}", processQueueName, clusterName);
} else {
String strMsg = String.format("The hadoop queue name %s specified in process "
+ "entity for cluster %s is invalid.", processQueueName, cluster.getName());
LOG.info(strMsg);
throw new FalconException(strMsg);
}
}
}
}
protected void validateProcessEntity(Process process) throws FalconException {
validateSparkProcessEntity(process, process.getSparkAttributes());
}
private void validateSparkProcessEntity(Process process, SparkAttributes sparkAttributes) throws
FalconException {
Workflow workflow = process.getWorkflow();
if (workflow.getEngine() == EngineType.SPARK) {
if (sparkAttributes == null) {
throw new ValidationException(
"For Spark Workflow engine Spark Attributes in Process Entity can't be null");
} else {
String clusterName = process.getClusters().getClusters().get(0).getName();
org.apache.falcon.entity.v0.cluster.Cluster cluster =
ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
String clusterEntitySparkMaster = ClusterHelper.getSparkMasterEndPoint(cluster);
String processEntitySparkMaster = sparkAttributes.getMaster();
String sparkMaster = (processEntitySparkMaster == null)
? clusterEntitySparkMaster
: processEntitySparkMaster;
if (StringUtils.isEmpty(sparkMaster)
|| StringUtils.isEmpty(sparkAttributes.getJar())) {
throw new ValidationException("Spark master and jar/python file can't be null");
}
}
}
}
}