ATLAS-2489: Lineage info to include relationship guid
Change-Id: Ie26d1ad07b6ec66beb42830ad154a9dd81e7933f
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
index a2e1b5e..27186ca 100644
--- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
+++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
@@ -145,12 +145,14 @@
public static class LineageRelation {
private String fromEntityId;
private String toEntityId;
+ private String relationshipId;
public LineageRelation() { }
- public LineageRelation(String fromEntityId, String toEntityId) {
+ public LineageRelation(String fromEntityId, String toEntityId, final String relationshipId) {
this.fromEntityId = fromEntityId;
this.toEntityId = toEntityId;
+ this.relationshipId = relationshipId;
}
public String getFromEntityId() {
@@ -169,18 +171,27 @@
this.toEntityId = toEntityId;
}
+ public String getRelationshipId() {
+ return relationshipId;
+ }
+
+ public void setRelationshipId(final String relationshipId) {
+ this.relationshipId = relationshipId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LineageRelation that = (LineageRelation) o;
return Objects.equals(fromEntityId, that.fromEntityId) &&
- Objects.equals(toEntityId, that.toEntityId);
+ Objects.equals(toEntityId, that.toEntityId) &&
+ Objects.equals(relationshipId, that.relationshipId);
}
@Override
public int hashCode() {
- return Objects.hash(fromEntityId, toEntityId);
+ return Objects.hash(fromEntityId, toEntityId, relationshipId);
}
@Override
@@ -188,6 +199,7 @@
return "LineageRelation{" +
"fromEntityId='" + fromEntityId + '\'' +
", toEntityId='" + toEntityId + '\'' +
+ ", relationshipId='" + relationshipId + '\'' +
'}';
}
}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index ec4125d..633ad7c 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -33,6 +33,8 @@
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
@@ -46,13 +48,14 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -60,9 +63,11 @@
@Service
public class EntityLineageService implements AtlasLineageService {
- private static final String INPUT_PROCESS_EDGE = "__Process.inputs";
- private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs";
- private static final String COLUMNS = "columns";
+ private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class);
+
+ private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
+ private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
+ private static final String COLUMNS = "columns";
private final AtlasGraph graph;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
@@ -162,7 +167,7 @@
List<String> ret = new ArrayList<>();
Object columnObjs = entity.getAttribute(COLUMNS);
- if (columnObjs != null && columnObjs instanceof List) {
+ if (columnObjs instanceof List) {
for (Object pkObj : (List) columnObjs) {
if (pkObj instanceof AtlasObjectId) {
ret.add(((AtlasObjectId) pkObj).getGuid());
@@ -182,35 +187,27 @@
Set<LineageRelation> relations = new HashSet<>();
String lineageQuery = getLineageQuery(guid, direction, depth);
- List paths = (List) graph.executeGremlinScript(lineageQuery, true);
+ List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false);
- if (CollectionUtils.isNotEmpty(paths)) {
- for (Object path : paths) {
- if (path instanceof List) {
- List vertices = (List) path;
+ if (CollectionUtils.isNotEmpty(edgeMapList)) {
+ for (Object edgeMap : edgeMapList) {
+ if (edgeMap instanceof Map) {
+ for (final Object o : ((Map) edgeMap).entrySet()) {
+ final Map.Entry entry = (Map.Entry) o;
+ Object value = entry.getValue();
- if (CollectionUtils.isNotEmpty(vertices)) {
- AtlasEntityHeader prev = null;
-
- for (Object vertex : vertices) {
- if (!(vertex instanceof AtlasVertex)) {
- continue;
- }
-
- AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader((AtlasVertex) vertex);
-
- if (!entities.containsKey(entity.getGuid())) {
- entities.put(entity.getGuid(), entity);
- }
-
- if (prev != null) {
- if (direction.equals(LineageDirection.INPUT)) {
- relations.add(new LineageRelation(entity.getGuid(), prev.getGuid()));
- } else if (direction.equals(LineageDirection.OUTPUT)) {
- relations.add(new LineageRelation(prev.getGuid(), entity.getGuid()));
+ if (value instanceof List) {
+ for (Object elem : (List) value) {
+ if (elem instanceof AtlasEdge) {
+ processEdge((AtlasEdge) elem, entities, relations);
+ } else {
+ LOG.warn("Invalid value of type {} found, ignoring", (elem != null ? elem.getClass().getSimpleName() : "null"));
}
}
- prev = entity;
+ } else if (value instanceof AtlasEdge) {
+ processEdge((AtlasEdge) value, entities, relations);
+ } else {
+ LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null"));
}
}
}
@@ -220,6 +217,31 @@
return new AtlasLineageInfo(guid, entities, relations, direction, depth);
}
+ private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException {
+ AtlasVertex inVertex = edge.getInVertex();
+ AtlasVertex outVertex = edge.getOutVertex();
+ String inGuid = AtlasGraphUtilsV1.getIdFromVertex(inVertex);
+ String outGuid = AtlasGraphUtilsV1.getIdFromVertex(outVertex);
+ String relationGuid = AtlasGraphUtilsV1.getProperty(edge, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
+ boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
+
+ if (!entities.containsKey(inGuid)) {
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex);
+ entities.put(inGuid, entityHeader);
+ }
+
+ if (!entities.containsKey(outGuid)) {
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(outVertex);
+ entities.put(outGuid, entityHeader);
+ }
+
+ if (isInputEdge) {
+ relations.add(new LineageRelation(inGuid, outGuid, relationGuid));
+ } else {
+ relations.add(new LineageRelation(outGuid, inGuid, relationGuid));
+ }
+ }
+
private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException {
AtlasLineageInfo inputLineage = getLineageInfo(guid, LineageDirection.INPUT, depth);
AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth);
@@ -236,10 +258,10 @@
String lineageQuery = null;
if (direction.equals(LineageDirection.INPUT)) {
- lineageQuery = generateLineageQuery(entityGuid, depth, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE);
+ lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
} else if (direction.equals(LineageDirection.OUTPUT)) {
- lineageQuery = generateLineageQuery(entityGuid, depth, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE);
+ lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
}
return lineageQuery;
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
index 58e3492..8828a87 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
@@ -44,13 +44,9 @@
case EXPORT_BY_GUID_CONNECTED_OUT_EDGE:
return "g.V().has('__guid', startGuid).outE().inV().has('__guid').project('__guid', 'isProcess').by('__guid').by(map {it.get().values('__superTypeNames').toSet().contains('Process')}).dedup().toList()";
case FULL_LINEAGE:
- return "g.V().has('__guid', '%s').repeat(__.in('%s').out('%s'))." +
- "emit(has('__superTypeNames').and().properties('__superTypeNames').hasValue('DataSet'))." +
- "path().toList()";
+ return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()";
case PARTIAL_LINEAGE:
- return "g.V().has('__guid', '%s').repeat(__.in('%s').out('%s')).times(%s)." +
- "emit(has('__superTypeNames').and().properties('__superTypeNames').hasValue('DataSet'))." +
- "path().toList()";
+ return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()";
case TO_RANGE_LIST:
return ".range(startIdx, endIdx).toList()";
case RELATIONSHIP_SEARCH: