blob: d4b79fd5a202a8b12010776b68926cc04863aa82 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity.parser;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.util.DateUtil;
import java.util.Date;
/**
* Validation helper functions to validate across process, feed and cluster definitions.
*/
public final class CrossEntityValidations {
private CrossEntityValidations() {}
public static void validateInstanceRange(Process process, Input input, Feed feed) throws FalconException {
try {
for (Cluster cluster : process.getClusters().getClusters()) {
String clusterName = cluster.getName();
validateFeedDefinedForCluster(feed, clusterName);
org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed,
clusterName).getValidity();
// Optinal end_date
if (feedValidity.getEnd() == null) {
feedValidity.setEnd(DateUtil.NEVER);
}
Date feedStart = feedValidity.getStart();
Date feedEnd = feedValidity.getEnd();
String instStartEL = input.getStart();
String instEndEL = input.getEnd();
ExpressionHelper evaluator = ExpressionHelper.get();
Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity();
ExpressionHelper.setReferenceDate(processValidity.getStart());
Date instStart = evaluator.evaluate(instStartEL, Date.class);
Date instEnd = evaluator.evaluate(instEndEL, Date.class);
if (instStart.before(feedStart)) {
throw new ValidationException("Start instance " + instStartEL + " of feed " + feed.getName()
+ " is before the start of feed " + feedValidity.getStart() + " for cluster "
+ clusterName);
}
if (instEnd.before(instStart)) {
throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName()
+ " is before the start instance " + instStartEL + " for cluster " + clusterName);
}
if (instEnd.after(feedEnd)) {
throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName()
+ " is after the end of feed " + feedValidity.getEnd() + " for cluster " + clusterName);
}
}
} catch (ValidationException e) {
throw e;
} catch (Exception e) {
throw new FalconException(e);
}
}
public static void validateFeedRetentionPeriod(String startInstance, Feed feed, String clusterName)
throws FalconException {
String feedRetention = FeedHelper.getCluster(feed, clusterName).getRetention().getLimit().toString();
ExpressionHelper evaluator = ExpressionHelper.get();
Date now = new Date();
ExpressionHelper.setReferenceDate(now);
Date instStart = evaluator.evaluate(startInstance, Date.class);
long feedDuration = evaluator.evaluate(feedRetention, Long.class);
Date feedStart = new Date(now.getTime() - feedDuration);
if (instStart.before(feedStart)) {
throw new ValidationException("StartInstance :" + startInstance + " of process is out of range for Feed: "
+ feed.getName() + " in cluster: " + clusterName + "'s retention limit :" + feedRetention);
}
}
// Mapping to oozie coord's dataset fields
public static void validateInstance(Process process, Output output, Feed feed) throws FalconException {
try {
for (Cluster cluster : process.getClusters().getClusters()) {
String clusterName = cluster.getName();
validateFeedDefinedForCluster(feed, clusterName);
org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed,
clusterName).getValidity();
Date feedStart = feedValidity.getStart();
Date feedEnd = feedValidity.getEnd();
String instEL = output.getInstance();
ExpressionHelper evaluator = ExpressionHelper.get();
Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity();
ExpressionHelper.setReferenceDate(processValidity.getStart());
Date inst = evaluator.evaluate(instEL, Date.class);
if (inst.before(feedStart)) {
throw new ValidationException("Instance " + instEL + " of feed " + feed.getName()
+ " is before the start of feed " + feedValidity.getStart() + " for cluster" + clusterName);
}
if (inst.after(feedEnd)) {
throw new ValidationException("End instance " + instEL + " for feed " + feed.getName()
+ " is after the end of feed " + feedValidity.getEnd() + " for cluster" + clusterName);
}
}
} catch (ValidationException e) {
throw e;
} catch (Exception e) {
throw new FalconException(e);
}
}
public static void validateInputPartition(Input input, Feed feed) throws ValidationException {
String[] parts = input.getPartition().split("/");
if (feed.getPartitions() == null || feed.getPartitions().getPartitions().isEmpty()
|| feed.getPartitions().getPartitions().size() < parts.length) {
throw new ValidationException("Partition specification in input " + input.getName() + " is wrong");
}
}
public static void validateFeedDefinedForCluster(Feed feed, String clusterName) throws FalconException {
if (FeedHelper.getCluster(feed, clusterName) == null) {
throw new ValidationException("Feed " + feed.getName() + " is not defined for cluster " + clusterName);
}
}
}