blob: a00b3ad7f387165d4776bea793d84e4fe6ebe041 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Lifecycle;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.lifecycle.FeedLifecycleStage;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.util.BuildProperties;
import org.apache.falcon.util.DateUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Matcher;
/**
* Feed entity helper methods.
*/
public final class FeedHelper {
private static final Logger LOG = LoggerFactory.getLogger(FeedHelper.class);
private static final int ONE_MS = 1;
public static final String FORMAT = "yyyyMMddHHmm";
private FeedHelper() {}
public static Cluster getCluster(Feed feed, String clusterName) {
for (Cluster cluster : feed.getClusters().getClusters()) {
if (cluster.getName().equals(clusterName)) {
return cluster;
}
}
return null;
}
public static Storage createStorage(Feed feed) throws FalconException {
final Locations feedLocations = feed.getLocations();
if (feedLocations != null
&& feedLocations.getLocations().size() != 0) {
return new FileSystemStorage(feed);
}
try {
final CatalogTable table = feed.getTable();
if (table != null) {
return new CatalogStorage(feed);
}
} catch (URISyntaxException e) {
throw new FalconException(e);
}
throw new FalconException("Both catalog and locations are not defined.");
}
public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
Feed feed) throws FalconException {
return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
}
public static Storage createStorage(String clusterName, Feed feed)
throws FalconException {
return createStorage(getCluster(feed, clusterName), feed);
}
public static Storage createStorage(Cluster cluster, Feed feed)
throws FalconException {
final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
return createStorage(cluster, feed, clusterEntity);
}
public static Storage createStorage(Cluster cluster, Feed feed,
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
throws FalconException {
final List<Location> locations = getLocations(cluster, feed);
if (locations != null) {
return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
}
try {
final CatalogTable table = getTable(cluster, feed);
if (table != null) {
return new CatalogStorage(clusterEntity, table);
}
} catch (URISyntaxException e) {
throw new FalconException(e);
}
throw new FalconException("Both catalog and locations are not defined.");
}
/**
* Factory method to dole out a storage instance used for replication source.
*
* @param clusterEntity cluster entity
* @param feed feed entity
* @return an implementation of Storage
* @throws FalconException
*/
public static Storage createReadOnlyStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
Feed feed) throws FalconException {
Cluster feedCluster = getCluster(feed, clusterEntity.getName());
final List<Location> locations = getLocations(feedCluster, feed);
if (locations != null) {
return new FileSystemStorage(ClusterHelper.getReadOnlyStorageUrl(clusterEntity), locations);
}
try {
final CatalogTable table = getTable(feedCluster, feed);
if (table != null) {
return new CatalogStorage(clusterEntity, table);
}
} catch (URISyntaxException e) {
throw new FalconException(e);
}
throw new FalconException("Both catalog and locations are not defined.");
}
public static Storage createStorage(String type, String storageUriTemplate)
throws URISyntaxException {
Storage.TYPE storageType = Storage.TYPE.valueOf(type);
if (storageType == Storage.TYPE.FILESYSTEM) {
return new FileSystemStorage(storageUriTemplate);
} else if (storageType == Storage.TYPE.TABLE) {
return new CatalogStorage(storageUriTemplate);
}
throw new IllegalArgumentException("Bad type: " + type);
}
public static Storage createStorage(String type, String storageUriTemplate,
Configuration conf) throws URISyntaxException {
Storage.TYPE storageType = Storage.TYPE.valueOf(type);
if (storageType == Storage.TYPE.FILESYSTEM) {
return new FileSystemStorage(storageUriTemplate);
} else if (storageType == Storage.TYPE.TABLE) {
return new CatalogStorage(storageUriTemplate, conf);
}
throw new IllegalArgumentException("Bad type: " + type);
}
public static Storage.TYPE getStorageType(Feed feed) throws FalconException {
final Locations feedLocations = feed.getLocations();
if (feedLocations != null
&& feedLocations.getLocations().size() != 0) {
return Storage.TYPE.FILESYSTEM;
}
final CatalogTable table = feed.getTable();
if (table != null) {
return Storage.TYPE.TABLE;
}
throw new FalconException("Both catalog and locations are not defined.");
}
public static Storage.TYPE getStorageType(Feed feed,
Cluster cluster) throws FalconException {
final List<Location> locations = getLocations(cluster, feed);
if (locations != null) {
return Storage.TYPE.FILESYSTEM;
}
final CatalogTable table = getTable(cluster, feed);
if (table != null) {
return Storage.TYPE.TABLE;
}
throw new FalconException("Both catalog and locations are not defined.");
}
public static Storage.TYPE getStorageType(Feed feed,
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
throws FalconException {
Cluster feedCluster = getCluster(feed, clusterEntity.getName());
return getStorageType(feed, feedCluster);
}
public static List<Location> getLocations(Cluster cluster, Feed feed) {
// check if locations are overridden in cluster
final Locations clusterLocations = cluster.getLocations();
if (clusterLocations != null
&& clusterLocations.getLocations().size() != 0) {
return clusterLocations.getLocations();
}
Locations feedLocations = feed.getLocations();
return feedLocations == null ? null : feedLocations.getLocations();
}
public static Location getLocation(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster cluster,
LocationType type) {
List<Location> locations = getLocations(getCluster(feed, cluster.getName()), feed);
if (locations != null) {
for (Location location : locations) {
if (location.getType() == type) {
return location;
}
}
}
return null;
}
public static Sla getSLA(Cluster cluster, Feed feed) {
final Sla clusterSla = cluster.getSla();
if (clusterSla != null) {
return clusterSla;
}
final Sla feedSla = feed.getSla();
return feedSla == null ? null : feedSla;
}
public static Sla getSLA(String clusterName, Feed feed) {
Cluster cluster = FeedHelper.getCluster(feed, clusterName);
return cluster != null ? getSLA(cluster, feed) : null;
}
protected static CatalogTable getTable(Cluster cluster, Feed feed) {
// check if table is overridden in cluster
if (cluster.getTable() != null) {
return cluster.getTable();
}
return feed.getTable();
}
public static String normalizePartitionExpression(String part1, String part2) {
String partExp = StringUtils.stripToEmpty(part1) + "/" + StringUtils.stripToEmpty(part2);
partExp = partExp.replaceAll("//+", "/");
partExp = StringUtils.stripStart(partExp, "/");
partExp = StringUtils.stripEnd(partExp, "/");
return partExp;
}
public static String normalizePartitionExpression(String partition) {
return normalizePartitionExpression(partition, null);
}
public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
Properties properties = new Properties();
Map<String, String> clusterVars = new HashMap<>();
clusterVars.put("colo", cluster.getColo());
clusterVars.put("name", cluster.getName());
if (cluster.getProperties() != null) {
for (Property property : cluster.getProperties().getProperties()) {
clusterVars.put(property.getName(), property.getValue());
}
}
properties.put("cluster", clusterVars);
return properties;
}
public static String evaluateClusterExp(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, String exp)
throws FalconException {
Properties properties = getClusterProperties(clusterEntity);
ExpressionHelper expHelp = ExpressionHelper.get();
expHelp.setPropertiesForVariable(properties);
return expHelp.evaluateFullExpression(exp, String.class);
}
public static String getStagingPath(boolean isSource,
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
Feed feed, CatalogStorage storage, Tag tag, String suffix) {
String stagingDirPath = getStagingDir(isSource, clusterEntity, feed, storage, tag);
String datedPartitionKey = storage.getDatedPartitionKeys().get(0);
String datedPartitionKeySuffix = datedPartitionKey + "=${coord:dataOutPartitionValue('output',"
+ "'" + datedPartitionKey + "')}";
return stagingDirPath + "/"
+ datedPartitionKeySuffix + "/"
+ suffix + "/"
+ "data";
}
public static String getStagingDir(boolean isSource,
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
Feed feed, CatalogStorage storage, Tag tag) {
String workflowName = EntityUtil.getWorkflowName(
tag, Arrays.asList(clusterEntity.getName()), feed).toString();
// log path is created at scheduling wf
final String storageUri = isSource
? ClusterHelper.getReadOnlyStorageUrl(clusterEntity) // read interface
: ClusterHelper.getStorageUrl(clusterEntity); // write interface
return storageUri
+ EntityUtil.getLogPath(clusterEntity, feed) + "/"
+ workflowName + "/"
+ storage.getDatabase() + "/"
+ storage.getTable();
}
public static Properties getUserWorkflowProperties(LifeCycle lifeCycle) {
Properties props = new Properties();
props.put("userWorkflowName", lifeCycle.name().toLowerCase() + "-policy");
props.put("userWorkflowEngine", "falcon");
String version;
try {
version = BuildProperties.get().getProperty("build.version");
} catch (Exception e) { // unfortunate that this is only available in prism/webapp
version = "0.6";
}
props.put("userWorkflowVersion", version);
return props;
}
public static Properties getFeedProperties(Feed feed) {
Properties feedProperties = new Properties();
if (feed.getProperties() != null) {
for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) {
feedProperties.put(property.getName(), property.getValue());
}
}
return feedProperties;
}
public static Lifecycle getLifecycle(Feed feed, String clusterName) throws FalconException {
Cluster cluster = getCluster(feed, clusterName);
if (cluster !=null) {
return cluster.getLifecycle() != null ? cluster.getLifecycle() : feed.getLifecycle();
}
throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
}
public static RetentionStage getRetentionStage(Feed feed, String clusterName) throws FalconException {
if (isLifecycleEnabled(feed, clusterName)) {
Lifecycle globalLifecycle = feed.getLifecycle();
Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle();
if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
return clusterLifecycle.getRetentionStage();
} else if (globalLifecycle != null) {
return globalLifecycle.getRetentionStage();
}
}
return null;
}
/**
* Returns various policies applicable for a feed.
*
* @param feed
* @return list of names of lifecycle policies for the given feed, empty list if there are none.
*/
public static List<String> getPolicies(Feed feed, String clusterName) throws FalconException {
List<String> result = new ArrayList<>();
Cluster cluster = getCluster(feed, clusterName);
if (cluster != null) {
if (isLifecycleEnabled(feed, clusterName)) {
String policy = getRetentionStage(feed, clusterName).getPolicy();
policy = StringUtils.isBlank(policy)
? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
result.add(policy);
}
return result;
}
throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
}
/**
* Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
* @param instancePath - actual data path
* @param templatePath - template path from feed definition
* @param timeZone timeZone
* @return date corresponding to the path
*/
//consider just the first occurrence of the pattern
public static Date getDate(String templatePath, Path instancePath, TimeZone timeZone) {
String path = instancePath.toString();
Matcher matcher = FeedDataPath.PATTERN.matcher(templatePath);
Calendar cal = Calendar.getInstance(timeZone);
int lastEnd = 0;
Set<FeedDataPath.VARS> matchedVars = new HashSet<>();
while (matcher.find()) {
FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group());
String pad = templatePath.substring(lastEnd, matcher.start());
if (!path.startsWith(pad)) {
//Template and path do not match
return null;
}
int value;
try {
value = Integer.valueOf(path.substring(pad.length(), pad.length() + pathVar.getValueSize()));
} catch (NumberFormatException e) {
//Not a valid number for variable
return null;
}
pathVar.setCalendar(cal, value);
lastEnd = matcher.end();
path = path.substring(pad.length() + pathVar.getValueSize());
matchedVars.add(pathVar);
}
String remTemplatePath = templatePath.substring(lastEnd);
//Match the remaining constant at the end
//Handling case where feed instancePath has partitions
if (StringUtils.isNotEmpty(path) && StringUtils.isNotEmpty(remTemplatePath)
&& !path.contains(remTemplatePath)) {
return null;
}
//Reset other fields
for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
if (!matchedVars.contains(var)) {
cal.set(var.getCalendarField(), 0);
}
}
return cal.getTime();
}
public static Path getFeedBasePath(String feedPath) throws IOException {
Matcher matcher = FeedDataPath.PATTERN.matcher(feedPath);
if (matcher.find()) {
return new Path(feedPath.substring(0, matcher.start()));
} else {
throw new IOException("Unable to resolve pattern for feedPath: " + feedPath);
}
}
private static void validateFeedInstance(Feed feed, Date instanceTime,
org.apache.falcon.entity.v0.cluster.Cluster cluster) {
// validate the cluster
Cluster feedCluster = getCluster(feed, cluster.getName());
if (feedCluster == null) {
throw new IllegalArgumentException("Cluster :" + cluster.getName() + " is not a valid cluster for feed:"
+ feed.getName());
}
// validate that instanceTime is in validity range
if (feedCluster.getValidity().getStart().after(instanceTime)
|| !feedCluster.getValidity().getEnd().after(instanceTime)) {
throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for"
+ " Feed: " + feed.getName() + " on cluster:" + cluster.getName());
}
// validate instanceTime on basis of startTime and frequency
Date nextInstance = EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(),
feed.getTimezone(), instanceTime);
if (!nextInstance.equals(instanceTime)) {
throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not a valid instance for the "
+ " feed: " + feed.getName() + " on cluster: " + cluster.getName()
+ " on the basis of startDate and frequency");
}
}
/**
* Given a feed Instance finds the generating process instance.
*
* [process, cluster, instanceTime]
*
* If the feed is replicated, then it returns null.
*
* @param feed output feed
* @param feedInstanceTime instance time of the feed
* @return returns the instance of the process which produces the given feed
*/
public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime,
org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
//validate the inputs
validateFeedInstance(feed, feedInstanceTime, cluster);
Process process = getProducerProcess(feed);
if (process != null) {
org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process,
cluster.getName());
Date pStart = processCluster.getValidity().getStart();
Date pEnd = processCluster.getValidity().getEnd();
Frequency pFrequency = process.getFrequency();
TimeZone pTz = process.getTimezone();
try {
Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster);
boolean isValid = EntityUtil.isValidInstanceTime(pStart, pFrequency, pTz, processInstanceTime);
if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd) || !isValid){
return null;
}
SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
processInstanceTime, EntityType.PROCESS);
producer.setTags(SchedulableEntityInstance.OUTPUT);
return producer;
} catch (FalconException | IllegalArgumentException e) {
LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } "
+ " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName());
}
}
return null;
}
/**
* Given a feed find it's generating process.
*
* If no generating process is found it returns null.
* @param feed output feed
* @return Process which produces the given feed.
*/
public static Process getProducerProcess(Feed feed) throws FalconException {
EntityList dependencies = EntityUtil.getEntityDependencies(feed);
for (EntityList.EntityElement e : dependencies.getElements()) {
if (e.tag.contains(EntityList.OUTPUT_TAG)) {
return EntityUtil.getEntity(EntityType.PROCESS, e.name);
}
}
return null;
}
/**
* Find the producerInstance which will generate the given feedInstance.
*
* @param feed output feed
* @param feedInstanceTime instance time of the output feed
* @param producer producer process
* @return time of the producer instance which will produce the given feed instance.
*/
private static Date getProducerInstanceTime(Feed feed, Date feedInstanceTime, Process producer,
org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
String clusterName = cluster.getName();
Cluster feedCluster = getCluster(feed, clusterName);
org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(producer, clusterName);
Date producerStartDate = processCluster.getValidity().getStart();
// read the process definition and find the relative time difference between process and output feed
// if output process instance time is now then output FeedInstance time is x
String outputInstance = null;
for (Output op : producer.getOutputs().getOutputs()) {
if (StringUtils.equals(feed.getName(), op.getFeed())) {
outputInstance = op.getInstance();
}
}
ExpressionHelper.setReferenceDate(producerStartDate);
ExpressionHelper evaluator = ExpressionHelper.get();
// producerInstance = feedInstanceTime + (difference between producer process and feed)
// the feedInstance before or equal to this time is the required one
Date relativeFeedInstance = evaluator.evaluate(outputInstance, Date.class);
Date feedInstanceActual = EntityUtil.getPreviousInstanceTime(feedCluster.getValidity().getStart(),
feed.getFrequency(), feed.getTimezone(), relativeFeedInstance);
Long producerInstanceTime = feedInstanceTime.getTime() + (producerStartDate.getTime()
- feedInstanceActual.getTime());
Date producerInstance = new Date(producerInstanceTime);
//validate that the producerInstance is in the validity range on the provided cluster
if (producerInstance.before(processCluster.getValidity().getStart())
|| producerInstance.after(processCluster.getValidity().getEnd())) {
throw new IllegalArgumentException("Instance time provided: " + feedInstanceTime
+ " for feed " + feed.getName()
+ " is outside the range of instances produced by the producer process: " + producer.getName()
+ " in it's validity range on provided cluster: " + cluster.getName());
}
return producerInstance;
}
public static Set<SchedulableEntityInstance> getConsumerInstances(Feed feed, Date feedInstanceTime,
org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
Set<SchedulableEntityInstance> result = new HashSet<>();
// validate that the feed has this cluster & validate that the instanceTime is a valid instanceTime
validateFeedInstance(feed, feedInstanceTime, cluster);
Set<Process> consumers = getConsumerProcesses(feed);
for (Process p : consumers) {
Set<Date> consumerInstanceTimes = getConsumerProcessInstanceTimes(feed, feedInstanceTime, p, cluster);
for (Date date : consumerInstanceTimes) {
SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date,
EntityType.PROCESS);
in.setTags(SchedulableEntityInstance.INPUT);
result.add(in);
}
}
return result;
}
/**
* Returns the consumer processes for a given feed if any, null otherwise.
*
* @param feed input feed
* @return the set of processes which use the given feed as input, empty set if no consumers.
*/
public static Set<Process> getConsumerProcesses(Feed feed) throws FalconException {
Set<Process> result = new HashSet<>();
EntityList dependencies = EntityUtil.getEntityDependencies(feed);
for (EntityList.EntityElement e : dependencies.getElements()) {
if (e.tag.contains(EntityList.INPUT_TAG)) {
Process consumer = EntityUtil.getEntity(EntityType.PROCESS, e.name);
result.add(consumer);
}
}
return result;
}
// return all instances of a process which will consume the given feed instance
private static Set<Date> getConsumerProcessInstanceTimes(Feed feed, Date feedInstancetime, Process consumer,
org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
Set<Date> result = new HashSet<>();
// find relevant cluster for the process
org.apache.falcon.entity.v0.process.Cluster processCluster =
ProcessHelper.getCluster(consumer, cluster.getName());
if (processCluster == null) {
throw new IllegalArgumentException("Cluster is not valid for process");
}
Date processStartDate = processCluster.getValidity().getStart();
Cluster feedCluster = getCluster(feed, cluster.getName());
Date feedStartDate = feedCluster.getValidity().getStart();
// find all corresponding Inputs as a process may refer same feed multiple times
List<Input> inputFeeds = new ArrayList<>();
if (consumer.getInputs() != null && consumer.getInputs().getInputs() != null) {
for (Input input : consumer.getInputs().getInputs()) {
if (StringUtils.equals(input.getFeed(), feed.getName())) {
inputFeeds.add(input);
}
}
}
// for each input corresponding to given feed, find corresponding consumer instances
for (Input in : inputFeeds) {
/* Algorithm for finding a consumer instance for an input feed instance
Step 1. Find one instance which will consume the given feed instance.
a. take process start date and find last input feed instance time. In this step take care of
frequencies being out of sync.
b. using the above find the time difference between the process instance and feed instance.
c. Adding the above time difference to given feed instance for which we want to find the consumer
instances we will get one consumer process instance.
Step 2. Keep checking for next instances of process till they consume the given feed Instance.
Step 3. Similarly check for all previous instances of process till they consume the given feed instance.
*/
// Step 1.a & 1.b
ExpressionHelper.setReferenceDate(processStartDate);
ExpressionHelper evaluator = ExpressionHelper.get();
Date startRelative = evaluator.evaluate(in.getStart(), Date.class);
Date startTimeActual = EntityUtil.getPreviousInstanceTime(feedStartDate,
feed.getFrequency(), feed.getTimezone(), startRelative);
Long offset = processStartDate.getTime() - startTimeActual.getTime();
// Step 1.c
Date processInstanceStartRelative = new Date(feedInstancetime.getTime() + offset);
Date processInstanceStartActual = EntityUtil.getPreviousInstanceTime(processStartDate,
consumer.getFrequency(), consumer.getTimezone(), processInstanceStartRelative);
// Step 2.
Date currentInstance = processInstanceStartActual;
while (true) {
Date nextConsumerInstance = EntityUtil.getNextStartTime(processStartDate,
consumer.getFrequency(), consumer.getTimezone(), currentInstance);
ExpressionHelper.setReferenceDate(nextConsumerInstance);
evaluator = ExpressionHelper.get();
Date inputStart = evaluator.evaluate(in.getStart(), Date.class);
Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(),
feed.getTimezone(), inputStart).getTime();
Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) {
if (!nextConsumerInstance.before(processCluster.getValidity().getStart())
&& nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
result.add(nextConsumerInstance);
}
} else {
break;
}
currentInstance = new Date(nextConsumerInstance.getTime() + ONE_MS);
}
// Step 3.
currentInstance = processInstanceStartActual;
while (true) {
Date nextConsumerInstance = EntityUtil.getPreviousInstanceTime(processStartDate,
consumer.getFrequency(), consumer.getTimezone(), currentInstance);
ExpressionHelper.setReferenceDate(nextConsumerInstance);
evaluator = ExpressionHelper.get();
Date inputStart = evaluator.evaluate(in.getStart(), Date.class);
Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(),
feed.getTimezone(), inputStart).getTime();
Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) {
if (!nextConsumerInstance.before(processCluster.getValidity().getStart())
&& nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
result.add(nextConsumerInstance);
}
} else {
break;
}
currentInstance = new Date(nextConsumerInstance.getTime() - ONE_MS);
}
}
return result;
}
public static FeedInstanceResult getFeedInstanceListing(Entity entityObject,
Date start, Date end) throws FalconException {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);
FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success");
for (String cluster : clusters) {
Feed feed = (Feed) entityObject;
Storage storage = createStorage(cluster, feed);
List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end);
FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()];
int index = 0;
for (FeedInstanceStatus feedStatus : feedListing) {
FeedInstanceResult.Instance instance = new
FeedInstanceResult.Instance(cluster, feedStatus.getInstance(),
feedStatus.getStatus().name());
instance.creationTime = feedStatus.getCreationTime();
instance.uri = feedStatus.getUri();
instance.size = feedStatus.getSize();
instance.sizeH = feedStatus.getSizeH();
instances[index++] = instance;
}
result.setInstances(instances);
}
return result;
}
public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName,
Date instanceTime)
throws FalconException {
Storage storage = createStorage(clusterName, feed);
return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA, instanceTime);
}
public static boolean isLifecycleEnabled(Feed feed, String clusterName) {
Cluster cluster = getCluster(feed, clusterName);
return cluster != null && (feed.getLifecycle() != null || cluster.getLifecycle() != null);
}
public static Frequency getRetentionFrequency(Feed feed, String clusterName) throws FalconException {
Frequency retentionFrequency;
RetentionStage retentionStage = getRetentionStage(feed, clusterName);
if (retentionStage != null && retentionStage.getFrequency() != null) {
retentionFrequency = retentionStage.getFrequency();
} else {
Frequency feedFrequency = feed.getFrequency();
Frequency defaultFrequency = new Frequency("hours(6)");
if (DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency)) {
retentionFrequency = defaultFrequency;
} else {
retentionFrequency = new Frequency(feedFrequency.toString());
}
}
return retentionFrequency;
}
}