/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.resource;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.EntityParser;
import org.apache.falcon.entity.parser.EntityParserFactory;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.store.EntityAlreadyExistsException;
import org.apache.falcon.entity.v0.*;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.resource.APIResult.Status;
import org.apache.falcon.resource.EntityList.EntityElement;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
/**
* A base class for managing Entity operations.
*/
public abstract class AbstractEntityManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractEntityManager.class);
private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
protected static final int XML_DEBUG_LEN = 10 * 1024;
private AbstractWorkflowEngine workflowEngine;
protected ConfigurationStore configStore = ConfigurationStore.get();
public AbstractEntityManager() {
try {
workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
} catch (FalconException e) {
throw new FalconRuntimException(e);
}
}
protected void checkColo(String colo) {
if (!DeploymentUtil.getCurrentColo().equals(colo)) {
throw FalconWebException.newException(
"Current colo (" + DeploymentUtil.getCurrentColo() + ") is not " + colo,
Response.Status.BAD_REQUEST);
}
}
protected Set<String> getAllColos() {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
}
String[] colos = RuntimeProperties.get().getProperty("all.colos", DeploymentUtil.getDefaultColo()).split(",");
return new HashSet<String>(Arrays.asList(colos));
}
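/**
* Resolves a colo expression to a set of colos. A null, empty or "*"
* expression expands to all colos applicable to the given entity;
* otherwise the expression is treated as a comma-separated list,
* e.g. "colo1,colo2".
*/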
protected Set<String> getColosFromExpression(String coloExpr, String type, String entity) {
Set<String> colos;
if (coloExpr == null || coloExpr.equals("*") || coloExpr.isEmpty()) {
colos = getApplicableColos(type, entity);
} else {
colos = new HashSet<String>(Arrays.asList(coloExpr.split(",")));
}
return colos;
}
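/**
* Returns the colos an entity is applicable to: all colos for cluster
* entities, otherwise the colos of the clusters the entity is defined
* against.
*/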
protected Set<String> getApplicableColos(String type, String name) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
}
if (EntityType.valueOf(type.toUpperCase()) == EntityType.CLUSTER) {
return getAllColos();
}
return getApplicableColos(type, EntityUtil.getEntity(type, name));
} catch (FalconException e) {
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
protected Set<String> getApplicableColos(String type, Entity entity) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
}
if (EntityType.valueOf(type.toUpperCase()) == EntityType.CLUSTER) {
return getAllColos();
}
Set<String> clusters = EntityUtil.getClustersDefined(entity);
Set<String> colos = new HashSet<String>();
for (String cluster : clusters) {
Cluster clusterEntity = EntityUtil.getEntity(EntityType.CLUSTER, cluster);
colos.add(clusterEntity.getColo());
}
return colos;
} catch (FalconException e) {
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
/**
* Submit a new entity. Entities can be of type feed, process or data end
* point. Entity definitions are validated structurally against the schema
* and subsequently against other rules before they are admitted into the
* system.
* <p/>
* The entity name acts as the key; once added, an entity can't be added
* again unless it is deleted first.
*
* @param request - Servlet Request
* @param type - entity type - feed, process or data end point
* @param colo - applicable colo
* @return result of the operation
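* <p>
* A minimal illustrative call from a concrete sub-class (hypothetical
* values; the request body is expected to carry the entity XML):
* <pre>{@code
* APIResult result = submit(request, "feed", "default");
* }</pre>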
*/
public APIResult submit(HttpServletRequest request, String type, String colo) {
checkColo(colo);
try {
audit(request, "STREAMED_DATA", type, "SUBMIT");
Entity entity = submitInternal(request, type);
return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName());
} catch (Throwable e) {
LOG.error("Unable to persist entity object", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
/**
* Post an entity XML with its entity type. Validates the XML, which can
* describe a process, feed or data end point.
*
* @param request Servlet Request
* @param type entity type
* @return APIResult - SUCCEEDED or FAILED
*/
public APIResult validate(HttpServletRequest request, String type) {
try {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Entity entity = deserializeEntity(request, entityType);
validate(entity);
//Validate that the entity can be scheduled in the cluster
if (entity.getEntityType().isSchedulable()) {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
for (String cluster : clusters) {
try {
getWorkflowEngine().dryRun(entity, cluster);
} catch (FalconException e) {
throw new FalconException("dryRun failed on cluster " + cluster, e);
}
}
}
return new APIResult(APIResult.Status.SUCCEEDED,
"Validated successfully (" + entityType + ") " + entity.getName());
} catch (Throwable e) {
LOG.error("Validation failed for entity ({})", type, e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
/**
* Deletes a scheduled entity; a deleted entity is removed completely from
* the execution pool.
*
* @param request Servlet Request
* @param type entity type
* @param entity entity name
* @param colo applicable colo
* @return APIResult
*/
public APIResult delete(HttpServletRequest request, String type, String entity, String colo) {
checkColo(colo);
try {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
audit(request, entity, type, "DELETE");
String removedFromEngine = "";
try {
Entity entityObj = EntityUtil.getEntity(type, entity);
canRemove(entityObj);
if (entityType.isSchedulable() && !DeploymentUtil.isPrism()) {
getWorkflowEngine().delete(entityObj);
removedFromEngine = "(KILLED in ENGINE)";
}
configStore.remove(entityType, entity);
} catch (EntityNotRegisteredException e) { // already deleted
return new APIResult(APIResult.Status.SUCCEEDED,
entity + "(" + type + ") doesn't exist. Nothing to do");
}
return new APIResult(APIResult.Status.SUCCEEDED,
entity + "(" + type + ") removed successfully " + removedFromEngine);
} catch (Throwable e) {
LOG.error("Unable to reach workflow engine for deletion or deletion failed", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
// Parallel updates can get very clumsy if two feeds referred to by a
// single process are updated concurrently, so updates are serialized.
public synchronized APIResult update(HttpServletRequest request, String type, String entityName, String colo,
String effectiveTimeStr) {
checkColo(colo);
try {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
audit(request, entityName, type, "UPDATE");
Entity oldEntity = EntityUtil.getEntity(type, entityName);
Entity newEntity = deserializeEntity(request, entityType);
validate(newEntity);
validateUpdate(oldEntity, newEntity);
configStore.initiateUpdate(newEntity);
Date effectiveTime =
StringUtils.isEmpty(effectiveTimeStr) ? null : EntityUtil.parseDateUTC(effectiveTimeStr);
StringBuilder result = new StringBuilder("Updated successfully");
//Update in workflow engine
if (!DeploymentUtil.isPrism()) {
Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
newClusters.retainAll(oldClusters); //common clusters for update
oldClusters.removeAll(newClusters); //deleted clusters
for (String cluster : newClusters) {
Date myEffectiveTime = validateEffectiveTime(newEntity, cluster, effectiveTime);
result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster, myEffectiveTime));
}
for (String cluster : oldClusters) {
getWorkflowEngine().delete(oldEntity, cluster);
}
}
configStore.update(entityType, newEntity);
return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
} catch (Throwable e) {
LOG.error("Update failed", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
} finally {
ConfigurationStore.get().cleanupUpdateInit();
}
}
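/**
* Returns the requested effective time if it lies within the entity's
* start/end window on the given cluster; otherwise returns null so that
* no explicit effective time is passed to the workflow engine.
*/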
private Date validateEffectiveTime(Entity entity, String cluster, Date effectiveTime) {
Date start = EntityUtil.getStartTime(entity, cluster);
Date end = EntityUtil.getEndTime(entity, cluster);
if (effectiveTime == null || effectiveTime.before(start) || effectiveTime.after(end)) {
return null;
}
return effectiveTime;
}
private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException {
if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) {
throw new FalconException(
oldEntity.toShortString() + " can't be updated with " + newEntity.toShortString());
}
if (oldEntity.getEntityType() == EntityType.CLUSTER) {
throw new FalconException("Update not supported for clusters");
}
String[] props = oldEntity.getEntityType().getImmutableProperties();
for (String prop : props) {
Object oldProp, newProp;
try {
oldProp = PropertyUtils.getProperty(oldEntity, prop);
newProp = PropertyUtils.getProperty(newEntity, prop);
} catch (Exception e) {
throw new FalconException(e);
}
if (!ObjectUtils.equals(oldProp, newProp)) {
throw new ValidationException(oldEntity.toShortString() + ": " + prop + " can't be changed");
}
}
}
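/**
* Ensures the entity can be removed: deletion is rejected while any other
* entity (e.g. a process referring to a feed) still references it.
*/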
private void canRemove(Entity entity) throws FalconException {
Pair<String, EntityType>[] referencedBy = EntityIntegrityChecker.referencedBy(entity);
if (referencedBy != null && referencedBy.length > 0) {
StringBuilder messages = new StringBuilder();
for (Pair<String, EntityType> ref : referencedBy) {
messages.append(ref).append("\n");
}
throw new FalconException(
entity.getName() + "(" + entity.getEntityType() + ") can't be removed as it is referred to by "
+ messages);
}
}
protected synchronized Entity submitInternal(HttpServletRequest request, String type)
throws IOException, FalconException {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Entity entity = deserializeEntity(request, entityType);
Entity existingEntity = configStore.get(entityType, entity.getName());
if (existingEntity != null) {
if (EntityUtil.equals(existingEntity, entity)) {
return existingEntity;
}
throw new EntityAlreadyExistsException(
entity.toShortString() + " already registered with configuration store. "
+ "Can't be submitted again. Try removing before submitting.");
}
validate(entity);
configStore.publish(entityType, entity);
LOG.info("Submit successful: ({}): {}", type, entity.getName());
return entity;
}
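/**
* Parses an entity from the request input stream. When the stream supports
* mark/reset and parsing fails, up to XML_DEBUG_LEN bytes of the payload
* are re-read and dumped at DEBUG level to aid troubleshooting.
*/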
protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType)
throws IOException, FalconException {
EntityParser<?> entityParser = EntityParserFactory.getParser(entityType);
InputStream xmlStream = request.getInputStream();
if (xmlStream.markSupported()) {
xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len
}
try {
return entityParser.parse(xmlStream);
} catch (FalconException e) {
if (LOG.isDebugEnabled() && xmlStream.markSupported()) {
try {
xmlStream.reset();
String xmlData = getAsString(xmlStream);
LOG.debug("XML DUMP for ({}): {}", entityType, xmlData, e);
} catch (IOException ignore) {
// ignore
}
}
throw e;
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void validate(Entity entity) throws FalconException {
EntityParser entityParser = EntityParserFactory.getParser(entity.getEntityType());
entityParser.validate(entity);
}
private String getAsString(InputStream xmlStream) throws IOException {
byte[] data = new byte[XML_DEBUG_LEN];
IOUtils.readFully(xmlStream, data, 0, XML_DEBUG_LEN);
return new String(data);
}
protected void audit(HttpServletRequest request, String entity, String type, String action) {
if (request == null) {
return; // this must be an internal call from Falcon
}
AUDIT.info("Performed {} on {} ({}) :: {}/{}",
action, entity, type, request.getRemoteHost(), CurrentUser.getUser());
}
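/**
* Coarse lifecycle states reported for an entity: SUBMITTED when the
* entity is not active in the workflow engine (or is not schedulable),
* SUSPENDED or RUNNING otherwise.
*/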
private enum EntityStatus {
SUBMITTED, SUSPENDED, RUNNING
}
/**
* Returns the status of the requested entity.
*
* @param type entity type
* @param entity entity name
* @param colo applicable colo
* @return APIResult containing the entity status
*/
public APIResult getStatus(String type, String entity, String colo) {
checkColo(colo);
Entity entityObj;
try {
entityObj = EntityUtil.getEntity(type, entity);
EntityType entityType = EntityType.valueOf(type.toUpperCase());
EntityStatus status = getStatus(entityObj, entityType);
return new APIResult(Status.SUCCEEDED, status.name());
} catch (FalconWebException e) {
throw e;
} catch (Exception e) {
LOG.error("Unable to get status for entity {} ({})", entity, type, e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException {
EntityStatus status;
if (type.isSchedulable()) {
if (workflowEngine.isActive(entity)) {
if (workflowEngine.isSuspended(entity)) {
status = EntityStatus.SUSPENDED;
} else {
status = EntityStatus.RUNNING;
}
} else {
status = EntityStatus.SUBMITTED;
}
} else {
status = EntityStatus.SUBMITTED;
}
return status;
}
/**
* Returns the dependencies of the requested entity.
*
* @param type entity type
* @param entityName entity name
* @return EntityList
*/
public EntityList getDependencies(String type, String entityName) {
try {
Entity entityObj = EntityUtil.getEntity(type, entityName);
Set<Entity> dependents = EntityGraph.get().getDependents(entityObj);
Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]);
return new EntityList(dependentEntities, entityObj);
} catch (Exception e) {
LOG.error("Unable to get dependencies for entityName {} ({})", entityName, type, e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
/**
* Returns the list of registered entities of a given type.
*
* @param type Only return entities of this type
* @param fieldStr fields that the query is interested in, separated by comma
* @param filterBy comma-separated key:value pairs to filter by specific fields
* @param filterTags comma-separated tags that a returned entity must carry
* @param orderBy field on which the results are sorted (name or type)
* @param offset Pagination offset
* @param resultsPerPage Number of results that should be returned starting at the offset
* @return EntityList
*/
public EntityList getEntityList(String type, String fieldStr, String filterBy, String filterTags,
String orderBy, Integer offset, Integer resultsPerPage) {
HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toLowerCase().split(",")));
final HashMap<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
final ArrayList<String> filterByTags = getFilterByTags(filterTags);
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Collection<String> entityNames = configStore.getEntities(entityType);
if (entityNames == null || entityNames.isEmpty()) {
return new EntityList(new Entity[]{});
}
ArrayList<Entity> entities = new ArrayList<Entity>();
for (String entityName : entityNames) {
Entity entity;
try {
entity = configStore.get(entityType, entityName);
if (entity == null) {
continue;
}
} catch (FalconException e1) {
LOG.error("Unable to get list for entities for ({})", type, e1);
throw FalconWebException.newException(e1, Response.Status.BAD_REQUEST);
}
List<String> tags = getTags(entity);
List<String> pipelines = getPipelines(entity);
String entityStatus = getStatusString(entity);
if (filterEntity(entity, entityStatus,
filterByFieldsValues, filterByTags, tags, pipelines)) {
continue;
}
entities.add(entity);
}
// Sort entities before returning a subset of entity elements.
entities = sortEntities(entities, orderBy);
int pageCount = getRequiredNumberOfResults(entities.size(), offset, resultsPerPage);
if (pageCount == 0) { // handle pagination
return new EntityList(new Entity[]{});
}
return new EntityList(buildEntityElements(offset, fields, entities, pageCount));
}
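/**
* Parses a comma-separated list of key:value pairs into a map. Keys are
* expected to match EntityList.EntityFilterByFields, e.g.
* "TYPE:FEED,STATUS:RUNNING"; a key without a value maps to an empty
* string.
*/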
protected static HashMap<String, String> getFilterByFieldsValues(String filterBy) {
//Filter the results by specific field:value
HashMap<String, String> filterByFieldValues = new HashMap<String, String>();
if (!StringUtils.isEmpty(filterBy)) {
String[] fieldValueArray = filterBy.split(",");
for (String fieldValue : fieldValueArray) {
String[] splits = fieldValue.split(":", 2);
String filterByField = splits[0];
if (splits.length == 2) {
filterByFieldValues.put(filterByField, splits[1]);
} else {
filterByFieldValues.put(filterByField, "");
}
}
}
return filterByFieldValues;
}
private static ArrayList<String> getFilterByTags(String filterTags) {
ArrayList<String> filterTagsList = new ArrayList<String>();
if (!StringUtils.isEmpty(filterTags)) {
String[] splits = filterTags.split(",");
for (String tag : splits) {
filterTagsList.add(tag.trim());
}
}
return filterTagsList;
}
private List<String> getTags(Entity entity) {
String rawTags = null;
switch (entity.getEntityType()) {
case PROCESS:
rawTags = ((Process) entity).getTags();
break;
case FEED:
rawTags = ((Feed) entity).getTags();
break;
case CLUSTER:
rawTags = ((Cluster) entity).getTags();
break;
default:
break;
}
List<String> tags = new ArrayList<String>();
if (!StringUtils.isEmpty(rawTags)) {
for (String tag : rawTags.split(",")) {
LOG.info("Adding tag: {}", tag);
tags.add(tag.trim());
}
}
return tags;
}
private List<String> getPipelines(Entity entity) {
List<String> pipelines = new ArrayList<String>();
if (entity.getEntityType().equals(EntityType.PROCESS)) {
Process process = (Process) entity;
String pipelineString = process.getPipelines();
if (pipelineString != null) {
for (String pipeline : pipelineString.split(",")) {
pipelines.add(pipeline.trim());
}
}
}
return pipelines;
}
private String getStatusString(Entity entity) {
String statusString;
try {
statusString = getStatus(entity, entity.getEntityType()).name();
} catch (Throwable throwable) {
// Unable to fetch statusString, setting it to unknown for backwards compatibility
statusString = "UNKNOWN";
}
return statusString;
}
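/**
* Returns true if the entity should be excluded from the listing: either
* the requesting user is not authorized to see it, or it fails the tag or
* field filters.
*/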
private boolean filterEntity(Entity entity, String entityStatus,
HashMap<String, String> filterByFieldsValues, ArrayList<String> filterByTags,
List<String> tags, List<String> pipelines) {
if (SecurityUtil.isAuthorizationEnabled() && !isEntityAuthorized(entity)) {
// the user who requested list query has no permission to access this entity. Skip this entity
return true;
}
return !((filterByTags.size() == 0 || tags.size() == 0 || !filterEntityByTags(filterByTags, tags))
&& (filterByFieldsValues.size() == 0
|| !filterEntityByFields(entity, filterByFieldsValues, entityStatus, pipelines)));
}
protected boolean isEntityAuthorized(Entity entity) {
try {
SecurityUtil.getAuthorizationProvider().authorizeResource("entities", "list",
entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUgi());
} catch (Exception e) {
LOG.error("Authorization failed for entity=" + entity.getName()
+ " for user=" + CurrentUser.getUser(), e);
return false;
}
return true;
}
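/**
* Returns true, i.e. filter the entity out, if it lacks any of the
* requested tags; an entity must carry every filter tag to be listed.
*/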
private boolean filterEntityByTags(ArrayList<String> filterTagsList, List<String> tags) {
boolean filterEntity = false;
for (String tag : filterTagsList) {
if (!tags.contains(tag)) {
filterEntity = true;
break;
}
}
return filterEntity;
}
private boolean filterEntityByFields(Entity entity, HashMap<String, String> filterKeyVals,
String status, List<String> pipelines) {
boolean filterEntity = false;
if (filterKeyVals.size() != 0) {
String filterValue;
for (Map.Entry<String, String> pair : filterKeyVals.entrySet()) {
filterValue = pair.getValue();
if (StringUtils.isEmpty(filterValue)) {
continue; // nothing to filter
}
EntityList.EntityFilterByFields filter =
EntityList.EntityFilterByFields.valueOf(pair.getKey().toUpperCase());
switch (filter) {
case TYPE:
if (!entity.getEntityType().toString().equalsIgnoreCase(filterValue)) {
filterEntity = true;
}
break;
case NAME:
if (!entity.getName().equalsIgnoreCase(filterValue)) {
filterEntity = true;
}
break;
case STATUS:
if (!status.equalsIgnoreCase(filterValue)) {
filterEntity = true;
}
break;
case PIPELINES:
if (entity.getEntityType().equals(EntityType.PROCESS)
&& !pipelines.contains(filterValue)) {
filterEntity = true;
}
break;
default:
break;
}
if (filterEntity) {
break;
}
}
}
return filterEntity;
}
private ArrayList<Entity> sortEntities(ArrayList<Entity> entities, String orderBy) {
// Sort the ArrayList using orderBy param
if (!StringUtils.isEmpty(orderBy)) {
EntityList.EntityFieldList orderByField = EntityList.EntityFieldList.valueOf(orderBy.toUpperCase());
switch (orderByField) {
case TYPE:
Collections.sort(entities, new Comparator<Entity>() {
@Override
public int compare(Entity e1, Entity e2) {
return e1.getEntityType().compareTo(e2.getEntityType());
}
});
break;
case NAME:
Collections.sort(entities, new Comparator<Entity>() {
@Override
public int compare(Entity e1, Entity e2) {
return e1.getName().compareTo(e2.getName());
}
});
break;
default:
break;
}
} // else no sort
return entities;
}
protected int getRequiredNumberOfResults(int arraySize, int offset, int numresults) {
/* Get a subset of elements based on offset and count. When returning subset of elements,
elements[offset] is included. Size 10, offset 10, return empty list.
Size 10, offset 5, count 3, return elements[5,6,7].
Size 10, offset 5, count >= 5, return elements[5,6,7,8,9]
When count is -1, return elements starting from elements[offset] until the end */
if (offset >= arraySize || arraySize == 0) {
// No elements to return
return 0;
}
int retLen = arraySize - offset;
if (retLen > numresults && numresults != -1) {
retLen = numresults;
}
return retLen;
}
private EntityElement[] buildEntityElements(Integer offset, HashSet<String> fields,
ArrayList<Entity> entities, int pageCount) {
EntityElement[] elements = new EntityElement[pageCount];
int elementIndex = 0;
for (Entity entity : entities.subList(offset, (offset + pageCount))) {
elements[elementIndex++] = getEntityElement(entity, fields);
}
return elements;
}
private EntityElement getEntityElement(Entity entity, HashSet<String> fields) {
EntityElement elem = new EntityElement();
elem.type = entity.getEntityType().toString();
elem.name = entity.getName();
if (fields.contains("status")) {
elem.status = getStatusString(entity);
}
if (fields.contains("pipelines")) {
elem.pipelines = getPipelines(entity);
}
if (fields.contains("tags")) {
elem.tag = getTags(entity);
}
return elem;
}
/**
* Returns the entity definition as an XML based on name.
*
* @param type entity type
* @param entityName entity name
* @return String
*/
public String getEntityDefinition(String type, String entityName) {
try {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Entity entity = configStore.get(entityType, entityName);
if (entity == null) {
throw new NoSuchElementException(entityName + " (" + type + ") not found");
}
return entity.toString();
} catch (Throwable e) {
LOG.error("Unable to get entity definition from config store for ({}): {}", type, entityName, e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
protected AbstractWorkflowEngine getWorkflowEngine() {
return this.workflowEngine;
}
}