blob: c5b3ecc8302911cd927a7fe34080aa5a73729421 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.lock.MemoryLocks;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.UnschedulableEntityException;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.monitors.Dimension;
import org.apache.falcon.service.EntitySLAMonitoringService;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * REST resource of allowed actions on schedulable entities. Only Process and
 * Feed can have schedulable actions.
 */
// Abstract base for the schedulable-entity REST actions; builds on AbstractInstanceManager.
public abstract class AbstractSchedulableEntityManager extends AbstractInstanceManager {
// SLF4J logger shared by every method of this class.
private static final Logger LOG = LoggerFactory.getLogger(AbstractSchedulableEntityManager.class);
// MemoryLocks singleton; scheduleInternal acquires and releases a per-entity lock through it.
private static MemoryLocks memoryLocks = MemoryLocks.getInstance();
/**
 * Schedules a submitted entity immediately.
 *
 * @param type entity type
 * @param entity entity name
 * @param properties Specifying 'falcon.scheduler:native' as a property will schedule the entity on the
 *                   native workflow engine, else it will default to the workflow engine
 *                   as defined in startup.properties.
 * @return APIResult
 */
public APIResult schedule(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entity,
@Dimension("colo") @PathParam("colo") String colo,
@QueryParam("skipDryRun") Boolean skipDryRun,
@QueryParam("properties") String properties) {
try {
scheduleInternal(type, entity, skipDryRun, EntityUtil.getPropertyMap(properties));
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") scheduled successfully");
} catch (Throwable e) {
LOG.error("Unable to schedule workflow", e);
throw FalconWebException.newAPIException(e);
protected synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun,
Map<String, String> properties) throws FalconException, AuthorizationException {
Entity entityObj = null;
try {
entityObj = EntityUtil.getEntity(type, entity);
verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SCHEDULE);
//first acquire lock on entity before scheduling
if (!memoryLocks.acquireLock(entityObj, "schedule")) {
throw FalconWebException.newAPIException("Looks like an schedule/update command is already"
+ " running for " + entityObj.toShortString());
}"Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName());
WorkflowEngineFactory.getWorkflowEngine(entityObj, properties).schedule(entityObj, skipDryRun, properties);
} catch (Throwable e) {
LOG.error("Entity schedule failed for " + type + ": " + entity, e);
throw FalconWebException.newAPIException(e);
} finally {
if (entityObj != null) {
memoryLocks.releaseLock(entityObj);"Memory lock released for {}", entityObj.toShortString());
/**
 * Validates the parameters of an SLA query and whether SLA monitoring is supported for the entity type.
 *
 * @param entityType currently two entity types are supported: Process and Feed
 * @param entityName name of the entity
 * @param start start date from which SLA is to be looked at
 * @param end end date up to which SLA is to be looked at
 * @param colo colo in which the entity is to be looked up
 * @throws FalconException if the validation fails
 */
public static void validateSlaParams(String entityType, String entityName, String start, String end,
String colo) throws FalconException {
EntityType type = EntityType.getEnum(entityType);
if (!type.isSchedulable()){
throw new ValidationException("SLA monitoring is not supported for: " + type);
// validate valid entity name.
if (StringUtils.isNotBlank(entityName)) {
EntityUtil.getEntity(entityType, entityName);
Date startTime, endTime;
// validate mandatory start date
if (StringUtils.isBlank(start)) {
throw new ValidationException("'start' is mandatory and can not be blank.");
} else {
startTime = SchemaHelper.parseDateUTC(start);
// validate optional end date
if (StringUtils.isBlank(end)) {
endTime = new Date();
} else {
endTime = SchemaHelper.parseDateUTC(end);
if (startTime.after(endTime)) {
throw new ValidationException("start can not be after end");
/**
 * Returns the entity instances which are not yet available and have missed either slaLow or slaHigh.
 * This API doesn't return the entities which missed SLA but are now available. The purpose of this API is to
 * show entity instances which you need to attend to.
 *
 * @param startStr start time of the window, as a UTC date string
 * @param endStr end time of the window, as a UTC date string; the current time is used when it is null
 */
// Collects pending SLA-miss instances for the [startStr, endStr] window, either for every entity
// (blank entityName) or for the single named entity.
// NOTE(review): several statements in this method are truncated in this view (the status comparison,
// the per-cluster call and the result constructor arguments); the comments cover only what is visible.
public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts(String entityName, String entityType,
String startStr, String endStr,
String colo) {
Set<SchedulableEntityInstance> instances = new HashSet<>();
String resultMessage = "Success!";
try {
Date start = EntityUtil.parseDateUTC(startStr);
// A null end string means "up to now".
Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr);
if (StringUtils.isBlank(entityName)) {
// No entity named: ask the SLA monitoring service for all pending alerts in the window.
instances = EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end);
} else {
String status = getStatusString(EntityUtil.getEntity(entityType, entityName));
// NOTE(review): the status value compared against is missing here — confirm against the full file.
if (status.equals( {
// Iterates the clusters reported by DeploymentUtil.getCurrentClusters().
for (String clusterName : DeploymentUtil.getCurrentClusters()) {
// NOTE(review): the start of this call is missing; only its trailing arguments are visible.
clusterName, start, end, entityType));
} else {
// Entity is in some other state: report that state in the message instead of instances.
resultMessage = entityName + " is " + status;
} catch (FalconException e) {
throw FalconWebException.newAPIException(e);
// NOTE(review): the remaining constructor arguments are missing in this view.
SchedulableEntityInstanceResult result = new SchedulableEntityInstanceResult(APIResult.Status.SUCCEEDED,
return result;
/**
 * Submits a new entity and schedules it immediately.
 *
 * @param type entity type
 * @param properties Specifying 'falcon.scheduler:native' as a property will schedule the entity on the
 *                   native workflow engine, else it will default to the workflow engine
 *                   as defined in startup.properties.
 * @return APIResult
 */
public APIResult submitAndSchedule(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@Dimension("colo") @PathParam("colo") String colo,
@QueryParam("skipDryRun") Boolean skipDryRun,
@QueryParam("properties") String properties) {
try {
Entity entity = submitInternal(request.getInputStream(), type, request.getParameter(DO_AS_PARAM));
scheduleInternal(type, entity.getName(), skipDryRun, EntityUtil.getPropertyMap(properties));
return new APIResult(APIResult.Status.SUCCEEDED,
entity.getName() + "(" + type + ") scheduled successfully");
} catch (Throwable e) {
LOG.error("Unable to submit and schedule ", e);
throw FalconWebException.newAPIException(e);
/**
 * Suspends a running entity.
 *
 * @param type entity type
 * @param entity entity name
 * @return APIResult
 */
public APIResult suspend(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entity,
@Dimension("entityName") @PathParam("entity") String colo) {
try {
Entity entityObj = EntityUtil.getEntity(type, entity);
verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SUSPEND);
if (getWorkflowEngine(entityObj).isActive(entityObj)) {
} else {
throw FalconWebException.newAPIException("Status of " + entity + "(" + type + ") is "
+ getStatusString(entityObj) + ". Only scheduled entities can be suspended.");
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") suspended successfully");
} catch (Throwable e) {
LOG.error("Unable to suspend entity", e);
throw FalconWebException.newAPIException(e);
/**
 * Resumes a suspended entity.
 *
 * @param type entity type
 * @param entity entity name
 * @return APIResult
 */
public APIResult resume(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entity,
@Dimension("colo") @PathParam("colo") String colo) {
try {
Entity entityObj = EntityUtil.getEntity(type, entity);
verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.RESUME);
if (getWorkflowEngine(entityObj).isActive(entityObj)) {
} else {
throw new IllegalStateException(entity + "(" + type + ") is not scheduled");
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") resumed successfully");
} catch (Exception e) {
LOG.error("Unable to resume entity", e);
throw FalconWebException.newAPIException(e);
/**
 * Returns summary of most recent N instances of an entity, filtered by cluster.
 *
 * @param type Only return entities of this type.
 * @param startDate For each entity, show instances after startDate.
 * @param endDate For each entity, show instances before endDate.
 * @param cluster Return entities for specific cluster.
 * @param fields fields that the query is interested in, separated by comma
 * @param filterBy filter by a specific field.
 * @param filterTags filter by these tags.
 * @param orderBy order result by these fields.
 * @param sortOrder sort order applied together with orderBy.
 * @param offset Pagination offset.
 * @param resultsPerPage Number of results that should be returned starting at the offset.
 * @param numInstances Number of instance summaries to show per entity
 * @param doAsUser user on whose behalf the entities are filtered.
 * @return EntitySummaryResult
 */
// Builds one summary per entity of the requested type on the given cluster: the filtered, sorted and
// paginated entities are looked up first, then each entity's recent instances are fetched and combined
// with its tags and pipelines when those fields were requested.
// NOTE(review): several statements are truncated in this view (the filter-tags key, the assignment of
// entitySummaryInstances and the tail of the EntitySummary constructor); confirm against the full file.
public EntitySummaryResult getEntitySummary(String type, String cluster, String startDate, String endDate,
String fields, String filterBy, String filterTags,
String orderBy, String sortOrder, Integer offset,
Integer resultsPerPage, Integer numInstances, final String doAsUser) {
// Requested field names, lower-cased. NOTE(review): fields is dereferenced without a null check.
HashSet<String> fieldSet = new HashSet<String>(Arrays.asList(fields.toLowerCase().split(",")));
// In the visible lines this pair is referenced only from the commented-out ToDo further down.
Pair<Date, Date> startAndEndDates = getStartEndDatesForSummary(startDate, endDate);
Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy);
if (StringUtils.isNotEmpty(filterTags)) {
// NOTE(review): the map key is missing here; filterTags is added as a single-element list.
filterByFieldsValues.put(, Arrays.asList(filterTags));
List<Entity> entities;
String colo;
try {
entities = sortEntitiesPagination(
getFilteredEntities(EntityType.valueOf(type.toUpperCase()), "", "", filterByFieldsValues,
cluster, doAsUser),
orderBy, sortOrder, offset, resultsPerPage);
// The colo is taken from the cluster entity stored under the given cluster name.
colo = ((Cluster) configStore.get(EntityType.CLUSTER, cluster)).getColo();
} catch (FalconWebException e) {
// Already an API-level exception: propagate unchanged instead of re-wrapping.
throw e;
} catch(Exception e) {
LOG.error("Failed to get entities", e);
throw FalconWebException.newAPIException(e);
List<EntitySummaryResult.EntitySummary> entitySummaries = new ArrayList<EntitySummaryResult.EntitySummary>();
for (Entity entity : entities) {
// Instances are requested with empty start/end strings, offset 0 and at most numInstances results.
InstancesResult instancesResult = getInstances(entity.getEntityType().name(), entity.getName(),
colo, null, "", "", "", 0, numInstances, null);
// NOTE(review): the block comment below has no closing delimiter in this view.
/* ToDo - Use oozie bulk API after FALCON-591 is implemented
* getBulkInstances(entity, cluster,
* startAndEndDates.first, startAndEndDates.second, colo, "starttime", 0, numInstances);
List<EntitySummaryResult.Instance> entitySummaryInstances =
List<String> pipelines = new ArrayList<String>();
List<String> tags = new ArrayList<String>();
// Pipelines and tags are looked up only when named in the requested fields; otherwise they stay empty.
if (fieldSet.contains("pipelines")) { pipelines = EntityUtil.getPipelines(entity); }
if (fieldSet.contains("tags")) { tags = EntityUtil.getTags(entity); }
EntitySummaryResult.EntitySummary entitySummary =
new EntitySummaryResult.EntitySummary(entity.getName(), entity.getEntityType().toString(),
tags.toArray(new String[tags.size()]),
pipelines.toArray(new String[pipelines.size()]),
new EntitySummaryResult.Instance[entitySummaryInstances.size()]));
return new EntitySummaryResult("Entity Summary Result",
entitySummaries.toArray(new EntitySummaryResult.EntitySummary[entitySummaries.size()]));
/**
 * Force updates an entity.
 *
 * @param type entity type
 * @param entityName entity name
 * @return APIResult
 */
public APIResult touch(@Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entityName,
@Dimension("colo") @QueryParam("colo") String colo,
@QueryParam("skipDryRun") Boolean skipDryRun) {
StringBuilder result = new StringBuilder();
try {
Entity entity = EntityUtil.getEntity(type, entityName);
verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.TOUCH);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
for (String cluster : clusters) {
result.append(getWorkflowEngine(entity).touch(entity, cluster, skipDryRun));
} catch (Throwable e) {
LOG.error("Touch failed", e);
throw FalconWebException.newAPIException(e);
return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
private void validateTypeForEntitySummary(String type) {
EntityType entityType = EntityType.getEnum(type);
if (!entityType.isSchedulable()) {
throw FalconWebException.newAPIException("Invalid entity type " + type
+ " for EntitySummary API. Valid options are feed or process");
private Pair<Date, Date> getStartEndDatesForSummary(String startDate, String endDate) {
Date end = (StringUtils.isEmpty(endDate)) ? new Date() : SchemaHelper.parseDateUTC(endDate);
long startMillisecs = end.getTime() - (2* DAY_IN_MILLIS); // default - 2 days before end
Date start = (StringUtils.isEmpty(startDate))
? new Date(startMillisecs) : SchemaHelper.parseDateUTC(startDate);
return new Pair<Date, Date>(start, end);
// Converts the instances carried by an InstancesResult into EntitySummaryResult.Instance objects,
// copying log file, source cluster, start time and end time from each raw instance.
// NOTE(review): the constructor call is truncated in this view and no statement adding the converted
// instance to elemInstanceList is visible — confirm against the full file.
private List<EntitySummaryResult.Instance> getElementsFromInstanceResult(InstancesResult instancesResult) {
ArrayList<EntitySummaryResult.Instance> elemInstanceList =
new ArrayList<EntitySummaryResult.Instance>();
InstancesResult.Instance[] instances = instancesResult.getInstances();
// A null or empty instance array leaves the result list empty.
if (instances != null && instances.length > 0) {
for (InstancesResult.Instance rawInstance : instances) {
// NOTE(review): only the first constructor argument (the cluster) is visible here.
EntitySummaryResult.Instance instance = new EntitySummaryResult.Instance(rawInstance.getCluster(),
instance.logFile = rawInstance.getLogFile();
instance.sourceCluster = rawInstance.sourceCluster;
instance.startTime = rawInstance.startTime;
instance.endTime = rawInstance.endTime;
return elemInstanceList;
private void checkSchedulableEntity(String type) throws UnschedulableEntityException {
EntityType entityType = EntityType.getEnum(type);
if (!entityType.isSchedulable()) {
throw new UnschedulableEntityException(
"Entity type (" + type + ") " + " cannot be Scheduled/Suspended/Resumed");