blob: 279cab847acce2002e2629e26ee94fee68544b40 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.service;
import com.google.common.annotations.VisibleForTesting;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FeedInstanceStatus;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.entity.v0.process.Clusters;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
import org.apache.falcon.persistence.MonitoredEntityBean;
import org.apache.falcon.persistence.PendingInstanceBean;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.falcon.entity.EntityUtil.getStartTime;
import static org.apache.falcon.util.DateUtil.now;
/**
* Service to monitor Feed SLAs.
*/
public final class EntitySLAMonitoringService implements ConfigurationChangeListener, FalconService {
private static final Logger LOG = LoggerFactory.getLogger(EntitySLAMonitoringService.class);
private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
private static final int ONE_MS = 1;
private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService();
static final String TAG_CRITICAL = "Missed-SLA-High";
static final String TAG_WARN = "Missed-SLA-Low";
private static final long MINUTE_DELAY = 60000L;
private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
private EntitySLAMonitoringService() {
}
public static EntitySLAMonitoringService get() {
return SERVICE;
}
/**
* Permissions for storePath.
*/
private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
/**
* Frequency in seconds of "status check" for pending entity instances.
*/
private int statusCheckFrequencySeconds; // 10 minutes
/**
* Time Duration (in milliseconds) in future for generating pending entity instances.
*
* In every cycle pending entity instances are added for monitoring, till this time in future.
*/
private int lookAheadWindowMillis; // 15 MINUTES
/**
* Filesystem used for serializing and deserializing.
*/
private FileSystem fileSystem;
/**
* Working directory for the entity sla monitoring service.
*/
private Path storePath;
/**
* Path to store the state of the monitoring service.
*/
private Path filePath;
@Override
public void onAdd(Entity entity) throws FalconException {
startEntityMonitoring(entity, false);
}
private void startEntityMonitoring(Entity entity, boolean isEntityUpdated) throws FalconException{
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
Set<String> clustersDefined = EntityUtil.getClustersDefined(entity);
if (entity.getEntityType() == EntityType.FEED) {
Feed feed = (Feed) entity;
// currently sla service for feed is enabled only for fileSystemStorage
if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) {
for (String cluster : clustersDefined) {
if (currentClusters.contains(cluster)) {
if (FeedHelper.getSLA(cluster, feed) != null) {
LOG.debug("Adding feed:{} for monitoring", feed.getName());
if (isEntityUpdated) {
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(),
EntityType.FEED.toString(), now());
} else {
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(),
EntityType.FEED.toString(), getStartTime(entity, cluster));
}
}
}
}
}
} else if (entity.getEntityType() == EntityType.PROCESS) {
Process process = (Process) entity;
if (process.getSla() != null || checkProcessClusterSLA(process)) {
for (String cluster : clustersDefined) {
if (currentClusters.contains(cluster)) {
LOG.debug("Adding process:{} for monitoring", process.getName());
if (isEntityUpdated) {
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
EntityType.PROCESS.toString(), now());
} else {
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
EntityType.PROCESS.toString(), getStartTime(entity, cluster));
}
}
}
}
}
}
private Boolean checkFeedClusterSLA(Feed feed){
for(Cluster cluster : feed.getClusters().getClusters()){
Sla sla = FeedHelper.getSLA(cluster, feed);
if (sla != null){
return true;
}
}
return false;
}
private Boolean checkProcessClusterSLA(Process process){
Clusters clusters = process.getClusters();
for(org.apache.falcon.entity.v0.process.Cluster cluster : clusters.getClusters()){
org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process);
if (sla != null){
return true;
}
}
return false;
}
@Override
public void onRemove(Entity entity) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
if (entity.getEntityType() == EntityType.FEED) {
Feed feed = (Feed) entity;
// currently sla service is enabled only for fileSystemStorage
if (feed.getSla() != null && feed.getLocations() != null) {
for (Cluster cluster : feed.getClusters().getClusters()) {
if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
LOG.debug("Removing feed:{} for monitoring", feed.getName());
MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString());
MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(),
EntityType.FEED.toString());
LOG.debug("Removing feed:{} for monitoring", feed.getName());
}
}
}
}
if (entity.getEntityType() == EntityType.PROCESS){
Process process = (Process) entity;
if (process.getSla() != null){
for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
if (currentClusters.contains(cluster.getName())) {
LOG.debug("Removing feed:{} for monitoring", process.getName());
MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(),
EntityType.PROCESS.toString());
MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(),
EntityType.PROCESS.toString());
}
}
}
}
}
private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
if (feed.getLocations() != null) {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
for (Cluster cluster : feed.getClusters().getClusters()) {
if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
return true;
}
}
}
return false;
}
private boolean isSLAMonitoringEnabledInCurrentColo(Process process) {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
if (currentClusters.contains(cluster.getName()) && ProcessHelper.getSLA(cluster, process) != null) {
return true;
}
}
return false;
}
@Override
public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
if (newEntity.getEntityType() == EntityType.FEED) {
Feed oldFeed = (Feed) oldEntity;
Feed newFeed = (Feed) newEntity;
if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) {
onRemove(oldFeed);
} else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) {
onAdd(newFeed);
} else {
List<String> slaRemovedClusters = new ArrayList<>();
for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) {
if (FeedHelper.getSLA(oldCluster, oldFeed) != null
&& FeedHelper.getSLA(oldCluster, newFeed) == null) {
slaRemovedClusters.add(oldCluster);
}
}
updatePendingInstances(newFeed.getName(), slaRemovedClusters, EntityType.FEED.toString());
}
}
if (newEntity.getEntityType() == EntityType.PROCESS) {
Process oldProcess = (Process) oldEntity;
Process newProcess = (Process) newEntity;
if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){
onRemove(oldProcess);
} else if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){
onAdd(newProcess);
} else {
List<String> slaRemovedClusters = new ArrayList<>();
for (String oldCluster : EntityUtil.getClustersDefined(oldProcess)){
if (ProcessHelper.getSLA(oldCluster, oldProcess) != null
&& ProcessHelper.getSLA(oldCluster, newProcess) == null){
slaRemovedClusters.add(oldCluster);
}
}
updatePendingInstances(newProcess.getName(), slaRemovedClusters, EntityType.PROCESS.toString());
}
}
}
private void updatePendingInstances(String entityName, List<String> slaRemovedClusters, String entityType){
for(String clusterName :slaRemovedClusters){
MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, clusterName,
entityType);
}
}
@Override
public void onReload(Entity entity) throws FalconException {
}
@Override
public String getName() {
return EntitySLAMonitoringService.class.getSimpleName();
}
@Override
public void init() throws FalconException {
String freq = StartupProperties.get().getProperty("entity.sla.statusCheck.frequency.seconds", "600");
statusCheckFrequencySeconds = Integer.parseInt(freq);
freq = StartupProperties.get().getProperty("entity.sla.lookAheadWindow.millis", "900000");
lookAheadWindowMillis = Integer.parseInt(freq);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
addPendingEntityInstances(now());
executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
}
public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
throws FalconException {
LOG.debug("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
clusterName, nominalTime);
List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName,
EntityType.FEED.toString()));
// Slas for feeds not having sla tag are not stored.
if (CollectionUtils.isEmpty(instances)){
MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime,
EntityType.FEED.toString());
}
}
private FileSystem initializeFileSystem() {
try {
fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
if (!fileSystem.exists(storePath)) {
LOG.info("Creating directory for pending entity instances: {}", storePath);
// set permissions so config store dir is owned by falcon alone
HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
}
return fileSystem;
} catch (Exception e) {
throw new RuntimeException("Unable to bring up entity sla store for path: " + storePath, e);
}
}
@Override
public void destroy() throws FalconException {
}
//Periodically update status of pending instances, add new instances and take backup.
private class Monitor implements Runnable {
@Override
public void run() {
try {
if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities().size() > 0) {
checkPendingInstanceAvailability();
// add Instances from last checked time to 10 minutes from now(some buffer for status check)
Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis);
addPendingEntityInstances(newCheckPointTime);
} else {
LOG.debug("No entities present for sla monitoring.");
}
} catch (Throwable e) {
LOG.error("Feed SLA monitoring failed: ", e);
}
}
}
private void addPendingInstances(String entityType, Entity entity,
String clusterName,
List<Date> instances) throws FalconException {
if (instances != null && !instances.isEmpty()) {
for (Date date : instances) {
LOG.debug("Adding pending instance ={} for entity= {} in cluster>={} and entityType={}", date,
entity.getName(), clusterName, entityType);
MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), clusterName, date,
entityType);
}
}
}
private void addPendingEntityInstances(Date checkPointTime) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities();
for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
String entityName = monitoredEntityBean.getEntityName();
String entityType = monitoredEntityBean.getEntityType();
if (EntityType.FEED.name().equalsIgnoreCase(entityType)
|| isEntityRunning(EntityUtil.getEntity(entityType, entityName))) {
Date lastMonitoredInstanceTime = monitoredEntityBean.getLastMonitoredTime();
Date newCheckPointTime = checkPointTime;
Entity entity = EntityUtil.getEntity(entityType, entityName);
Set<String> clustersDefined = EntityUtil.getClustersDefined(entity);
List<org.apache.falcon.entity.v0.cluster.Cluster> clusters = new ArrayList();
for (String cluster : clustersDefined) {
clusters.add(ClusterHelper.getCluster(cluster));
}
for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : clusters) {
if (currentClusters.contains(entityCluster.getName())) {
Date endTime = EntityUtil.getEndTime(entity, entityCluster.getName());
if (endTime.before(now())) {
newCheckPointTime = endTime;
}
List<Date> instances = EntityUtil.getEntityInstanceTimesInBetween(entity,
entityCluster.getName(), lastMonitoredInstanceTime, newCheckPointTime);
addPendingInstances(entityType, entity, entityCluster.getName(), instances);
// update last monitored time with the new checkpoint time
MONITORING_JDBC_STATE_STORE.updateLastMonitoredTime(entityName, entityType,
new Date(newCheckPointTime.getTime() + MINUTE_DELAY));
}
}
}
}
}
/**
* Checks the availability of all the pendingInstances and removes the ones which have become available.
*/
private void checkPendingInstanceAvailability() throws FalconException {
List<PendingInstanceBean> pendingInstanceBeans = MONITORING_JDBC_STATE_STORE.getAllPendingInstances();
if (pendingInstanceBeans.isEmpty()){
LOG.info("No pending instances to be checked");
return;
}
for(PendingInstanceBean pendingInstanceBean : pendingInstanceBeans){
boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
pendingInstanceBean.getClusterName(), pendingInstanceBean.getNominalTime(),
pendingInstanceBean.getEntityType());
if (status) {
MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(),
pendingInstanceBean.getClusterName(), pendingInstanceBean.getNominalTime(),
pendingInstanceBean.getEntityType());
}
}
}
// checks whether a given entity instance is available or not
private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime,
String entityType) throws FalconException {
Entity entity = EntityUtil.getEntity(entityType, entityName);
authenticateUser(entity);
try {
if (entity.getEntityType() == EntityType.PROCESS){
LOG.trace("Checking instance availability status for entity:{}, cluster:{}, "
+ "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType);
AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime,
new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false);
if (instancesResult.getInstances().length > 0) {
if (instancesResult.getInstances()[0].status.equals(InstancesResult.WorkflowStatus.SUCCEEDED)){
LOG.trace("Entity instance(Process:{}, cluster:{}, instanceTime:{}) is available.",
entity.getName(), clusterName, nominalTime);
return true;
}
} else if ((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt(
RuntimeProperties.get().getProperty("workflow.history.expiration.period.days", "7"))) {
return true;
}
return false;
}
if (entity.getEntityType() == EntityType.FEED){
LOG.trace("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}",
entity.getName(), clusterName, nominalTime);
FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus((Feed) entity,
clusterName, nominalTime);
if (status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE)
|| status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) {
LOG.trace("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", entity.getName(),
clusterName, nominalTime);
return true;
}
}
} catch (Throwable e) {
LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType{}, nominalTime{}", entityName,
clusterName, entityType, nominalTime, e);
}
LOG.debug("Entity instance(entity:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(),
clusterName, nominalTime);
return false;
}
/**
* Returns all the instances between given time range which have missed slaLow or slaHigh for given entity.
*
* Only entities which have defined sla in their definition are considered.
* Only the entity instances between the given time range are considered.
* Start time and end time are both inclusive.
* @param start start time, inclusive
* @param end end time, inclusive
* @return Set of pending entity instances belonging to the given range which have missed SLA
* @throws FalconException
*/
public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end)
throws FalconException {
Set<SchedulableEntityInstance> result = new HashSet<>();
for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
Pair<String, String> entityClusterPair = new Pair<>(pendingInstanceBean.getEntityName(),
pendingInstanceBean.getClusterName());
String entityType = pendingInstanceBean.getEntityType();
if (entityType.equalsIgnoreCase(EntityType.FEED.toString())){
Feed feed = EntityUtil.getEntity(entityType, entityClusterPair.first);
Cluster cluster = FeedHelper.getCluster(feed, entityClusterPair.second);
Sla sla = FeedHelper.getSLA(cluster, feed);
if (sla != null) {
Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end,
MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first,
entityClusterPair.second, entityType));
for (Pair<Date, String> status : slaStatus) {
SchedulableEntityInstance instance = new SchedulableEntityInstance(entityClusterPair.first,
entityClusterPair.second, status.first, EntityType.FEED);
instance.setTags(status.second);
result.add(instance);
}
}
} else {
Process process = EntityUtil.getEntity(entityType, entityClusterPair.first);
org.apache.falcon.entity.v0.process.Cluster cluster = ProcessHelper.getCluster(process,
entityClusterPair.second);
org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process);
if (sla != null && isEntityRunning(process)){
Set<Pair<Date, String>> slaStatus = getProcessSLAStatus(sla, start, end,
MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first,
entityClusterPair.second, entityType));
for (Pair<Date, String> status : slaStatus) {
SchedulableEntityInstance instance = new SchedulableEntityInstance(entityClusterPair.first,
entityClusterPair.second, status.first, EntityType.PROCESS);
instance.setTags(status.second);
result.add(instance);
}
}
}
}
return result;
}
/**
* Returns all the instances of a given entity between the given time range
* which missed sla.Only those instances are included which have missed either slaLow or slaHigh.
* @param entityName name of the feed
* @param clusterName cluster name
* @param start start time, inclusive
* @param end end time, inclusive
* @return Pending instances of the given entity which belong to the given time range and have missed SLA.
* @throws FalconException
*/
public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
Date start, Date end, String entityType) throws FalconException {
Set<SchedulableEntityInstance> result = new HashSet<>();
List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstancesBetweenTimeRange(entityName,
clusterName, entityType, start, end);
if (missingInstances == null){
return result;
}
Entity entity = EntityUtil.getEntity(entityType, entityName);
if (entity.getEntityType() == EntityType.FEED) {
Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
if (sla != null) {
Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end, missingInstances);
for (Pair<Date, String> status : slaStatus){
SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName,
status.first, EntityType.FEED);
instance.setTags(status.second);
result.add(instance);
}
}
return result;
} else {
org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process) entity);
if (sla != null){
Set<Pair<Date, String>> slaStatus = getProcessSLAStatus(sla, start, end, missingInstances);
for (Pair<Date, String> status : slaStatus){
SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName,
status.first, EntityType.PROCESS);
instance.setTags(status.second);
result.add(instance);
}
}
}
return result;
}
Set<Pair<Date, String>> getFeedSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances)
throws FalconException {
Date now = new Date();
Frequency slaLow = sla.getSlaLow();
Frequency slaHigh = sla.getSlaHigh();
Set<Pair<Date, String>> result = new HashSet<>();
for (Date nominalTime : missingInstances) {
if (!nominalTime.before(start) && !nominalTime.after(end)) {
ExpressionHelper.setReferenceDate(nominalTime);
ExpressionHelper evaluator = ExpressionHelper.get();
Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class);
Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration);
if (slaCriticalTime.before(now)) {
result.add(new Pair<>(nominalTime, TAG_CRITICAL));
} else if (slaWarnTime.before(now)) {
result.add(new Pair<>(nominalTime, TAG_WARN));
}
}
}
return result;
}
private Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
Date end, List<Date> missingInstances) throws FalconException {
Date now = new Date();
Frequency slaHigh = sla.getShouldEndIn();
Set<Pair<Date, String>> result = new HashSet<>();
for (Date nominalTime : missingInstances) {
if (!nominalTime.before(start) && !nominalTime.after(end)) {
ExpressionHelper.setReferenceDate(nominalTime);
ExpressionHelper evaluator = ExpressionHelper.get();
Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
if (slaCriticalTime.before(now)) {
result.add(new Pair<>(nominalTime, TAG_CRITICAL));
}
}
}
return result;
}
@VisibleForTesting
Date getInitialStartTime(Entity entity, String clusterName, String entityType) throws FalconException {
if (entity.getEntityType() == EntityType.FEED){
Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
if (sla == null) {
throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
+ entity.getName() + " and cluster: " + clusterName + " does not have any sla");
}
Date startTime = FeedHelper.getFeedValidityStart((Feed) entity, clusterName);
Frequency slaLow = sla.getSlaLow();
Date slaTime = new Date(now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
return startTime.before(slaTime) ? startTime : slaTime;
} else{
org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process)entity);
if (sla == null) {
throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
+ entity.getName() + " and cluster: " + clusterName + " does not have any sla");
}
Date startTime = ProcessHelper.getProcessValidityStart((Process) entity, clusterName);
Frequency slaLow = sla.getShouldEndIn();
Date slaTime = new Date(now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
return startTime.before(slaTime) ? startTime : slaTime;
}
}
public void makeProcessInstanceAvailable(String clusterName, String entityName, String date, String entityType) {
Date nominalTime = null;
try {
nominalTime = DateUtil.parseDateFalconTZ(date);
}catch (ParseException e){
LOG.error("Exception while translating the date:", e);
}
if (nominalTime!= null){
List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
entityType));
if (!CollectionUtils.isEmpty(instances)){
MONITORING_JDBC_STATE_STORE.deletePendingInstance(entityName, clusterName, nominalTime,
entityType);
}
}
}
// Authenticate user only if not already authenticated.
private void authenticateUser(Entity entity){
if (!CurrentUser.isAuthenticated()) {
if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
CurrentUser.authenticate(entity.getACL().getOwner());
} else {
CurrentUser.authenticate(System.getProperty("user.name"));
}
}
}
private boolean isEntityRunning(Entity entity) throws FalconException {
authenticateUser(entity);
AbstractWorkflowEngine workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
return workflowEngine.isActive(entity) && !workflowEngine.isSuspended(entity)
&& !workflowEngine.isCompleted(entity);
}
}