blob: e40e0344511947c03c999c51bfce68bd2159b143 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.MultivaluedMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
import static org.apache.nifi.atlas.AtlasUtils.toStr;
import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
public class NiFiAtlasClient {
private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
private final AtlasClientV2 atlasClient;
public NiFiAtlasClient(AtlasClientV2 atlasClient) {
this.atlasClient = atlasClient;
}
/**
* This is an utility method to delete unused types.
* Should be used during development or testing only.
* @param typeNames to delete
*/
void deleteTypeDefs(String ... typeNames) throws AtlasServiceException {
final AtlasTypesDef existingTypeDef = getTypeDefs(typeNames);
try {
atlasClient.deleteAtlasTypeDefs(existingTypeDef);
} catch (UniformInterfaceException e) {
if (e.getResponse().getStatus() == 204) {
// 204 is a successful response.
// NOTE: However after executing this, Atlas should be restarted to work properly.
logger.info("Deleted type defs: {}", existingTypeDef);
} else {
throw e;
}
}
}
/**
* @return True when required NiFi types are already created.
*/
public boolean isNiFiTypeDefsRegistered() throws AtlasServiceException {
final Set<String> typeNames = ENTITIES.keySet();
final Map<String, AtlasEntityDef> existingDefs = getTypeDefs(typeNames.toArray(new String[typeNames.size()])).getEntityDefs().stream()
.collect(Collectors.toMap(AtlasEntityDef::getName, Function.identity()));
return typeNames.stream().allMatch(existingDefs::containsKey);
}
/**
* Create or update NiFi types in Atlas type system.
* @param update If false, doesn't perform anything if there is existing type def for the name.
*/
public void registerNiFiTypeDefs(boolean update) throws AtlasServiceException {
final Set<String> typeNames = ENTITIES.keySet();
final Map<String, AtlasEntityDef> existingDefs = getTypeDefs(typeNames.toArray(new String[typeNames.size()])).getEntityDefs().stream()
.collect(Collectors.toMap(AtlasEntityDef::getName, Function.identity()));
final AtomicBoolean shouldUpdate = new AtomicBoolean(false);
final AtlasTypesDef type = new AtlasTypesDef();
typeNames.stream().filter(typeName -> {
final AtlasEntityDef existingDef = existingDefs.get(typeName);
if (existingDef != null) {
// type is already defined.
if (!update) {
return false;
}
shouldUpdate.set(true);
}
return true;
}).forEach(typeName -> {
final NiFiTypes.EntityDefinition def = ENTITIES.get(typeName);
final AtlasEntityDef entity = new AtlasEntityDef();
type.getEntityDefs().add(entity);
entity.setName(typeName);
Set<String> superTypes = new HashSet<>();
List<AtlasAttributeDef> attributes = new ArrayList<>();
def.define(entity, superTypes, attributes);
entity.setSuperTypes(superTypes);
entity.setAttributeDefs(attributes);
});
// Create or Update.
final AtlasTypesDef atlasTypeDefsResult = shouldUpdate.get()
? atlasClient.updateAtlasTypeDefs(type)
: atlasClient.createAtlasTypeDefs(type);
logger.debug("Result={}", atlasTypeDefsResult);
}
private AtlasTypesDef getTypeDefs(String ... typeNames) throws AtlasServiceException {
final AtlasTypesDef typeDefs = new AtlasTypesDef();
for (int i = 0; i < typeNames.length; i++) {
final MultivaluedMap<String, String> searchParams = new MultivaluedMapImpl();
searchParams.add(SearchFilter.PARAM_NAME, typeNames[i]);
final AtlasTypesDef typeDef = atlasClient.getAllTypeDefs(new SearchFilter(searchParams));
typeDefs.getEntityDefs().addAll(typeDef.getEntityDefs());
}
logger.debug("typeDefs={}", typeDefs);
return typeDefs;
}
private Pattern FLOW_PATH_URL_PATTERN = Pattern.compile("^http.+processGroupId=([0-9a-z\\-]+).*$");
/**
* Fetch existing NiFiFlow entity from Atlas.
* @param rootProcessGroupId The id of a NiFi flow root process group.
* @param clusterName The cluster name of a flow.
* @return A NiFiFlow instance filled with retrieved data from Atlas. Status objects are left blank, e.g. ProcessorStatus.
* @throws AtlasServiceException Thrown if requesting to Atlas API failed, including when the flow is not found.
*/
public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException {
final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId);
final AtlasObjectId flowId = new AtlasObjectId(TYPE_NIFI_FLOW, ATTR_QUALIFIED_NAME, qualifiedName);
final AtlasEntity.AtlasEntityWithExtInfo nifiFlowExt = searchEntityDef(flowId);
if (nifiFlowExt == null || nifiFlowExt.getEntity() == null) {
return null;
}
final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity();
final Map<String, AtlasEntity> nifiFlowReferredEntities = nifiFlowExt.getReferredEntities();
final Map<String, Object> attributes = nifiFlowEntity.getAttributes();
final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
nifiFlow.setExEntity(nifiFlowEntity);
nifiFlow.setFlowName(toStr(attributes.get(ATTR_NAME)));
nifiFlow.setClusterName(clusterName);
nifiFlow.setUrl(toStr(attributes.get(ATTR_URL)));
nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION)));
nifiFlow.getQueues().putAll(fetchFlowComponents(TYPE_NIFI_QUEUE, nifiFlowReferredEntities));
nifiFlow.getRootInputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_INPUT_PORT, nifiFlowReferredEntities));
nifiFlow.getRootOutputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_OUTPUT_PORT, nifiFlowReferredEntities));
final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths();
final Map<AtlasObjectId, AtlasEntity> flowPathEntities = fetchFlowComponents(TYPE_NIFI_FLOW_PATH, nifiFlowReferredEntities);
for (AtlasEntity flowPathEntity : flowPathEntities.values()) {
final String pathQualifiedName = toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME));
final NiFiFlowPath flowPath = new NiFiFlowPath(getComponentIdFromQualifiedName(pathQualifiedName));
if (flowPathEntity.hasAttribute(ATTR_URL)) {
final Matcher urlMatcher = FLOW_PATH_URL_PATTERN.matcher(toStr(flowPathEntity.getAttribute(ATTR_URL)));
if (urlMatcher.matches()) {
flowPath.setGroupId(urlMatcher.group(1));
}
}
flowPath.setExEntity(flowPathEntity);
flowPath.setName(toStr(flowPathEntity.getAttribute(ATTR_NAME)));
flowPath.getInputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_INPUTS))).keySet());
flowPath.getOutputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_OUTPUTS))).keySet());
flowPath.startTrackingChanges(nifiFlow);
flowPaths.put(flowPath.getId(), flowPath);
}
nifiFlow.startTrackingChanges();
return nifiFlow;
}
/**
* Retrieves the flow components of type {@code componentType} from Atlas server.
* Deleted components will be filtered out before calling Atlas.
* Atlas object ids will be initialized with all the attributes (guid, type, unique attributes) in order to be able
* to match ids retrieved from Atlas (having guid) and ids created by the reporting task (not having guid yet).
*
* @param componentType Atlas type of the flow component (nifi_flow_path, nifi_queue, nifi_input_port, nifi_output_port)
* @param referredEntities referred entities of the flow entity (returned when the flow fetched) containing the basic data (id, status) of the flow components
* @return flow component entities mapped to their object ids
*/
private Map<AtlasObjectId, AtlasEntity> fetchFlowComponents(String componentType, Map<String, AtlasEntity> referredEntities) {
return referredEntities.values().stream()
.filter(referredEntity -> referredEntity.getTypeName().equals(componentType))
.filter(referredEntity -> referredEntity.getStatus() == AtlasEntity.Status.ACTIVE)
.map(referredEntity -> {
final Map<String, Object> uniqueAttributes = Collections.singletonMap(ATTR_QUALIFIED_NAME, referredEntity.getAttribute(ATTR_QUALIFIED_NAME));
final AtlasObjectId id = new AtlasObjectId(referredEntity.getGuid(), componentType, uniqueAttributes);
try {
final AtlasEntity.AtlasEntityWithExtInfo fetchedEntityExt = searchEntityDef(id);
return new Tuple<>(id, fetchedEntityExt.getEntity());
} catch (AtlasServiceException e) {
logger.warn("Failed to search entity by id {}, due to {}", id, e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
}
@SuppressWarnings("unchecked")
private List<AtlasObjectId> toAtlasObjectIds(Object _references) {
if (_references == null) {
return Collections.emptyList();
}
List<Map<String, Object>> references = (List<Map<String, Object>>) _references;
return references.stream()
.map(ref -> new AtlasObjectId(toStr(ref.get(ATTR_GUID)), toStr(ref.get(ATTR_TYPENAME)), ref))
.collect(Collectors.toList());
}
/**
* <p>AtlasObjectIds returned from Atlas have GUID, but do not have qualifiedName, while ones created by the reporting task
* do not have GUID, but qualifiedName. AtlasObjectId.equals returns false for this combination.
* In order to match ids correctly, this method converts fetches actual entities from ids to get qualifiedName attribute.</p>
*
* <p>Also, AtlasObjectIds returned from Atlas does not have entity state.
* If Atlas is configured to use soft-delete (default), deleted ids are still returned.
* Fetched entities are used to determine whether an AtlasObjectId is still active or deleted.
* Deleted entities will not be included in the result of this method.
* </p>
* @param ids to convert
* @return AtlasObjectIds with qualifiedName
*/
private Map<AtlasObjectId, AtlasEntity> toQualifiedNameIds(List<AtlasObjectId> ids) {
if (ids == null) {
return Collections.emptyMap();
}
return ids.stream().distinct().map(id -> {
try {
final AtlasEntity.AtlasEntityWithExtInfo entityExt = searchEntityDef(id);
final AtlasEntity entity = entityExt.getEntity();
if (AtlasEntity.Status.DELETED.equals(entity.getStatus())) {
return null;
}
final Map<String, Object> uniqueAttrs = Collections.singletonMap(ATTR_QUALIFIED_NAME, entity.getAttribute(ATTR_QUALIFIED_NAME));
return new Tuple<>(new AtlasObjectId(id.getGuid(), id.getTypeName(), uniqueAttrs), entity);
} catch (AtlasServiceException e) {
logger.warn("Failed to search entity by id {}, due to {}", id, e);
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
}
public void registerNiFiFlow(NiFiFlow nifiFlow) throws AtlasServiceException {
// Create parent flow entity, so that common properties are taken over.
final AtlasEntity flowEntity = registerNiFiFlowEntity(nifiFlow);
// Create DataSet entities those are created by this NiFi flow.
final Map<String, List<AtlasEntity>> updatedDataSetEntities = registerDataSetEntities(nifiFlow);
// Create path entities.
final Set<AtlasObjectId> remainingPathIds = registerFlowPathEntities(nifiFlow);
// Update these attributes only if anything is created, updated or removed.
boolean shouldUpdateNiFiFlow = nifiFlow.isMetadataUpdated();
if (remainingPathIds != null) {
flowEntity.setAttribute(ATTR_FLOW_PATHS, remainingPathIds);
shouldUpdateNiFiFlow = true;
}
if (updatedDataSetEntities.containsKey(TYPE_NIFI_QUEUE)) {
flowEntity.setAttribute(ATTR_QUEUES, updatedDataSetEntities.get(TYPE_NIFI_QUEUE));
shouldUpdateNiFiFlow = true;
}
if (updatedDataSetEntities.containsKey(TYPE_NIFI_INPUT_PORT)) {
flowEntity.setAttribute(ATTR_INPUT_PORTS, updatedDataSetEntities.get(TYPE_NIFI_INPUT_PORT));
shouldUpdateNiFiFlow = true;
}
if (updatedDataSetEntities.containsKey(TYPE_NIFI_OUTPUT_PORT)) {
flowEntity.setAttribute(ATTR_OUTPUT_PORTS, updatedDataSetEntities.get(TYPE_NIFI_OUTPUT_PORT));
shouldUpdateNiFiFlow = true;
}
if (logger.isDebugEnabled()) {
logger.debug("### NiFi Flow Audit Logs START");
nifiFlow.getUpdateAudit().forEach(logger::debug);
nifiFlow.getFlowPaths().forEach((k, v) -> {
logger.debug("--- NiFiFlowPath Audit Logs: {}", k);
v.getUpdateAudit().forEach(logger::debug);
});
logger.debug("### NiFi Flow Audit Logs END");
}
if (shouldUpdateNiFiFlow) {
// Send updated entities.
final List<AtlasEntity> entities = new ArrayList<>();
final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
entities.add(flowEntity);
try {
final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
logger.debug("mutation response={}", mutationResponse);
} catch (AtlasServiceException e) {
if (e.getStatus().getStatusCode() == AtlasErrorCode.INSTANCE_NOT_FOUND.getHttpCode().getStatusCode()
&& e.getMessage().contains(AtlasErrorCode.INSTANCE_NOT_FOUND.getErrorCode())) {
// NOTE: If previously existed nifi_flow_path entity is removed because the path is removed from NiFi,
// then Atlas respond with 404 even though the entity is successfully updated.
// Following exception is thrown in this case. Just log it.
// org.apache.atlas.AtlasServiceException:
// Metadata service API org.apache.atlas.AtlasBaseClient$APIInfo@45a37759
// failed with status 404 (Not Found) Response Body
// ({"errorCode":"ATLAS-404-00-00B","errorMessage":"Given instance is invalid/not found:
// Could not find entities in the repository with guids: [96d24487-cd66-4795-b552-f00b426fed26]"})
logger.debug("Received error response from Atlas but it should be stored." + e);
} else {
throw e;
}
}
}
}
private AtlasEntity registerNiFiFlowEntity(final NiFiFlow nifiFlow) throws AtlasServiceException {
final List<AtlasEntity> entities = new ArrayList<>();
final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
if (!nifiFlow.isMetadataUpdated()) {
// Nothing has been changed, return existing entity.
return nifiFlow.getExEntity();
}
// Create parent flow entity using existing NiFiFlow entity if available, so that common properties are taken over.
final AtlasEntity flowEntity = nifiFlow.getExEntity() != null ? new AtlasEntity(nifiFlow.getExEntity()) : new AtlasEntity();
flowEntity.setTypeName(TYPE_NIFI_FLOW);
flowEntity.setVersion(1L);
flowEntity.setAttribute(ATTR_NAME, nifiFlow.getFlowName());
flowEntity.setAttribute(ATTR_QUALIFIED_NAME, nifiFlow.toQualifiedName(nifiFlow.getRootProcessGroupId()));
flowEntity.setAttribute(ATTR_URL, nifiFlow.getUrl());
flowEntity.setAttribute(ATTR_DESCRIPTION, nifiFlow.getDescription());
// If flowEntity is not persisted yet, then store nifi_flow entity to make nifiFlowId available for other entities.
if (flowEntity.getGuid().startsWith("-")) {
entities.add(flowEntity);
final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
logger.debug("Registered a new nifi_flow entity, mutation response={}", mutationResponse);
final String assignedNiFiFlowGuid = mutationResponse.getGuidAssignments().get(flowEntity.getGuid());
flowEntity.setGuid(assignedNiFiFlowGuid);
nifiFlow.setAtlasGuid(assignedNiFiFlowGuid);
}
return flowEntity;
}
/**
* Register DataSet within specified NiFiFlow.
* @return Set of registered Atlas type names and its remaining entities without deleted ones.
*/
private Map<String, List<AtlasEntity>> registerDataSetEntities(final NiFiFlow nifiFlow) throws AtlasServiceException {
final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> changedEntities = nifiFlow.getChangedDataSetEntities();
if (changedEntities.containsKey(CREATED)) {
final List<AtlasEntity> createdEntities = changedEntities.get(CREATED);
final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(createdEntities);
final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
logger.debug("Created DataSet entities mutation response={}", mutationResponse);
final Map<String, String> guidAssignments = mutationResponse.getGuidAssignments();
for (AtlasEntity entity : createdEntities) {
final String guid = guidAssignments.get(entity.getGuid());
final String qualifiedName = toStr(entity.getAttribute(ATTR_QUALIFIED_NAME));
if (StringUtils.isEmpty(guid)) {
logger.warn("GUID was not assigned for {}::{} for some reason.", entity.getTypeName(), qualifiedName);
continue;
}
final Map<AtlasObjectId, AtlasEntity> entityMap;
switch (entity.getTypeName()) {
case TYPE_NIFI_INPUT_PORT:
entityMap = nifiFlow.getRootInputPortEntities();
break;
case TYPE_NIFI_OUTPUT_PORT:
entityMap = nifiFlow.getRootOutputPortEntities();
break;
case TYPE_NIFI_QUEUE:
entityMap = nifiFlow.getQueues();
break;
default:
throw new RuntimeException(entity.getTypeName() + " is not expected.");
}
// In order to replace the id, remove current id which does not have GUID.
findIdByQualifiedName(entityMap.keySet(), qualifiedName).ifPresent(entityMap::remove);
entity.setGuid(guid);
final AtlasObjectId idWithGuid = new AtlasObjectId(guid, entity.getTypeName(), Collections.singletonMap(ATTR_QUALIFIED_NAME, qualifiedName));
entityMap.put(idWithGuid, entity);
}
}
if (changedEntities.containsKey(UPDATED)) {
final List<AtlasEntity> updatedEntities = changedEntities.get(UPDATED);
final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(updatedEntities);
final EntityMutationResponse mutationResponse = atlasClient.updateEntities(atlasEntities);
logger.debug("Updated DataSet entities mutation response={}", mutationResponse);
}
final Set<String> changedTypeNames = changedEntities.entrySet().stream()
.filter(entry -> !AS_IS.equals(entry.getKey())).flatMap(entry -> entry.getValue().stream())
.map(AtlasEntity::getTypeName)
.collect(Collectors.toSet());
// NOTE: Cascading DELETE will be performed when parent NiFiFlow is updated without removed DataSet entities.
final Map<String, List<AtlasEntity>> remainingEntitiesByType = changedEntities.entrySet().stream()
.filter(entry -> !DELETED.equals(entry.getKey()))
.flatMap(entry -> entry.getValue().stream())
.filter(entity -> changedTypeNames.contains(entity.getTypeName()))
.collect(Collectors.groupingBy(AtlasEntity::getTypeName));
// If all entities are deleted for a type (e.g. nifi_intput_port), then remainingEntitiesByType will not contain such key.
// If the returning map does not contain anything for a type, then the corresponding attribute will not be updated.
// To empty an attribute when all of its elements are deleted, add empty list for a type.
changedTypeNames.forEach(changedTypeName -> remainingEntitiesByType.computeIfAbsent(changedTypeName, k -> Collections.emptyList()));
return remainingEntitiesByType;
}
private Set<AtlasObjectId> registerFlowPathEntities(final NiFiFlow nifiFlow) throws AtlasServiceException {
final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> changedEntities = nifiFlow.getChangedFlowPathEntities();
if (changedEntities.containsKey(CREATED)) {
final List<AtlasEntity> createdEntities = changedEntities.get(CREATED);
final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(createdEntities);
final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
logger.debug("Created FlowPath entities mutation response={}", mutationResponse);
final Map<String, String> guidAssignments = mutationResponse.getGuidAssignments();
createdEntities.forEach(entity -> {
final String guid = entity.getGuid();
entity.setGuid(guidAssignments.get(guid));
final String pathId = getComponentIdFromQualifiedName(toStr(entity.getAttribute(ATTR_QUALIFIED_NAME)));
final NiFiFlowPath path = nifiFlow.getFlowPaths().get(pathId);
path.setExEntity(entity);
});
}
if (changedEntities.containsKey(UPDATED)) {
final List<AtlasEntity> updatedEntities = changedEntities.get(UPDATED);
final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(updatedEntities);
final EntityMutationResponse mutationResponse = atlasClient.updateEntities(atlasEntities);
logger.debug("Updated FlowPath entities mutation response={}", mutationResponse);
updatedEntities.forEach(entity -> {
final String pathId = getComponentIdFromQualifiedName(toStr(entity.getAttribute(ATTR_QUALIFIED_NAME)));
final NiFiFlowPath path = nifiFlow.getFlowPaths().get(pathId);
path.setExEntity(entity);
});
}
if (NiFiFlow.EntityChangeType.containsChange(changedEntities.keySet())) {
return changedEntities.entrySet().stream()
.filter(entry -> !DELETED.equals(entry.getKey())).flatMap(entry -> entry.getValue().stream())
.map(path -> new AtlasObjectId(path.getGuid(), TYPE_NIFI_FLOW_PATH,
Collections.singletonMap(ATTR_QUALIFIED_NAME, path.getAttribute(ATTR_QUALIFIED_NAME))))
.collect(Collectors.toSet());
}
return null;
}
public AtlasEntity.AtlasEntityWithExtInfo searchEntityDef(AtlasObjectId id) throws AtlasServiceException {
final String guid = id.getGuid();
if (!StringUtils.isEmpty(guid)) {
return atlasClient.getEntityByGuid(guid, true, false);
}
final Map<String, String> attributes = new HashMap<>();
id.getUniqueAttributes().entrySet().stream().filter(entry -> entry.getValue() != null)
.forEach(entry -> attributes.put(entry.getKey(), entry.getValue().toString()));
return atlasClient.getEntityByAttribute(id.getTypeName(), attributes, true, false);
}
}