ATLAS-2489: Lineage info to include relationship guid

Change-Id: Ie26d1ad07b6ec66beb42830ad154a9dd81e7933f
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
index a2e1b5e..27186ca 100644
--- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
+++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
@@ -145,12 +145,14 @@
     public static class LineageRelation {
         private String fromEntityId;
         private String toEntityId;
+        private String relationshipId;
 
         public LineageRelation() { }
 
-        public LineageRelation(String fromEntityId, String toEntityId) {
+        public LineageRelation(String fromEntityId, String toEntityId, final String relationshipId) {
             this.fromEntityId = fromEntityId;
             this.toEntityId   = toEntityId;
+            this.relationshipId = relationshipId;
         }
 
         public String getFromEntityId() {
@@ -169,18 +171,27 @@
             this.toEntityId = toEntityId;
         }
 
+        public String getRelationshipId() {
+            return relationshipId;
+        }
+
+        public void setRelationshipId(final String relationshipId) {
+            this.relationshipId = relationshipId;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             LineageRelation that = (LineageRelation) o;
             return Objects.equals(fromEntityId, that.fromEntityId) &&
-                    Objects.equals(toEntityId, that.toEntityId);
+                    Objects.equals(toEntityId, that.toEntityId) &&
+                    Objects.equals(relationshipId, that.relationshipId);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(fromEntityId, toEntityId);
+            return Objects.hash(fromEntityId, toEntityId, relationshipId);
         }
 
         @Override
@@ -188,6 +199,7 @@
             return "LineageRelation{" +
                     "fromEntityId='" + fromEntityId + '\'' +
                     ", toEntityId='" + toEntityId + '\'' +
+                    ", relationshipId='" + relationshipId + '\'' +
                     '}';
         }
     }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index ec4125d..633ad7c 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -33,6 +33,8 @@
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
@@ -46,13 +48,14 @@
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,9 +63,11 @@
 
 @Service
 public class EntityLineageService implements AtlasLineageService {
-    private static final String INPUT_PROCESS_EDGE  = "__Process.inputs";
-    private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs";
-    private static final String COLUMNS             = "columns";
+    private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class);
+
+    private static final String PROCESS_INPUTS_EDGE  = "__Process.inputs";
+    private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
+    private static final String COLUMNS              = "columns";
 
     private final AtlasGraph                graph;
     private final AtlasGremlinQueryProvider gremlinQueryProvider;
@@ -162,7 +167,7 @@
         List<String> ret        = new ArrayList<>();
         Object       columnObjs = entity.getAttribute(COLUMNS);
 
-        if (columnObjs != null && columnObjs instanceof List) {
+        if (columnObjs instanceof List) {
             for (Object pkObj : (List) columnObjs) {
                 if (pkObj instanceof AtlasObjectId) {
                     ret.add(((AtlasObjectId) pkObj).getGuid());
@@ -182,35 +187,27 @@
         Set<LineageRelation>           relations    = new HashSet<>();
         String                         lineageQuery = getLineageQuery(guid, direction, depth);
 
-        List paths = (List) graph.executeGremlinScript(lineageQuery, true);
+        List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false);
 
-        if (CollectionUtils.isNotEmpty(paths)) {
-            for (Object path : paths) {
-                if (path instanceof List) {
-                    List vertices = (List) path;
+        if (CollectionUtils.isNotEmpty(edgeMapList)) {
+            for (Object edgeMap : edgeMapList) {
+                if (edgeMap instanceof Map) {
+                    for (final Object o : ((Map) edgeMap).entrySet()) {
+                        final Map.Entry entry = (Map.Entry) o;
+                        Object          value = entry.getValue();
 
-                    if (CollectionUtils.isNotEmpty(vertices)) {
-                        AtlasEntityHeader prev = null;
-
-                        for (Object vertex : vertices) {
-                            if (!(vertex instanceof AtlasVertex)) {
-                                continue;
-                            }
-
-                            AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader((AtlasVertex) vertex);
-
-                            if (!entities.containsKey(entity.getGuid())) {
-                                entities.put(entity.getGuid(), entity);
-                            }
-
-                            if (prev != null) {
-                                if (direction.equals(LineageDirection.INPUT)) {
-                                    relations.add(new LineageRelation(entity.getGuid(), prev.getGuid()));
-                                } else if (direction.equals(LineageDirection.OUTPUT)) {
-                                    relations.add(new LineageRelation(prev.getGuid(), entity.getGuid()));
+                        if (value instanceof List) {
+                            for (Object elem : (List) value) {
+                                if (elem instanceof AtlasEdge) {
+                                    processEdge((AtlasEdge) elem, entities, relations);
+                                } else {
+                                    LOG.warn("Invalid value of type {} found, ignoring", (elem != null ? elem.getClass().getSimpleName() : "null"));
                                 }
                             }
-                            prev = entity;
+                        } else if (value instanceof AtlasEdge) {
+                            processEdge((AtlasEdge) value, entities, relations);
+                        } else {
+                            LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null"));
                         }
                     }
                 }
@@ -220,6 +217,31 @@
         return new AtlasLineageInfo(guid, entities, relations, direction, depth);
     }
 
+    private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException {
+        AtlasVertex inVertex     = edge.getInVertex();
+        AtlasVertex outVertex    = edge.getOutVertex();
+        String      inGuid       = AtlasGraphUtilsV1.getIdFromVertex(inVertex);
+        String      outGuid      = AtlasGraphUtilsV1.getIdFromVertex(outVertex);
+        String      relationGuid = AtlasGraphUtilsV1.getProperty(edge, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
+        boolean     isInputEdge  = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
+
+        if (!entities.containsKey(inGuid)) {
+            AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex);
+            entities.put(inGuid, entityHeader);
+        }
+
+        if (!entities.containsKey(outGuid)) {
+            AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(outVertex);
+            entities.put(outGuid, entityHeader);
+        }
+
+        if (isInputEdge) {
+            relations.add(new LineageRelation(inGuid, outGuid, relationGuid));
+        } else {
+            relations.add(new LineageRelation(outGuid, inGuid, relationGuid));
+        }
+    }
+
     private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException {
         AtlasLineageInfo inputLineage  = getLineageInfo(guid, LineageDirection.INPUT, depth);
         AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth);
@@ -236,10 +258,10 @@
         String lineageQuery = null;
 
         if (direction.equals(LineageDirection.INPUT)) {
-            lineageQuery = generateLineageQuery(entityGuid, depth, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE);
+            lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
 
         } else if (direction.equals(LineageDirection.OUTPUT)) {
-            lineageQuery = generateLineageQuery(entityGuid, depth, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE);
+            lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
         }
 
         return lineageQuery;
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
index 58e3492..8828a87 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
@@ -44,13 +44,9 @@
             case EXPORT_BY_GUID_CONNECTED_OUT_EDGE:
                 return "g.V().has('__guid', startGuid).outE().inV().has('__guid').project('__guid', 'isProcess').by('__guid').by(map {it.get().values('__superTypeNames').toSet().contains('Process')}).dedup().toList()";
             case FULL_LINEAGE:
-                return "g.V().has('__guid', '%s').repeat(__.in('%s').out('%s'))." +
-                        "emit(has('__superTypeNames').and().properties('__superTypeNames').hasValue('DataSet'))." +
-                        "path().toList()";
+                return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()";
             case PARTIAL_LINEAGE:
-                return "g.V().has('__guid', '%s').repeat(__.in('%s').out('%s')).times(%s)." +
-                        "emit(has('__superTypeNames').and().properties('__superTypeNames').hasValue('DataSet'))." +
-                        "path().toList()";
+                return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()";
             case TO_RANGE_LIST:
                 return ".range(startIdx, endIdx).toList()";
             case RELATIONSHIP_SEARCH: