ATLAS-1959: Enhance relationship attributes to support cardinality mappings

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/addons/models/0010-base_model.json b/addons/models/0010-base_model.json
index 303f379..20996a2 100644
--- a/addons/models/0010-base_model.json
+++ b/addons/models/0010-base_model.json
@@ -102,15 +102,15 @@
             "endDef1": {
                 "type": "DataSet",
                 "name": "sourceToProcesses",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SET"
             },
             "endDef2": {
                 "type": "Process",
                 "name": "inputs",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET",
-                "legacyLabel": "__Process.inputs"
+                "isLegacyAttribute": true
             },
             "propagateTags": "NONE"
         },
@@ -121,14 +121,14 @@
           "endDef1": {
                 "type": "Process",
                 "name": "outputs",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET",
-                "legacyLabel": "__Process.outputs"
+                "isLegacyAttribute": true
           },
           "endDef2": {
                 "type": "DataSet",
                 "name": "sinkFromProcesses",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SET"
           },
           "propagateTags": "NONE"
diff --git a/addons/models/0030-hive_model.json b/addons/models/0030-hive_model.json
index a795f0f..f47a7b9 100644
--- a/addons/models/0030-hive_model.json
+++ b/addons/models/0030-hive_model.json
@@ -530,15 +530,15 @@
             "endDef1": {
                 "type": "hive_db",
                 "name": "tables",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET"
             },
             "endDef2": {
                 "type": "hive_table",
                 "name": "db",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hive_table.db"
+                "isLegacyAttribute": true
             },
             "propagateTags": "ONE_TO_TWO"
         },
@@ -549,16 +549,16 @@
             "endDef1": {
                 "type": "hive_table",
                 "name": "columns",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET",
-                "legacyLabel": "__hive_table.columns"
+                "isLegacyAttribute": true
             },
             "endDef2": {
                 "type": "hive_column",
                 "name": "table",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hive_column.table"
+                "isLegacyAttribute": true
             },
             "propagateTags": "ONE_TO_TWO"
         },
@@ -569,16 +569,16 @@
             "endDef1": {
                 "type": "hive_table",
                 "name": "partitionKeys",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET",
-                "legacyLabel": "__hive_table.partitionKeys"
+                "isLegacyAttribute": true
             },
             "endDef2": {
                 "type": "hive_column",
                 "name": "table",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hive_column.table"
+                "isLegacyAttribute": true
             },
             "propagateTags": "ONE_TO_TWO"
         },
@@ -589,16 +589,16 @@
             "endDef1": {
                 "type": "hive_table",
                 "name": "sd",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hive_table.sd"
+                "isLegacyAttribute": true
             },
             "endDef2": {
                 "type": "hive_storagedesc",
                 "name": "table",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hive_storagedesc.table"
+                "isLegacyAttribute": true
             },
             "propagateTags": "ONE_TO_TWO"
         },
@@ -609,15 +609,15 @@
             "endDef1": {
                 "type": "hive_process",
                 "name": "columnLineages",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET"
             },
             "endDef2": {
                 "type": "hive_column_lineage",
                 "name": "query",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hive_column_lineage.query"
+                "isLegacyAttribute": true
             },
             "propagateTags": "NONE"
         }
diff --git a/addons/models/0050-falcon_model.json b/addons/models/0050-falcon_model.json
index 7755fa8..4fa4604 100644
--- a/addons/models/0050-falcon_model.json
+++ b/addons/models/0050-falcon_model.json
@@ -152,14 +152,14 @@
             "endDef1": {
                 "type": "falcon_feed",
                 "name": "stored-in",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__falcon_feed.stored-in"
+                "isLegacyAttribute": true
             },
             "endDef2": {
                 "type": "falcon_cluster",
                 "name": "feeds",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET"
             },
             "propagateTags": "NONE"
@@ -171,15 +171,15 @@
             "endDef1": {
                 "type": "falcon_cluster",
                 "name": "processes",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET"
             },
             "endDef2": {
                 "type": "falcon_process",
                 "name": "runs-on",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__falcon_process.runs-on"
+                "isLegacyAttribute": true
             },
             "propagateTags": "NONE"
         },
@@ -190,15 +190,15 @@
             "endDef1": {
                 "type": "falcon_cluster",
                 "name": "feedCreations",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET"
             },
             "endDef2": {
                 "type": "falcon_feed_creation",
                 "name": "stored-in",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__falcon_feed_creation.stored-in"
+                "isLegacyAttribute": true
             },
             "propagateTags": "NONE"
         }
diff --git a/addons/models/0060-hbase_model.json b/addons/models/0060-hbase_model.json
index 1d264df..3e46e06 100644
--- a/addons/models/0060-hbase_model.json
+++ b/addons/models/0060-hbase_model.json
@@ -105,16 +105,16 @@
             "endDef1": {
                 "type": "hbase_table",
                 "name": "column_families",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET",
-                "legacyLabel": "__hbase_table.column_families"
+                "isLegacyAttribute": true
             },
             "endDef2": {
                 "type": "hbase_column_family",
                 "name": "table",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hbase_column_family.table"
+                "isLegacyAttribute": true
             },
             "propagateTags": "ONE_TO_TWO"
         },
@@ -125,16 +125,16 @@
             "endDef1": {
                 "type": "hbase_column_family",
                 "name": "columns",
-                "isContainer": "true",
+                "isContainer": true,
                 "cardinality": "SET",
-                "legacyLabel": "__hbase_column_family.columns"
+                "isLegacyAttribute": true
             },
             "endDef2": {
                 "type": "hbase_column",
                 "name": "column_family",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SINGLE",
-                "legacyLabel": "__hbase_column.column_family"
+                "isLegacyAttribute": true
             },
             "propagateTags": "ONE_TO_TWO"
         }
diff --git a/addons/models/0080-storm_model.json b/addons/models/0080-storm_model.json
index 25360ff..b008c7a 100644
--- a/addons/models/0080-storm_model.json
+++ b/addons/models/0080-storm_model.json
@@ -151,14 +151,14 @@
             "endDef1": {
                 "type": "storm_topology",
                 "name": "nodes",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SET",
-                "legacyLabel": "__storm_topology.nodes"
+                "isLegacyAttribute": true
             },
             "endDef2": {
                 "type": "storm_node",
                 "name": "topolgies",
-                "isContainer": "false",
+                "isContainer": false,
                 "cardinality": "SET"
             },
             "propagateTags": "NONE"
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 68da6af..365e548 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -177,7 +177,7 @@
         this.relationshipAttributes = relationshipAttributes;
     }
 
-    public void addRelationshipAttribute(String name, Object value) {
+    public void setRelationshipAttribute(String name, Object value) {
         Map<String, Object> r = this.relationshipAttributes;
 
         if (r != null) {
@@ -190,6 +190,18 @@
         }
     }
 
+    public Object getRelationshipAttribute(String name) {
+        Map<String, Object> a = this.relationshipAttributes;
+
+        return a != null ? a.get(name) : null;
+    }
+
+    public boolean hasRelationshipAttribute(String name) {
+        Map<String, Object> r = this.relationshipAttributes;
+
+        return r != null ? r.containsKey(name) : false;
+    }
+
     public List<AtlasClassification> getClassifications() { return classifications; }
 
     public void setClassifications(List<AtlasClassification> classifications) { this.classifications = classifications; }
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
index 2de9bdf..4188371 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
@@ -176,8 +176,6 @@
         return "-" + Long.toString(s_nextId.getAndIncrement());
     }
 
-    public String getRelationshipLabel() { return "r:" + super.getTypeName(); }
-
     private void init() {
         init(nextInternalId(), null, null, null, null, null, null, null, null, 0L);
     }
diff --git a/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipDef.java b/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipDef.java
index fc820d4..c17e875 100644
--- a/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipDef.java
+++ b/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipDef.java
@@ -219,6 +219,8 @@
         return this.endDef2;
     }
 
+    public String getRelationshipLabel() { return "r:" + super.getName(); }
+
     public AtlasRelationshipDef(AtlasRelationshipDef other) throws AtlasBaseException {
         super(other);
 
diff --git a/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipEndDef.java b/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipEndDef.java
index f80ea89..01e5ce7 100644
--- a/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipEndDef.java
+++ b/intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipEndDef.java
@@ -18,7 +18,6 @@
 package org.apache.atlas.model.typedef;
 
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
-import org.apache.commons.lang.StringUtils;
 import org.codehaus.jackson.annotate.JsonAutoDetect;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -61,9 +60,9 @@
      */
     private Cardinality cardinality;
     /**
-     * legacy edge label name of the endpoint
+     * When set this indicates that this end is is a legacy attribute
      */
-    private String legacyLabel;
+    private boolean isLegacyAttribute;
 
     /**
      * Base constructor
@@ -97,15 +96,15 @@
      *   - whether the end is a container or not
      */
     public AtlasRelationshipEndDef(String typeName, String name, Cardinality cardinality, boolean isContainer) {
-        this(typeName, name, cardinality, isContainer, null);
+        this(typeName, name, cardinality, isContainer, false);
     }
 
-    public AtlasRelationshipEndDef(String typeName, String name, Cardinality cardinality, boolean isContainer, String legacyLabel) {
+    public AtlasRelationshipEndDef(String typeName, String name, Cardinality cardinality, boolean isContainer, boolean isLegacyAttribute) {
         setType(typeName);
         setName(name);
         setCardinality(cardinality);
         setIsContainer(isContainer);
-        setLegacyLabel(legacyLabel);
+        setIsLegacyAttribute(isLegacyAttribute);
     }
 
     /**
@@ -118,7 +117,7 @@
             setName(other.getName());
             setIsContainer(other.getIsContainer());
             setCardinality(other.getCardinality());
-            setLegacyLabel(other.getLegacyLabel());
+            setIsLegacyAttribute(other.isLegacyAttribute);
         }
     }
 
@@ -166,11 +165,9 @@
         return this.cardinality;
     }
 
-    public String getLegacyLabel() { return legacyLabel; }
+    public boolean getIsLegacyAttribute() { return isLegacyAttribute; }
 
-    public void setLegacyLabel(String legacyLabel) {  this.legacyLabel = legacyLabel; }
-
-    public boolean hasLegacyRelation() { return StringUtils.isNotEmpty(getLegacyLabel()) ? true : false; }
+    public void setIsLegacyAttribute(boolean legacyAttribute) { isLegacyAttribute = legacyAttribute; }
 
     public StringBuilder toString(StringBuilder sb) {
         if (sb == null) {
@@ -182,7 +179,7 @@
         sb.append(", name==>'").append(name).append('\'');
         sb.append(", isContainer==>'").append(isContainer).append('\'');
         sb.append(", cardinality==>'").append(cardinality).append('\'');
-        sb.append(", legacyLabel==>'").append(legacyLabel).append('\'');
+        sb.append(", isLegacyAttribute==>'").append(isLegacyAttribute).append('\'');
         sb.append('}');
 
         return sb;
@@ -200,12 +197,12 @@
                Objects.equals(name, that.name) &&
                isContainer == that.isContainer &&
                cardinality == that.cardinality &&
-               Objects.equals(legacyLabel, that.legacyLabel);
+               isLegacyAttribute == that.isLegacyAttribute;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(type, getName(), isContainer, cardinality, legacyLabel);
+        return Objects.hash(type, getName(), isContainer, cardinality, isLegacyAttribute);
     }
 
     @Override
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index e94dd19..e3005ee 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -191,6 +191,8 @@
 
     public Map<String, AtlasAttribute> getRelationshipAttributes() { return relationshipAttributes; }
 
+    public AtlasAttribute getRelationshipAttribute(String attributeName) { return relationshipAttributes.get(attributeName); }
+
     // this method should be called from AtlasRelationshipType.resolveReferencesPhase2()
     void addRelationshipAttribute(String attributeName, AtlasAttribute attribute) {
         relationshipAttributes.put(attributeName, attribute);
@@ -220,6 +222,16 @@
         return relationshipAttributes.containsKey(attributeName);
     }
 
+    public String getQualifiedAttributeName(String attrName) throws AtlasBaseException {
+        if (allAttributes.containsKey(attrName)) {
+            return allAttributes.get(attrName).getQualifiedName();
+        } else if (relationshipAttributes.containsKey(attrName)) {
+            return relationshipAttributes.get(attrName).getQualifiedName();
+        }
+
+        throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entityDef.getName());
+    }
+
     @Override
     public AtlasEntity createDefaultValue() {
         AtlasEntity ret = new AtlasEntity(entityDef.getName());
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index 841b66f..934dffc 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -30,6 +30,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+
 /**
  * class that implements behaviour of an relationship-type.
  */
@@ -97,19 +101,41 @@
         // if legacyLabel is not specified at both ends, use relationshipDef name as relationship label.
         // if legacyLabel is specified in any one end, use it as the relationship label for both ends (legacy case).
         // if legacyLabel is specified at both ends use the respective end's legacyLabel as relationship label (legacy case).
-        if (!endDef1.hasLegacyRelation() && !endDef2.hasLegacyRelation()) {
-            relationshipLabel = relationshipDef.getName();
-
-        } else if (endDef1.hasLegacyRelation() && !endDef2.hasLegacyRelation()) {
-            relationshipLabel = endDef1.getLegacyLabel();
-
-        } else if (!endDef1.hasLegacyRelation() && endDef2.hasLegacyRelation()) {
-            relationshipLabel = endDef2.getLegacyLabel();
+        if (!endDef1.getIsLegacyAttribute() && !endDef2.getIsLegacyAttribute()) {
+            relationshipLabel = relationshipDef.getRelationshipLabel();
+        } else if (endDef1.getIsLegacyAttribute() && !endDef2.getIsLegacyAttribute()) {
+            relationshipLabel = getLegacyEdgeLabel(end1Type, endDef1.getName());
+        } else if (!endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
+            relationshipLabel = getLegacyEdgeLabel(end2Type, endDef2.getName());
         }
 
         addRelationshipAttributeToEndType(endDef1, end1Type, end2Type.getTypeName(), typeRegistry, relationshipLabel);
 
         addRelationshipAttributeToEndType(endDef2, end2Type, end1Type.getTypeName(), typeRegistry, relationshipLabel);
+
+        // add relationship edge direction information
+        addRelationshipEdgeDirection();
+    }
+
+    private void addRelationshipEdgeDirection() {
+        AtlasRelationshipEndDef endDef1       = relationshipDef.getEndDef1();
+        AtlasRelationshipEndDef endDef2       = relationshipDef.getEndDef2();
+        AtlasAttribute          end1Attribute = end1Type.getRelationshipAttribute(endDef1.getName());
+        AtlasAttribute          end2Attribute = end2Type.getRelationshipAttribute(endDef2.getName());
+
+        //default relationship edge direction is end1 (out) -> end2 (in)
+        AtlasRelationshipEdgeDirection end1Direction = OUT;
+        AtlasRelationshipEdgeDirection end2Direction = IN;
+
+        if (endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
+            end2Direction = OUT;
+        } else if (!endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
+            end1Direction = IN;
+            end2Direction = OUT;
+        }
+
+        end1Attribute.setRelationshipEdgeDirection(end1Direction);
+        end2Attribute.setRelationshipEdgeDirection(end2Direction);
     }
 
     @Override
@@ -229,7 +255,7 @@
         // if relationshipLabel is null, then legacyLabel is mentioned at both ends,
         // use the respective end's legacyLabel as relationshipLabel
         if (relationshipLabel == null) {
-            relationshipLabel = endDef.getLegacyLabel();
+            relationshipLabel = getLegacyEdgeLabel(entityType, attrName);
         }
 
         if (attribute == null) { //attr doesn't exist in type - is a new relationship attribute
@@ -251,4 +277,15 @@
 
         entityType.addRelationshipAttributeType(attrName, this);
     }
+
+    private String getLegacyEdgeLabel(AtlasEntityType entityType, String attributeName) {
+        String         ret       = null;
+        AtlasAttribute attribute = entityType.getAttribute(attributeName);
+
+        if (attribute != null) {
+            ret = "__" + attribute.getQualifiedName();
+        }
+
+        return ret;
+    }
 }
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index f97d767..b390a97 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -601,15 +601,16 @@
     }
 
     public static class AtlasAttribute {
-        private final AtlasStructType   definedInType;
-        private final AtlasType         attributeType;
-        private final AtlasAttributeDef attributeDef;
-        private final String            qualifiedName;
-        private final String            vertexPropertyName;
-        private final boolean           isOwnedRef;
-        private final String            inverseRefAttributeName;
-        private AtlasAttribute          inverseRefAttribute;
-        private String                  relationshipEdgeLabel;
+        private final AtlasStructType          definedInType;
+        private final AtlasType                attributeType;
+        private final AtlasAttributeDef        attributeDef;
+        private final String                   qualifiedName;
+        private final String                   vertexPropertyName;
+        private final boolean                  isOwnedRef;
+        private final String                   inverseRefAttributeName;
+        private AtlasAttribute                 inverseRefAttribute;
+        private String                         relationshipEdgeLabel;
+        private AtlasRelationshipEdgeDirection relationshipEdgeDirection;
 
         public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipLabel) {
             this.definedInType            = definedInType;
@@ -637,8 +638,9 @@
                 }
             }
 
-            this.isOwnedRef              = isOwnedRef;
-            this.inverseRefAttributeName = inverseRefAttribute;
+            this.isOwnedRef                = isOwnedRef;
+            this.inverseRefAttributeName   = inverseRefAttribute;
+            this.relationshipEdgeDirection = AtlasRelationshipEdgeDirection.OUT;
         }
 
         public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType) {
@@ -677,6 +679,12 @@
 
         public void setRelationshipEdgeLabel(String relationshipEdgeLabel) { this.relationshipEdgeLabel = relationshipEdgeLabel; }
 
+        public AtlasRelationshipEdgeDirection getRelationshipEdgeDirection() { return relationshipEdgeDirection; }
+
+        public void setRelationshipEdgeDirection(AtlasRelationshipEdgeDirection relationshipEdgeDirection) {
+            this.relationshipEdgeDirection = relationshipEdgeDirection;
+        }
+
         public static String getEdgeLabel(String property) {
             return "__" + property;
         }
@@ -721,5 +729,7 @@
                 new String[] { "$",  "_d" },
                 new String[] { "%", "_p"  },
         };
+
+        public enum AtlasRelationshipEdgeDirection { IN, OUT }
     }
 }
diff --git a/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
new file mode 100755
index 0000000..98be2b8
--- /dev/null
+++ b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasEnumDef;
+import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.commons.lang.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.getArrayTypeName;
+import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.getMapTypeName;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory.AGGREGATION;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory.ASSOCIATION;
+import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
+import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE;
+import static org.apache.atlas.type.AtlasTypeUtil.createClassTypeDef;
+import static org.apache.atlas.type.AtlasTypeUtil.createOptionalAttrDef;
+import static org.apache.atlas.type.AtlasTypeUtil.createRequiredAttrDef;
+import static org.apache.atlas.type.AtlasTypeUtil.createStructTypeDef;
+import static org.apache.atlas.type.AtlasTypeUtil.createTraitTypeDef;
+import static org.apache.atlas.type.AtlasTypeUtil.createUniqueRequiredAttrDef;
+import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;
+
+/**
+ * Test utility class for relationship.
+ */
+public final class TestRelationshipUtilsV2 {
+
+    public static final String ORG_LEVEL_TYPE           = "OrgLevel";
+    public static final String SECURITY_CLEARANCE_TYPE  = "SecurityClearance";
+    public static final String ADDRESS_TYPE             = "Address";
+    public static final String PERSON_TYPE              = "Person";
+    public static final String MANAGER_TYPE             = "Manager";
+    public static final String DEPARTMENT_TYPE          = "Department";
+    public static final String EMPLOYEE_TYPE            = "Employee";
+    public static final String EMPLOYEE_DEPARTMENT_TYPE = "EmployeeDepartment";
+    public static final String EMPLOYEE_MANAGER_TYPE    = "EmployeeManager";
+    public static final String EMPLOYEE_MENTOR_TYPE     = "EmployeeMentor";
+    public static final String TYPE_A                   = "A";
+    public static final String TYPE_B                   = "B";
+    public static final String DEFAULT_VERSION          = "1.0";
+
+
+    private TestRelationshipUtilsV2() { }
+
+    public static AtlasTypesDef getDepartmentEmployeeTypes() throws AtlasBaseException {
+
+        /******* Person Type *******/
+        AtlasEntityDef personType = createClassTypeDef(PERSON_TYPE, description(PERSON_TYPE), superType(null),
+                                                        createUniqueRequiredAttrDef("name", "string"),
+                                                        createOptionalAttrDef("address", ADDRESS_TYPE),
+                                                        createOptionalAttrDef("birthday", "date"),
+                                                        createOptionalAttrDef("hasPets", "boolean"),
+                                                        createOptionalAttrDef("numberOfCars", "byte"),
+                                                        createOptionalAttrDef("houseNumber", "short"),
+                                                        createOptionalAttrDef("carMileage", "int"),
+                                                        createOptionalAttrDef("age", "float"),
+                                                        createOptionalAttrDef("numberOfStarsEstimate", "biginteger"),
+                                                        createOptionalAttrDef("approximationOfPi", "bigdecimal"));
+        /******* Employee Type *******/
+        AtlasEntityDef employeeType = createClassTypeDef(EMPLOYEE_TYPE, description(EMPLOYEE_TYPE), superType(PERSON_TYPE),
+                                                        createOptionalAttrDef("orgLevel", ORG_LEVEL_TYPE),
+                                                        createOptionalAttrDef("shares", "long"),
+                                                        createOptionalAttrDef("salary", "double"));
+        /******* Department Type *******/
+        AtlasEntityDef departmentType = createClassTypeDef(DEPARTMENT_TYPE, description(DEPARTMENT_TYPE), superType(null),
+                                                        createUniqueRequiredAttrDef("name", "string"));
+        /******* Manager Type *******/
+        AtlasEntityDef managerType = createClassTypeDef(MANAGER_TYPE, description(MANAGER_TYPE), superType(EMPLOYEE_TYPE));
+        /******* Address Type *******/
+        AtlasStructDef addressType = createStructTypeDef(ADDRESS_TYPE, description(ADDRESS_TYPE),
+                                                        createRequiredAttrDef("street", "string"),
+                                                        createRequiredAttrDef("city", "string"));
+        /******* Organization Level Type *******/
+        AtlasEnumDef orgLevelType = new AtlasEnumDef(ORG_LEVEL_TYPE, description(ORG_LEVEL_TYPE), DEFAULT_VERSION,
+                                                        getOrgLevelElements());
+
+        /******* Security Clearance Type *******/
+        AtlasClassificationDef securityClearanceType = createTraitTypeDef(SECURITY_CLEARANCE_TYPE, description(SECURITY_CLEARANCE_TYPE),
+                                                        superType(null), createRequiredAttrDef("level", "int"));
+
+        /******* [Department -> Employee] Relationship *******/
+        AtlasRelationshipDef employeeDepartmentType = new AtlasRelationshipDef(EMPLOYEE_DEPARTMENT_TYPE, description(EMPLOYEE_DEPARTMENT_TYPE),
+                                                        DEFAULT_VERSION, AGGREGATION, ONE_TO_TWO,
+                                                        new AtlasRelationshipEndDef(EMPLOYEE_TYPE, "department", SINGLE),
+                                                        new AtlasRelationshipEndDef(DEPARTMENT_TYPE, "employees", SET, true));
+        /******* [Manager -> Employee] Relationship *******/
+        AtlasRelationshipDef employeeManagerType    = new AtlasRelationshipDef(EMPLOYEE_MANAGER_TYPE, description(EMPLOYEE_MANAGER_TYPE),
+                                                        DEFAULT_VERSION, AGGREGATION, ONE_TO_TWO,
+                                                        new AtlasRelationshipEndDef(EMPLOYEE_TYPE, "manager", SINGLE),
+                                                        new AtlasRelationshipEndDef(MANAGER_TYPE, "subordinates", SET, true));
+
+        /******* [Mentor -> Employee] Relationship *******/
+        AtlasRelationshipDef employeeMentorType     = new AtlasRelationshipDef(EMPLOYEE_MENTOR_TYPE, description(EMPLOYEE_MENTOR_TYPE),
+                                                        DEFAULT_VERSION, ASSOCIATION, ONE_TO_TWO,
+                                                        new AtlasRelationshipEndDef(EMPLOYEE_TYPE, "mentor", SINGLE),
+                                                        new AtlasRelationshipEndDef(EMPLOYEE_TYPE, "mentees", SET));
+
+        return new AtlasTypesDef(ImmutableList.of(orgLevelType),
+                                 ImmutableList.of(addressType),
+                                 ImmutableList.of(securityClearanceType),
+                                 ImmutableList.of(personType, employeeType, departmentType, managerType),
+                                 ImmutableList.of(employeeDepartmentType, employeeManagerType, employeeMentorType));
+    }
+
+    public static AtlasEntitiesWithExtInfo getDepartmentEmployeeInstances() {
+        AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
+
+        /******* Department - HR *******/
+        AtlasEntity hrDept = new AtlasEntity(DEPARTMENT_TYPE, "name", "hr");
+
+        /******* Address *******/
+        AtlasStruct janeAddr = new AtlasStruct(ADDRESS_TYPE);
+            janeAddr.setAttribute("street", "Great America Parkway");
+            janeAddr.setAttribute("city", "Santa Clara");
+
+        AtlasStruct juliusAddr = new AtlasStruct(ADDRESS_TYPE);
+            juliusAddr.setAttribute("street", "Madison Ave");
+            juliusAddr.setAttribute("city", "Newtonville");
+
+        AtlasStruct maxAddr = new AtlasStruct(ADDRESS_TYPE);
+            maxAddr.setAttribute("street", "Ripley St");
+            maxAddr.setAttribute("city", "Newton");
+
+        AtlasStruct johnAddr = new AtlasStruct(ADDRESS_TYPE);
+            johnAddr.setAttribute("street", "Stewart Drive");
+            johnAddr.setAttribute("city", "Sunnyvale");
+
+        /******* Manager - Jane (John and Max subordinates) *******/
+        AtlasEntity jane = new AtlasEntity(MANAGER_TYPE);
+            jane.setAttribute("name", "Jane");
+            jane.setRelationshipAttribute("department", getAtlasObjectId(hrDept));
+            jane.setAttribute("address", janeAddr);
+
+        /******* Manager - Julius (no subordinates) *******/
+        AtlasEntity julius = new AtlasEntity(MANAGER_TYPE);
+            julius.setAttribute("name", "Julius");
+            julius.setRelationshipAttribute("department", getAtlasObjectId(hrDept));
+            julius.setAttribute("address", juliusAddr);
+
+        /******* Employee - Max (Manager: Jane, Mentor: Julius) *******/
+        AtlasEntity max = new AtlasEntity(EMPLOYEE_TYPE);
+            max.setAttribute("name", "Max");
+            max.setRelationshipAttribute("department", getAtlasObjectId(hrDept));
+            max.setAttribute("address", maxAddr);
+            max.setRelationshipAttribute("manager", getAtlasObjectId(jane));
+            max.setRelationshipAttribute("mentor", getAtlasObjectId(julius));
+            max.setAttribute("birthday",new Date(1979, 3, 15));
+            max.setAttribute("hasPets", true);
+            max.setAttribute("age", 36);
+            max.setAttribute("numberOfCars", 2);
+            max.setAttribute("houseNumber", 17);
+            max.setAttribute("carMileage", 13);
+            max.setAttribute("shares", Long.MAX_VALUE);
+            max.setAttribute("salary", Double.MAX_VALUE);
+            max.setAttribute("numberOfStarsEstimate", new BigInteger("1000000000000000000000000000000"));
+            max.setAttribute("approximationOfPi", new BigDecimal("3.1415926535897932"));
+
+        /******* Employee - John (Manager: Jane, Mentor: Max) *******/
+        AtlasEntity john = new AtlasEntity(EMPLOYEE_TYPE);
+            john.setAttribute("name", "John");
+            john.setRelationshipAttribute("department", getAtlasObjectId(hrDept));
+            john.setAttribute("address", johnAddr);
+            john.setRelationshipAttribute("manager", getAtlasObjectId(jane));
+            john.setRelationshipAttribute("mentor", getAtlasObjectId(max));
+            john.setAttribute("birthday",new Date(1950, 5, 15));
+            john.setAttribute("hasPets", true);
+            john.setAttribute("numberOfCars", 1);
+            john.setAttribute("houseNumber", 153);
+            john.setAttribute("carMileage", 13364);
+            john.setAttribute("shares", 15000);
+            john.setAttribute("salary", 123345.678);
+            john.setAttribute("age", 50);
+            john.setAttribute("numberOfStarsEstimate", new BigInteger("1000000000000000000000"));
+            john.setAttribute("approximationOfPi", new BigDecimal("3.141592653589793238462643383279502884197169399375105820974944592307816406286"));
+
+        ret.addEntity(hrDept);
+        ret.addEntity(jane);
+        ret.addEntity(julius);
+        ret.addEntity(max);
+        ret.addEntity(john);
+
+        return ret;
+    }
+
+    public static AtlasTypesDef getInverseReferenceTestTypes() throws AtlasBaseException {
+        AtlasEntityDef aType = createClassTypeDef(TYPE_A, superType(null), createUniqueRequiredAttrDef("name", "string"));
+        AtlasEntityDef bType = createClassTypeDef(TYPE_B, superType(null), createUniqueRequiredAttrDef("name", "string"));
+
+        AtlasRelationshipDef relationshipType1 = new AtlasRelationshipDef("TypeA_to_TypeB_on_b", description("TypeA_to_TypeB_on_b"),
+                                                        DEFAULT_VERSION, ASSOCIATION, ONE_TO_TWO,
+                                                        new AtlasRelationshipEndDef(TYPE_A, "b", SINGLE),
+                                                        new AtlasRelationshipEndDef(TYPE_B, "a", SINGLE));
+
+        AtlasRelationshipDef relationshipType2 = new AtlasRelationshipDef("TypeA_to_TypeB_on_oneB", description("TypeA_to_TypeB_on_oneB"),
+                                                        DEFAULT_VERSION, ASSOCIATION, ONE_TO_TWO,
+                                                        new AtlasRelationshipEndDef(TYPE_A, "oneB", SINGLE),
+                                                        new AtlasRelationshipEndDef(TYPE_B, "manyA", SET));
+
+        AtlasRelationshipDef relationshipType3 = new AtlasRelationshipDef("TypeA_to_TypeB_on_manyB", description("TypeA_to_TypeB_on_manyB"),
+                                                        DEFAULT_VERSION, ASSOCIATION, ONE_TO_TWO,
+                                                        new AtlasRelationshipEndDef(TYPE_A, "manyB", SET),
+                                                        new AtlasRelationshipEndDef(TYPE_B, "manyToManyA", SET));
+
+        AtlasRelationshipDef relationshipType4 = new AtlasRelationshipDef("TypeB_to_TypeA_on_mappedFromA", description("TypeB_to_TypeA_on_mappedFromA"),
+                                                        DEFAULT_VERSION, ASSOCIATION, ONE_TO_TWO,
+                                                        new AtlasRelationshipEndDef(TYPE_B, "mappedFromA", SINGLE),
+                                                        new AtlasRelationshipEndDef(TYPE_A, "mapToB", SET));
+
+        return new AtlasTypesDef(ImmutableList.<AtlasEnumDef>of(), ImmutableList.<AtlasStructDef>of(),
+                                 ImmutableList.<AtlasClassificationDef>of(),  ImmutableList.of(aType, bType),
+                                 ImmutableList.of(relationshipType1, relationshipType2, relationshipType3, relationshipType4));
+    }
+
+    private static List<AtlasEnumElementDef> getOrgLevelElements() {
+        return Arrays.asList(
+                new AtlasEnumElementDef("L1", description("L1"), 1),
+                new AtlasEnumElementDef("L2", description("L2"), 2),
+                new AtlasEnumElementDef("L3", description("L3"), 3)
+        );
+    }
+
+    private static String description(String typeName) {
+        return typeName + " description";
+    }
+
+    private static ImmutableSet<String> superType(String superTypeName) {
+        return StringUtils.isNotEmpty(superTypeName) ? ImmutableSet.of(superTypeName) : ImmutableSet.<String>of();
+    }
+}
\ No newline at end of file
diff --git a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
index 9774583..14614f1 100755
--- a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
+++ b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
@@ -35,7 +35,6 @@
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.lang.RandomStringUtils;
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 6f6d74b..c47a89e 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -26,8 +26,10 @@
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
@@ -39,6 +41,7 @@
 import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.ITypedInstance;
@@ -344,12 +347,46 @@
         return null;
     }
 
+    public Iterator<AtlasEdge> getIncomingEdgesByLabel(AtlasVertex instanceVertex, String edgeLabel) {
+        return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel);
+    }
+
     public Iterator<AtlasEdge> getOutGoingEdgesByLabel(AtlasVertex instanceVertex, String edgeLabel) {
         return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel);
     }
 
-    public Iterator<AtlasEdge> getBothEdgesByLabel(AtlasVertex instanceVertex, String edgeLabel) {
-        return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.BOTH, edgeLabel);
+    public AtlasEdge getEdgeForLabel(AtlasVertex vertex, String edgeLabel, AtlasRelationshipEdgeDirection edgeDirection) {
+        AtlasEdge ret;
+
+        switch (edgeDirection) {
+            case IN:
+                ret = getEdgeForLabel(vertex, edgeLabel, AtlasEdgeDirection.IN);
+                break;
+
+            case OUT:
+            default:
+                ret = getEdgeForLabel(vertex, edgeLabel, AtlasEdgeDirection.OUT);
+                break;
+        }
+
+        return ret;
+    }
+
+    public Iterator<AtlasEdge> getEdgesForLabel(AtlasVertex vertex, String edgeLabel, AtlasRelationshipEdgeDirection edgeDirection) {
+        Iterator<AtlasEdge> ret;
+
+        switch (edgeDirection) {
+            case IN:
+                ret = getIncomingEdgesByLabel(vertex, edgeLabel);
+                break;
+
+            case OUT:
+            default:
+                ret = getOutGoingEdgesByLabel(vertex, edgeLabel);
+                break;
+        }
+
+        return ret;
     }
 
     /**
@@ -360,7 +397,11 @@
      * @return
      */
     public AtlasEdge getEdgeForLabel(AtlasVertex vertex, String edgeLabel) {
-        Iterator<AtlasEdge> iterator = getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.OUT, edgeLabel);
+        return getEdgeForLabel(vertex, edgeLabel, AtlasEdgeDirection.OUT);
+    }
+
+    public AtlasEdge getEdgeForLabel(AtlasVertex vertex, String edgeLabel, AtlasEdgeDirection edgeDirection) {
+        Iterator<AtlasEdge> iterator = getAdjacentEdgesByLabel(vertex, edgeDirection, edgeLabel);
         AtlasEdge latestDeletedEdge = null;
         long latestDeletedEdgeTime = Long.MIN_VALUE;
 
@@ -1280,4 +1321,43 @@
 
         return ret;
     }
+
+    public static boolean isRelationshipEdge(AtlasEdge edge) {
+        if (edge == null) {
+            return false;
+        }
+
+        String edgeLabel = edge.getLabel();
+
+        return StringUtils.isNotEmpty(edge.getLabel()) ? edgeLabel.startsWith("r:") : false;
+    }
+
+    public static AtlasObjectId getReferenceObjectId(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection) {
+        AtlasObjectId ret = null;
+
+        if (relationshipDirection == AtlasRelationshipEdgeDirection.OUT) {
+            ret = new AtlasObjectId(getGuid(edge.getInVertex()), getTypeName(edge.getInVertex()));
+
+        } else if (relationshipDirection == AtlasRelationshipEdgeDirection.IN) {
+            ret = new AtlasObjectId(getGuid(edge.getOutVertex()), getTypeName(edge.getOutVertex()));
+        }
+
+        return ret;
+    }
+
+    public static AtlasObjectId getCurrentObjectId(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection) {
+        String typeName = null;
+        String guid     = null;
+
+        if (relationshipDirection == AtlasRelationshipEdgeDirection.OUT) {
+            typeName = GraphHelper.getTypeName(edge.getOutVertex());
+            guid     = GraphHelper.getGuid(edge.getOutVertex());
+
+        } else if (relationshipDirection == AtlasRelationshipEdgeDirection.IN) {
+            typeName = GraphHelper.getTypeName(edge.getInVertex());
+            guid     = GraphHelper.getGuid(edge.getInVertex());
+        }
+
+        return new AtlasObjectId(guid, typeName);
+    }
 }
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 12e8bb1..0210a11 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -286,7 +286,7 @@
             return;
         }
 
-        AtlasStruct struct;
+        final AtlasStruct struct;
 
         if (val instanceof AtlasStruct) {
             struct = (AtlasStruct) val;
@@ -298,6 +298,10 @@
             throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString());
         }
 
+        visitStruct(structType, struct);
+    }
+
+    void visitStruct(AtlasStructType structType, AtlasStruct struct) throws AtlasBaseException {
         for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
             AtlasType attrType = attribute.getAttributeType();
             Object    attrVal  = struct.getAttribute(attribute.getName());
@@ -306,6 +310,16 @@
         }
     }
 
+    void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException {
+        visitStruct(entityType, entity);
+
+        for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
+            AtlasType attrType = attribute.getAttributeType();
+            Object    attrVal  = entity.getRelationshipAttribute(attribute.getName());
+
+            visitAttribute(attrType, attrVal);
+        }
+    }
 
     void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException {
         if (entity == null) {
@@ -316,7 +330,7 @@
 
         recordObjectReference(entity.getGuid());
 
-        visitStruct(type, entity);
+        visitEntity(type, entity);
     }
 
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 43f2c55..948d9dd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -94,8 +94,8 @@
 
     public static String getQualifiedAttributePropertyKey(AtlasStructType fromType, String attributeName) throws AtlasBaseException {
         switch (fromType.getTypeCategory()) {
-         case STRUCT:
          case ENTITY:
+         case STRUCT:
          case CLASSIFICATION:
              return fromType.getQualifiedAttributeName(attributeName);
         default:
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
index 3ff6fbe..49e08a0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
@@ -29,9 +29,11 @@
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.GremlinVersion;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -78,35 +80,7 @@
 
         AtlasVertex       end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
         AtlasVertex       end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
-        AtlasRelationship ret;
-
-        // create relationship between two vertex
-        try {
-            AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship);
-
-            if (relationshipEdge == null) {
-                relationshipEdge = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
-
-                AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
-
-                if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
-                    for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
-                        String attrName  = attr.getName();
-                        Object attrValue = relationship.getAttribute(attrName);
-
-                        AtlasGraphUtilsV1.setProperty(relationshipEdge, attr.getVertexPropertyName(), attrValue);
-                    }
-                }
-
-                ret = mapEdgeToAtlasRelationship(relationshipEdge);
-
-            } else {
-                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(),
-                                             relationship.getEnd1().getGuid(), relationship.getEnd2().getGuid());
-            }
-        } catch (RepositoryException e) {
-            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
-        }
+        AtlasRelationship ret        = createRelationship(relationship, end1Vertex, end2Vertex);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== create({}): {}", relationship, ret);
@@ -117,31 +91,6 @@
 
     @Override
     @GraphTransaction
-    public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> getOrCreate({})", relationship);
-        }
-
-        validateRelationship(relationship);
-
-        AtlasVertex       end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
-        AtlasVertex       end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
-        AtlasRelationship ret;
-
-        // check if relationship exists
-        AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship);
-
-        ret = (relationshipEdge != null) ? mapEdgeToAtlasRelationship(relationshipEdge) : create(relationship);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== getOrCreate({}): {}", relationship, ret);
-        }
-
-        return ret;
-    }
-
-    @Override
-    @GraphTransaction
     public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> update({})", relationship);
@@ -196,6 +145,70 @@
         }
     }
 
+    public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getOrCreate({})", relationship);
+        }
+
+        validateRelationship(relationship);
+
+        AtlasVertex       end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+        AtlasVertex       end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+        AtlasRelationship ret;
+
+        // check if relationship exists
+        AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship);
+
+        if (relationshipEdge != null) {
+            ret = mapEdgeToAtlasRelationship(relationshipEdge);
+
+        } else {
+            validateRelationship(relationship);
+            ret = createRelationship(relationship, end1Vertex, end2Vertex);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getOrCreate({}): {}", relationship, ret);
+        }
+
+        return ret;
+    }
+
+    private AtlasRelationship createRelationship(AtlasRelationship relationship, AtlasVertex end1Vertex, AtlasVertex end2Vertex)
+                                                 throws AtlasBaseException {
+        AtlasRelationship ret;
+
+        try {
+            AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship);
+
+            if (relationshipEdge == null) {
+                relationshipEdge = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
+
+                AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+                if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
+                    for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
+                        String attrName           = attr.getName();
+                        String attrVertexProperty = attr.getVertexPropertyName();
+                        Object attrValue          = relationship.getAttribute(attrName);
+
+                        AtlasGraphUtilsV1.setProperty(relationshipEdge, attrVertexProperty, attrValue);
+                    }
+                }
+
+                ret = mapEdgeToAtlasRelationship(relationshipEdge);
+
+            } else {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(),
+                                             relationship.getEnd1().getGuid(), relationship.getEnd2().getGuid());
+            }
+        } catch (RepositoryException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+
+        return ret;
+    }
+
     private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
         if (relationship == null) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null");
@@ -229,6 +242,7 @@
         }
 
         validateEnd(relationship.getEnd1());
+
         validateEnd(relationship.getEnd2());
 
         validateAndNormalize(relationship);
@@ -273,7 +287,7 @@
         type.getNormalizedValue(relationship);
     }
 
-    private AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
+    public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
         String    relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship);
         AtlasEdge ret               = graphHelper.getEdgeForLabel(fromVertex, relationshipLabel);
 
@@ -331,31 +345,29 @@
     }
 
     private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
-
-        String                  ret                = relationship.getRelationshipLabel();
         AtlasRelationshipType   relationshipType   = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+        String                  ret                = relationshipType.getRelationshipDef().getRelationshipLabel();
         AtlasRelationshipEndDef endDef1            = relationshipType.getRelationshipDef().getEndDef1();
         AtlasRelationshipEndDef endDef2            = relationshipType.getRelationshipDef().getEndDef2();
         Set<String>             fromVertexTypes    = getTypeAndAllSuperTypes(AtlasGraphUtilsV1.getTypeName(fromVertex));
         Set<String>             toVertexTypes      = getTypeAndAllSuperTypes(AtlasGraphUtilsV1.getTypeName(toVertex));
+        AtlasAttribute          attribute          = null;
 
         // validate entity type and all its supertypes contains relationshipDefs end type
-        // e.g. [ hive_process -> hive_table] -> [ Process -> DataSet ]
+        // e.g. [hive_process -> hive_table] -> [Process -> DataSet]
         if (fromVertexTypes.contains(endDef1.getType()) && toVertexTypes.contains(endDef2.getType())) {
-            String         attributeName = endDef1.getName();
-            AtlasAttribute endAttribute  = relationshipType.getEnd1Type().getAttribute(attributeName);
+            String attributeName = endDef1.getName();
 
-            if (endAttribute != null) {
-                ret = endAttribute.getRelationshipEdgeLabel();
-            }
+            attribute = relationshipType.getEnd1Type().getRelationshipAttribute(attributeName);
 
         } else if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) {
-            String         attributeName = endDef2.getName();
-            AtlasAttribute endAttribute  = relationshipType.getEnd2Type().getAttribute(attributeName);
+            String attributeName = endDef2.getName();
 
-            if (endAttribute != null) {
-                ret = endAttribute.getRelationshipEdgeLabel();
-            }
+            attribute = relationshipType.getEnd2Type().getRelationshipAttribute(attributeName);
+        }
+
+        if (attribute != null) {
+            ret = attribute.getRelationshipEdgeLabel();
         }
 
         return ret;
@@ -391,7 +403,13 @@
         relationship.setCreateTime(new Date(GraphHelper.getCreatedTime(edge)));
         relationship.setUpdateTime(new Date(GraphHelper.getModifiedTime(edge)));
 
-        relationship.setVersion(GraphHelper.getVersion(edge).longValue());
+        Integer version = GraphHelper.getVersion(edge);
+
+        if (version == null) {
+            version = Integer.valueOf(1);
+        }
+
+        relationship.setVersion(version.longValue());
         relationship.setStatus(GraphHelper.getEdgeStatus(edge));
 
         AtlasVertex end1Vertex = edge.getOutVertex();
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index d4fdc25..4271376 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -37,6 +37,7 @@
 import org.apache.atlas.type.AtlasMapType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
@@ -53,6 +54,8 @@
 import java.util.Stack;
 
 import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.repository.graph.GraphHelper.getReferenceObjectId;
+import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
 import static org.apache.atlas.repository.graph.GraphHelper.string;
 
 public abstract class DeleteHandlerV1 {
@@ -206,7 +209,6 @@
         return result;
     }
 
-
     /**
      * Force delete is used to remove struct/trait in case of entity updates
      * @param edge
@@ -217,13 +219,20 @@
      * @throws AtlasException
      */
     public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, boolean isOwned,
-        boolean forceDeleteStructTrait) throws AtlasBaseException {
+                                       boolean forceDeleteStructTrait) throws AtlasBaseException {
+
+        // default edge direction is outward
+        return deleteEdgeReference(edge, typeCategory, isOwned, forceDeleteStructTrait, AtlasRelationshipEdgeDirection.OUT);
+    }
+
+    public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, boolean isOwned, boolean forceDeleteStructTrait,
+                                       AtlasRelationshipEdgeDirection relationshipDirection) throws AtlasBaseException {
         LOG.debug("Deleting {}", string(edge));
         boolean forceDelete =
-            (typeCategory == TypeCategory.STRUCT || typeCategory == TypeCategory.CLASSIFICATION) && forceDeleteStructTrait;
+                (typeCategory == TypeCategory.STRUCT || typeCategory == TypeCategory.CLASSIFICATION) && forceDeleteStructTrait;
 
         if (typeCategory == TypeCategory.STRUCT || typeCategory == TypeCategory.CLASSIFICATION
-            || (typeCategory == TypeCategory.OBJECT_ID_TYPE && isOwned)) {
+                || (typeCategory == TypeCategory.OBJECT_ID_TYPE && isOwned)) {
             //If the vertex is of type struct/trait, delete the edge and then the reference vertex as the vertex is not shared by any other entities.
             //If the vertex is of type class, and its composite attribute, this reference vertex' lifecycle is controlled
             //through this delete, hence delete the edge and the reference vertex.
@@ -236,9 +245,19 @@
             //If the vertex is of type class, and its not a composite attributes, the reference AtlasVertex' lifecycle is not controlled
             //through this delete. Hence just remove the reference edge. Leave the reference AtlasVertex as is
 
-            //If deleting just the edge, reverse attribute should be updated for any references
-            //For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
-            deleteEdge(edge, true, false);
+            // for relationship edges, inverse vertex's relationship attribute doesn't need to be updated.
+            // only delete the reference relationship edge
+            if (isRelationshipEdge(edge)) {
+                deleteEdge(edge, false);
+
+                AtlasObjectId deletedReferenceObjectId = getReferenceObjectId(edge, relationshipDirection);
+                RequestContextV1.get().recordEntityUpdate(deletedReferenceObjectId);
+            } else {
+                //legacy case - not a relationship edge
+                //If deleting just the edge, reverse attribute should be updated for any references
+                //For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
+                deleteEdge(edge, true, false);
+            }
         }
         return !softDelete || forceDelete;
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 157f8cd..1282be5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -45,6 +45,7 @@
 import org.apache.atlas.type.AtlasMapType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
@@ -63,7 +64,11 @@
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
 import static org.apache.atlas.repository.graph.GraphHelper.string;
+import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
 
 @Component
 public class EntityGraphMapper {
@@ -144,6 +149,8 @@
 
                 mapAttributes(createdEntity, vertex, CREATE, context);
 
+                mapRelationshipAttributes(createdEntity, vertex, CREATE, context);
+
                 resp.addEntity(CREATE, constructHeader(createdEntity, entityType, vertex));
                 addClassifications(context, guid, createdEntity.getClassifications());
             }
@@ -157,6 +164,8 @@
 
                 mapAttributes(updatedEntity, vertex, UPDATE, context);
 
+                mapRelationshipAttributes(updatedEntity, vertex, UPDATE, context);
+
                 if (isPartialUpdate) {
                     resp.addEntity(PARTIAL_UPDATE, constructHeader(updatedEntity, entityType, vertex));
                 } else {
@@ -238,6 +247,7 @@
 
                     mapAttribute(attribute, attrValue, vertex, op, context);
                 }
+
             } else if (op.equals(UPDATE)) {
                 for (String attrName : struct.getAttributes().keySet()) {
                     AtlasAttribute attribute = structType.getAttribute(attrName);
@@ -260,6 +270,41 @@
         }
     }
 
+    private void mapRelationshipAttributes(AtlasEntity entity, AtlasVertex vertex, EntityOperation op,
+                                           EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapRelationshipAttributes({}, {})", op, entity.getTypeName());
+        }
+
+        if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
+            AtlasEntityType entityType = getEntityType(entity.getTypeName());
+
+            if (op.equals(CREATE)) {
+                for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
+                    Object attrValue = entity.getRelationshipAttribute(attribute.getName());
+
+                    mapAttribute(attribute, attrValue, vertex, op, context);
+                }
+
+            } else if (op.equals(UPDATE)) {
+                // relationship attributes mapping
+                for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
+                    if (attribute != null && entity.hasRelationshipAttribute(attribute.getName())) {
+                        Object attrValue = entity.getRelationshipAttribute(attribute.getName());
+
+                        mapAttribute(attribute, attrValue, vertex, op, context);
+                    }
+                }
+            }
+
+            updateModificationMetadata(vertex);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapRelationshipAttributes({}, {})", op, entity.getTypeName());
+        }
+    }
+
     private void mapAttribute(AtlasAttribute attribute, Object attrValue, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
         if (attrValue == null) {
             AtlasAttributeDef attributeDef = attribute.getAttributeDef();
@@ -309,14 +354,15 @@
             }
 
             case OBJECT_ID_TYPE: {
-                String edgeLabel = ctx.getAttribute().getRelationshipEdgeLabel();
+                String edgeLabel     = ctx.getAttribute().getRelationshipEdgeLabel();
+                AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection();
 
                 // legacy case - if relationship attribute doesn't exist, use legacy edge label.
                 if (StringUtils.isEmpty(edgeLabel)) {
                     edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
                 }
 
-                AtlasEdge currentEdge = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
+                AtlasEdge currentEdge = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel, edgeDirection);
                 AtlasEdge newEdge     = null;
 
                 if (ctx.getValue() != null) {
@@ -328,14 +374,41 @@
 
                     newEdge = mapObjectIdValueUsingRelationship(ctx, context);
 
-                    if (newEdge != null && ctx.getAttribute().getInverseRefAttribute() != null) {
+                    // legacy case update inverse attribute
+                    if (ctx.getAttribute().getInverseRefAttribute() != null) {
                         // Update the inverse reference using relationship on the target entity
                         addInverseReference(ctx.getAttribute().getInverseRefAttribute(), newEdge);
                     }
                 }
 
+                // created new relationship,
+                // record entity update on both vertices of the new relationship
+                if (currentEdge == null && newEdge != null) {
+
+                    // based on relationship edge direction record update only on attribute vertex
+                    if (edgeDirection == IN) {
+                        recordEntityUpdate(newEdge.getOutVertex());
+
+                    } else {
+                        recordEntityUpdate(newEdge.getInVertex());
+                    }
+                }
+
+                // update references, if current and new edge don't match
+                // record entity update on new reference and delete(edge) old reference.
                 if (currentEdge != null && !currentEdge.equals(newEdge)) {
-                    deleteHandler.deleteEdgeReference(currentEdge, ctx.getAttrType().getTypeCategory(), ctx.getAttribute().isOwnedRef(), true);
+
+                    //record entity update on new edge
+                    if (isRelationshipEdge(newEdge)) {
+                        AtlasVertex attrVertex = context.getDiscoveryContext().getResolvedEntityVertex(getGuid(ctx.getValue()));
+
+                        recordEntityUpdate(attrVertex);
+                        updateModificationMetadata(attrVertex);
+                    }
+
+                    //delete old reference
+                    deleteHandler.deleteEdgeReference(currentEdge, ctx.getAttrType().getTypeCategory(), ctx.getAttribute().isOwnedRef(),
+                                                      true, ctx.getAttribute().getRelationshipEdgeDirection());
                 }
 
                 return newEdge;
@@ -402,7 +475,7 @@
 
         if (inverseUpdated) {
             updateModificationMetadata(inverseVertex);
-            AtlasObjectId inverseEntityId = new AtlasObjectId(AtlasGraphUtilsV1.getIdFromVertex(inverseVertex), inverseType.getTypeName());
+            AtlasObjectId inverseEntityId = new AtlasObjectId(getIdFromVertex(inverseVertex), inverseType.getTypeName());
             RequestContextV1.get().recordEntityUpdate(inverseEntityId);
         }
     }
@@ -424,7 +497,7 @@
             if (entityType.hasRelationshipAttribute(inverseAttributeName)) {
                 String relationshipName = graphHelper.getRelationshipDefName(inverseVertex, entityType, inverseAttributeName);
 
-                ret = getOrCreateRelationship(inverseVertex, vertex, relationshipName);
+                ret = getOrCreateRelationship(inverseVertex, vertex, relationshipName, inverseAttribute);
 
             } else {
                 if (LOG.isDebugEnabled()) {
@@ -576,24 +649,47 @@
         String    attributeName = ctx.getAttribute().getName();
         AtlasType type          = typeRegistry.getType(AtlasGraphUtilsV1.getTypeName(entityVertex));
 
+        AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection();
+        String                         edgeLabel     = ctx.getAttribute().getRelationshipEdgeLabel();
+
         if (type instanceof AtlasEntityType) {
             AtlasEntityType entityType = (AtlasEntityType) type;
 
             // use relationship to create/update edges
             if (entityType.hasRelationshipAttribute(attributeName)) {
                 if (ctx.getCurrentEdge() != null) {
-                    ret = updateRelationship(ctx.getCurrentEdge(), attributeVertex);
+                    ret = updateRelationship(ctx.getCurrentEdge(), attributeVertex, edgeDirection, ctx.getAttribute());
+
+                    recordEntityUpdate(attributeVertex);
 
                 } else {
-                    String relationshipName = graphHelper.getRelationshipDefName(entityVertex, entityType, attributeName);
-                    ret = getOrCreateRelationship(entityVertex, attributeVertex, relationshipName);
-                }
+                    String      relationshipName = graphHelper.getRelationshipDefName(entityVertex, entityType, attributeName);
+                    AtlasVertex fromVertex;
+                    AtlasVertex toVertex;
 
+                    if (edgeDirection == IN) {
+                        fromVertex = attributeVertex;
+                        toVertex   = entityVertex;
+
+                    } else {
+                        fromVertex = entityVertex;
+                        toVertex   = attributeVertex;
+                    }
+                    boolean relationshipExists = isRelationshipExists(fromVertex, toVertex, edgeLabel);
+
+                    ret = getOrCreateRelationship(fromVertex, toVertex, relationshipName, ctx.getAttribute());
+
+                    // if relationship did not exist before and new relationship was created
+                    // record entity update on both relationship vertices
+                    if (!relationshipExists) {
+                        recordEntityUpdate(attributeVertex);
+                    }
+                }
             } else {
                 // use legacy way to create/update edges
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("No RelationshipDef defined between {} and {} on attribute: {}",  AtlasGraphUtilsV1.getTypeName(entityVertex),
-                               AtlasGraphUtilsV1.getTypeName(attributeVertex), attributeName);
+                    LOG.debug("No RelationshipDef defined between {} and {} on attribute: {}",  getTypeName(entityVertex),
+                               getTypeName(attributeVertex), attributeName);
                 }
 
                 ret = mapObjectIdValue(ctx, context);
@@ -728,6 +824,21 @@
         return newElementsCreated;
     }
 
+    private boolean isRelationshipAttribute(AtlasAttribute attribute) {
+        boolean ret = false;
+
+        if (attribute != null) {
+            AtlasStructType structType    = attribute.getDefinedInType();
+            String          attributeName = attribute.getName();
+
+            if (structType instanceof AtlasEntityType) {
+                ret = ((AtlasEntityType) structType).hasRelationshipAttribute(attributeName);
+            }
+        }
+
+        return ret;
+    }
+
 
     private AtlasEdge createVertex(AtlasStruct struct, AtlasVertex referringVertex, String edgeLabel, EntityMutationContext context) throws AtlasBaseException {
         AtlasVertex vertex = createStructVertex(struct);
@@ -767,6 +878,16 @@
         return (AtlasStructType)objType;
     }
 
+    private AtlasEntityType getEntityType(String typeName) throws AtlasBaseException {
+        AtlasType objType = typeRegistry.getType(typeName);
+
+        if (!(objType instanceof AtlasEntityType)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, typeName);
+        }
+
+        return (AtlasEntityType)objType;
+    }
+
     private Object mapCollectionElementsToVertex(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
         switch(ctx.getAttrType().getTypeCategory()) {
         case PRIMITIVE:
@@ -918,8 +1039,8 @@
         // Update edge if it exists
 
         AtlasVertex currentVertex = currentEdge.getInVertex();
-        String currentEntityId = AtlasGraphUtilsV1.getIdFromVertex(currentVertex);
-        String newEntityId = AtlasGraphUtilsV1.getIdFromVertex(entityVertex);
+        String currentEntityId = getIdFromVertex(currentVertex);
+        String newEntityId = getIdFromVertex(entityVertex);
 
         AtlasEdge newEdge = currentEdge;
         if (!currentEntityId.equals(newEntityId)) {
@@ -936,16 +1057,25 @@
         return newEdge;
     }
 
-    private AtlasEdge updateRelationship(AtlasEdge currentEdge, final AtlasVertex entityVertex) throws AtlasBaseException {
+    private AtlasEdge updateRelationship(AtlasEdge currentEdge, final AtlasVertex newEntityVertex,
+                                         AtlasRelationshipEdgeDirection edgeDirection, AtlasAttribute attribute)
+                                         throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Updating entity reference using relationship {} for reference attribute {}",  AtlasGraphUtilsV1.getTypeName(entityVertex));
+            LOG.debug("Updating entity reference using relationship {} for reference attribute {}", getTypeName(newEntityVertex));
         }
 
-        String    currentEntityId = AtlasGraphUtilsV1.getIdFromVertex(currentEdge.getInVertex());
-        String    newEntityId     = AtlasGraphUtilsV1.getIdFromVertex(entityVertex);
-        AtlasEdge ret             = currentEdge;
+        // Max's manager updated from Jane to Julius (Max.manager --> Jane.subordinates)
+        // manager attribute (OUT direction), current manager vertex (Jane) (IN vertex)
 
-        if (!currentEntityId.equals(newEntityId)) {
+        // Max's mentor updated from John to Jane (John.mentee --> Max.mentor)
+        // mentor attribute (IN direction), current mentee vertex (John) (OUT vertex)
+        String currentEntityId = (edgeDirection == IN) ? getIdFromVertex(currentEdge.getOutVertex()) :
+                                                         getIdFromVertex(currentEdge.getInVertex());
+
+        String    newEntityId = getIdFromVertex(newEntityVertex);
+        AtlasEdge ret         = currentEdge;
+
+        if (!currentEntityId.equals(newEntityId) && newEntityVertex != null) {
             // create a new relationship edge to the new attribute vertex from the instance
             String relationshipName = AtlasGraphUtilsV1.getTypeName(currentEdge);
 
@@ -953,7 +1083,8 @@
                 relationshipName = currentEdge.getLabel();
             }
 
-            ret = getOrCreateRelationship(currentEdge.getOutVertex(), entityVertex, relationshipName);
+            ret = (edgeDirection == IN) ? getOrCreateRelationship(newEntityVertex, currentEdge.getInVertex(), relationshipName, attribute) :
+                                          getOrCreateRelationship(currentEdge.getOutVertex(), newEntityVertex, relationshipName, attribute);
         }
 
         return ret;
@@ -983,8 +1114,7 @@
     //Removes unused edges from the old collection, compared to the new collection
     private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute, List<AtlasEdge> currentEntries, List<AtlasEdge> newEntries) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(currentEntries)) {
-            AtlasStructType entityType = attribute.getDefinedInType();
-            AtlasType       entryType  = ((AtlasArrayType)attribute.getAttributeType()).getElementType();
+            AtlasType entryType = ((AtlasArrayType) attribute.getAttributeType()).getElementType();
 
             if (AtlasGraphUtilsV1.isReference(entryType)) {
                 Collection<AtlasEdge> edgesToRemove = CollectionUtils.subtract(currentEntries, newEntries);
@@ -993,7 +1123,8 @@
                     List<AtlasEdge> additionalElements = new ArrayList<>();
 
                     for (AtlasEdge edge : edgesToRemove) {
-                        boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(), true);
+                        boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(),
+                                                                             true, attribute.getRelationshipEdgeDirection());
 
                         if (!deleted) {
                             additionalElements.add(edge);
@@ -1021,7 +1152,7 @@
     private AtlasEntityHeader constructHeader(AtlasEntity entity, final AtlasEntityType type, AtlasVertex vertex) {
         AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
 
-        header.setGuid(AtlasGraphUtilsV1.getIdFromVertex(vertex));
+        header.setGuid(getIdFromVertex(vertex));
 
         for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
             header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName()));
@@ -1148,7 +1279,7 @@
 
         for (String classificationName : classificationNames) {
             try {
-                final String entityTypeName = GraphHelper.getTypeName(instanceVertex);
+                final String entityTypeName = getTypeName(instanceVertex);
                 String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, classificationName);
                 AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
                 if (edge != null) {
@@ -1182,20 +1313,20 @@
         }
     }
 
-    private AtlasEdge getOrCreateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName) throws AtlasBaseException {
-        AtlasEdge         ret          = null;
-        AtlasObjectId     end1         = new AtlasObjectId(AtlasGraphUtilsV1.getIdFromVertex(end1Vertex), AtlasGraphUtilsV1.getTypeName(end1Vertex));
-        AtlasObjectId     end2         = new AtlasObjectId(AtlasGraphUtilsV1.getIdFromVertex(end2Vertex), AtlasGraphUtilsV1.getTypeName(end2Vertex));
-        AtlasRelationship relationship = relationshipStore.getOrCreate(new AtlasRelationship(relationshipName, end1, end2));
+    private AtlasEdge getOrCreateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, AtlasAttribute attribute) throws AtlasBaseException {
+        AtlasEdge     ret  = null;
+        AtlasObjectId end1 = new AtlasObjectId(getIdFromVertex(end1Vertex), AtlasGraphUtilsV1.getTypeName(end1Vertex));
+        AtlasObjectId end2 = new AtlasObjectId(getIdFromVertex(end2Vertex), AtlasGraphUtilsV1.getTypeName(end2Vertex));
 
+        AtlasRelationship relationship = relationshipStore.getOrCreate(new AtlasRelationship(relationshipName, end1, end2));
         // return newly created AtlasEdge
-        // if multiple edges are returned, compare using id to pick the right one
+        // if multiple edges are returned, compare using guid to pick the right one
         Iterator<AtlasEdge> outEdges = graphHelper.getOutGoingEdgesByLabel(end1Vertex, relationship.getLabel());
 
         while (outEdges.hasNext()) {
             AtlasEdge edge = outEdges.next();
 
-            if (AtlasGraphUtilsV1.getIdFromVertex(end2Vertex).equals(AtlasGraphUtilsV1.getIdFromVertex(edge.getInVertex()))) {
+            if (getIdFromVertex(end2Vertex).equals(getIdFromVertex(edge.getInVertex()))) {
                 ret = edge;
                 break;
             }
@@ -1203,4 +1334,47 @@
 
         return ret;
     }
+
+    private boolean isRelationshipExists(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) {
+        boolean             ret   = false;
+        Iterator<AtlasEdge> edges = graphHelper.getOutGoingEdgesByLabel(fromVertex, edgeLabel);
+
+        while (edges != null && edges.hasNext()) {
+            AtlasEdge   edge     = edges.next();
+            AtlasVertex inVertex = edge.getInVertex();
+
+            if (inVertex != null && StringUtils.equals(getIdFromVertex(inVertex), getIdFromVertex(toVertex))) {
+                ret = true;
+            }
+        }
+
+        return ret;
+    }
+
+    private void recordEntityUpdate(AtlasVertex vertex) {
+        AtlasObjectId    objectId = new AtlasObjectId(GraphHelper.getGuid(vertex), GraphHelper.getTypeName(vertex));
+        RequestContextV1 req      = RequestContextV1.get();
+
+        if (!objectIdsContain(req.getUpdatedEntityIds(), objectId)) {
+            req.recordEntityUpdate(objectId);
+        }
+    }
+
+    private boolean objectIdsContain(Collection<AtlasObjectId> objectIds, AtlasObjectId objectId) {
+        boolean ret = false;
+
+        if (objectIds != null && objectIds.isEmpty()) {
+            ret = false;
+
+        } else {
+            for (AtlasObjectId id : objectIds) {
+                if (StringUtils.equals(id.getGuid(), objectId.getGuid())) {
+                    ret = true;
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index f4257be..31fc837 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -69,6 +69,7 @@
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_STRING;
 import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
 import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
 
 
 public final class EntityGraphRetriever {
@@ -136,6 +137,10 @@
         return toAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
     }
 
+    public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex atlasVertex, Set<String> attributes) throws AtlasBaseException {
+        return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
+    }
+
     private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
         AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid);
 
@@ -188,7 +193,7 @@
 
             mapAttributes(entityVertex, entity, entityExtInfo);
 
-            mapRelationshipAttributes(entityVertex, entity, entityExtInfo);
+            mapRelationshipAttributes(entityVertex, entity);
 
             mapClassifications(entityVertex, entity, entityExtInfo);
         }
@@ -300,23 +305,6 @@
         }
     }
 
-    private void mapRelationshipAttributes(AtlasVertex entityVertex, AtlasEntity entity, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
-        AtlasType objType = typeRegistry.getType(entity.getTypeName());
-
-        if (!(objType instanceof AtlasEntityType)) {
-            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, entity.getTypeName());
-        }
-
-        AtlasEntityType entityType = (AtlasEntityType) objType;
-
-        for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
-
-            Object attrValue = mapVertexToRelationshipAttribute(entityVertex, entityType, attribute, entityExtInfo);
-
-            entity.addRelationshipAttribute(attribute.getName(), attrValue);
-        }
-    }
-
     public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
 
         AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
@@ -401,6 +389,7 @@
         String    vertexPropertyName = attribute.getQualifiedName();
         String    edgeLabel          = EDGE_LABEL_PREFIX + vertexPropertyName;
         boolean   isOwnedAttribute   = attribute.isOwnedRef();
+        AtlasRelationshipEdgeDirection edgeDirection = attribute.getRelationshipEdgeDirection();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Mapping vertex {} to atlas entity {}.{}", entityVertex, attribute.getDefinedInDef().getName(), attribute.getName());
@@ -417,13 +406,13 @@
                 ret = mapVertexToStruct(entityVertex, edgeLabel, null, entityExtInfo);
                 break;
             case OBJECT_ID_TYPE:
-                ret = mapVertexToObjectId(entityVertex, edgeLabel, null, entityExtInfo, isOwnedAttribute);
+                ret = mapVertexToObjectId(entityVertex, edgeLabel, null, entityExtInfo, isOwnedAttribute, edgeDirection);
                 break;
             case ARRAY:
-                ret = mapVertexToArray(entityVertex, (AtlasArrayType) attrType, vertexPropertyName, entityExtInfo, isOwnedAttribute);
+                ret = mapVertexToArray(entityVertex, (AtlasArrayType) attrType, vertexPropertyName, entityExtInfo, isOwnedAttribute, edgeDirection);
                 break;
             case MAP:
-                ret = mapVertexToMap(entityVertex, (AtlasMapType) attrType, vertexPropertyName, entityExtInfo, isOwnedAttribute);
+                ret = mapVertexToMap(entityVertex, (AtlasMapType) attrType, vertexPropertyName, entityExtInfo, isOwnedAttribute, edgeDirection);
                 break;
             case CLASSIFICATION:
                 // do nothing
@@ -433,51 +422,10 @@
         return ret;
     }
 
-    private Object mapVertexToRelationshipAttribute(AtlasVertex entityVertex, AtlasEntityType entityType, AtlasAttribute attribute,
-                                                    AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
-        Object               ret             = null;
-        AtlasRelationshipDef relationshipDef = graphHelper.getRelationshipDef(entityVertex, entityType, attribute.getName());
-
-        if (relationshipDef == null) {
-            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID, "relationshipDef is null");
-        }
-
-        AtlasRelationshipEndDef endDef1         = relationshipDef.getEndDef1();
-        AtlasRelationshipEndDef endDef2         = relationshipDef.getEndDef2();
-        AtlasEntityType         endDef1Type     = typeRegistry.getEntityTypeByName(endDef1.getType());
-        AtlasEntityType         endDef2Type     = typeRegistry.getEntityTypeByName(endDef2.getType());
-        AtlasRelationshipEndDef attributeEndDef = null;
-
-        if (endDef1Type.isTypeOrSuperTypeOf(entityType.getTypeName()) && StringUtils.equals(endDef1.getName(), attribute.getName())) {
-            attributeEndDef = endDef1;
-
-        } else if (endDef2Type.isTypeOrSuperTypeOf(entityType.getTypeName()) && StringUtils.equals(endDef2.getName(), attribute.getName())) {
-            attributeEndDef = endDef2;
-        }
-
-        if (attributeEndDef == null) {
-            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID, relationshipDef.toString());
-        }
-
-        String relationshipLabel = attribute.getRelationshipEdgeLabel();
-
-        switch (attributeEndDef.getCardinality()) {
-            case SINGLE:
-                ret = mapVertexToObjectId(entityVertex, relationshipLabel, null, entityExtInfo, attributeEndDef.getIsContainer());
-                break;
-
-            case LIST:
-            case SET:
-                ret = mapVertexToRelationshipArrayAttribute(entityVertex, (AtlasArrayType) attribute.getAttributeType(), relationshipLabel,
-                                                            entityExtInfo, attributeEndDef.getIsContainer());
-                break;
-        }
-
-        return ret;
-    }
-
     private Map<String, Object> mapVertexToMap(AtlasVertex entityVertex, AtlasMapType atlasMapType, final String propertyName,
-                                               AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute) throws AtlasBaseException {
+                                               AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute,
+                                               AtlasRelationshipEdgeDirection edgeDirection) throws AtlasBaseException {
+
         List<String> mapKeys = GraphHelper.getListProperty(entityVertex, propertyName);
 
         if (CollectionUtils.isEmpty(mapKeys)) {
@@ -496,7 +444,9 @@
             final String edgeLabel       = EDGE_LABEL_PREFIX + keyPropertyName;
             final Object keyValue        = GraphHelper.getMapValueProperty(mapValueType, entityVertex, keyPropertyName);
 
-            Object mapValue = mapVertexToCollectionEntry(entityVertex, mapValueType, keyValue, edgeLabel, entityExtInfo, isOwnedAttribute);
+            Object mapValue = mapVertexToCollectionEntry(entityVertex, mapValueType, keyValue, edgeLabel,
+                                                         entityExtInfo, isOwnedAttribute, edgeDirection);
+
             if (mapValue != null) {
                 ret.put(mapKey, mapValue);
             }
@@ -506,7 +456,9 @@
     }
 
     private List<Object> mapVertexToArray(AtlasVertex entityVertex, AtlasArrayType arrayType, String propertyName,
-                                          AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute) throws AtlasBaseException {
+                                          AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute,
+                                          AtlasRelationshipEdgeDirection edgeDirection)  throws AtlasBaseException {
+
         AtlasType    arrayElementType = arrayType.getElementType();
         List<Object> arrayElements    = GraphHelper.getArrayElementsProperty(arrayElementType, entityVertex, propertyName);
 
@@ -522,8 +474,8 @@
         String edgeLabel = EDGE_LABEL_PREFIX + propertyName;
 
         for (Object element : arrayElements) {
-            Object arrValue = mapVertexToCollectionEntry(entityVertex, arrayElementType, element,
-                                                         edgeLabel, entityExtInfo, isOwnedAttribute);
+            Object arrValue = mapVertexToCollectionEntry(entityVertex, arrayElementType, element, edgeLabel,
+                                                         entityExtInfo, isOwnedAttribute, edgeDirection);
 
             if (arrValue != null) {
                 arrValues.add(arrValue);
@@ -533,42 +485,9 @@
         return arrValues;
     }
 
-    private List<Object> mapVertexToRelationshipArrayAttribute(AtlasVertex entityVertex, AtlasArrayType arrayType,
-                                                               String relationshipName, AtlasEntityExtInfo entityExtInfo,
-                                                               boolean isContainer) throws AtlasBaseException {
-
-        Iterator<AtlasEdge> relationshipEdges = graphHelper.getBothEdgesByLabel(entityVertex, relationshipName);
-        AtlasType           arrayElementType  = arrayType.getElementType();
-        List<AtlasEdge>     arrayElements     = new ArrayList<>();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Mapping array attribute {} for vertex {}", arrayElementType.getTypeName(), entityVertex);
-        }
-
-        while (relationshipEdges.hasNext()) {
-            arrayElements.add(relationshipEdges.next());
-        }
-
-        if (CollectionUtils.isEmpty(arrayElements)) {
-            return null;
-        }
-
-        List arrValues = new ArrayList(arrayElements.size());
-
-        for (Object element : arrayElements) {
-            Object arrValue = mapVertexToCollectionEntry(entityVertex, arrayElementType, element, relationshipName,
-                                                         entityExtInfo, isContainer);
-
-            if (arrValue != null) {
-                arrValues.add(arrValue);
-            }
-        }
-
-        return arrValues;
-    }
-
-    private Object mapVertexToCollectionEntry(AtlasVertex entityVertex, AtlasType arrayElement, Object value, String edgeLabel,
-                                              AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute) throws AtlasBaseException {
+    private Object mapVertexToCollectionEntry(AtlasVertex entityVertex, AtlasType arrayElement, Object value,
+                                              String edgeLabel, AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute,
+                                              AtlasRelationshipEdgeDirection edgeDirection) throws AtlasBaseException {
         Object ret = null;
 
         switch (arrayElement.getTypeCategory()) {
@@ -587,7 +506,7 @@
                 break;
 
             case OBJECT_ID_TYPE:
-                ret = mapVertexToObjectId(entityVertex, edgeLabel, (AtlasEdge) value, entityExtInfo, isOwnedAttribute);
+                ret = mapVertexToObjectId(entityVertex, edgeLabel, (AtlasEdge) value, entityExtInfo, isOwnedAttribute, edgeDirection);
                 break;
 
             default:
@@ -646,11 +565,12 @@
     }
 
     private AtlasObjectId mapVertexToObjectId(AtlasVertex entityVertex, String edgeLabel, AtlasEdge edge,
-                                              AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute) throws AtlasBaseException {
+                                              AtlasEntityExtInfo entityExtInfo, boolean isOwnedAttribute,
+                                              AtlasRelationshipEdgeDirection edgeDirection) throws AtlasBaseException {
         AtlasObjectId ret = null;
 
         if (edge == null) {
-            edge = graphHelper.getEdgeForLabel(entityVertex, edgeLabel);
+            edge = graphHelper.getEdgeForLabel(entityVertex, edgeLabel, edgeDirection);
         }
 
         if (GraphHelper.elementExists(edge)) {
@@ -697,7 +617,102 @@
         return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null) : null;
     }
 
-    public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex atlasVertex, Set<String> attributes) throws AtlasBaseException {
-        return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
+    private void mapRelationshipAttributes(AtlasVertex entityVertex, AtlasEntity entity) throws AtlasBaseException {
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, entity.getTypeName());
+        }
+
+        for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
+            Object attrValue = mapVertexToRelationshipAttribute(entityVertex, entityType, attribute);
+
+            entity.setRelationshipAttribute(attribute.getName(), attrValue);
+        }
+    }
+
+    private Object mapVertexToRelationshipAttribute(AtlasVertex entityVertex, AtlasEntityType entityType, AtlasAttribute attribute) throws AtlasBaseException {
+        Object               ret             = null;
+        AtlasRelationshipDef relationshipDef = graphHelper.getRelationshipDef(entityVertex, entityType, attribute.getName());
+
+        if (relationshipDef == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID, "relationshipDef is null");
+        }
+
+        AtlasRelationshipEndDef endDef1         = relationshipDef.getEndDef1();
+        AtlasRelationshipEndDef endDef2         = relationshipDef.getEndDef2();
+        AtlasEntityType         endDef1Type     = typeRegistry.getEntityTypeByName(endDef1.getType());
+        AtlasEntityType         endDef2Type     = typeRegistry.getEntityTypeByName(endDef2.getType());
+        AtlasRelationshipEndDef attributeEndDef = null;
+
+        if (endDef1Type.isTypeOrSuperTypeOf(entityType.getTypeName()) && StringUtils.equals(endDef1.getName(), attribute.getName())) {
+            attributeEndDef = endDef1;
+        } else if (endDef2Type.isTypeOrSuperTypeOf(entityType.getTypeName()) && StringUtils.equals(endDef2.getName(), attribute.getName())) {
+            attributeEndDef = endDef2;
+        }
+
+        if (attributeEndDef == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID, relationshipDef.toString());
+        }
+
+        switch (attributeEndDef.getCardinality()) {
+            case SINGLE:
+                ret = mapRelatedVertexToObjectId(entityVertex, attribute);
+                break;
+
+            case LIST:
+            case SET:
+                ret = mapRelationshipArrayAttribute(entityVertex, attribute);
+                break;
+        }
+
+        return ret;
+    }
+
+    private AtlasObjectId mapRelatedVertexToObjectId(AtlasVertex entityVertex, AtlasAttribute attribute) throws AtlasBaseException {
+        AtlasEdge edge = graphHelper.getEdgeForLabel(entityVertex, attribute.getRelationshipEdgeLabel(), attribute.getRelationshipEdgeDirection());
+
+        return mapRelatedVertexToObjectId(entityVertex, edge);
+    }
+
+    private List<AtlasObjectId> mapRelationshipArrayAttribute(AtlasVertex entityVertex, AtlasAttribute attribute) throws AtlasBaseException {
+        List<AtlasObjectId> ret   = new ArrayList<>();
+        Iterator<AtlasEdge> edges = null;
+
+        if (attribute.getRelationshipEdgeDirection() == AtlasRelationshipEdgeDirection.IN) {
+            edges = graphHelper.getIncomingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel());
+        } else if (attribute.getRelationshipEdgeDirection() == AtlasRelationshipEdgeDirection.OUT) {
+            edges = graphHelper.getOutGoingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel());
+        }
+
+        if (edges != null) {
+            while (edges.hasNext()) {
+                AtlasEdge relationshipEdge = edges.next();
+
+                AtlasObjectId objectId = mapRelatedVertexToObjectId(entityVertex, relationshipEdge);
+
+                ret.add(objectId);
+            }
+        }
+
+        return ret;
+    }
+
+    private AtlasObjectId mapRelatedVertexToObjectId(AtlasVertex entityVertex, AtlasEdge edge) throws AtlasBaseException {
+        AtlasObjectId ret = null;
+
+        if (GraphHelper.elementExists(edge)) {
+            AtlasVertex referenceVertex = edge.getInVertex();
+
+            if (StringUtils.equals(getIdFromVertex(referenceVertex), getIdFromVertex(entityVertex))) {
+                referenceVertex = edge.getOutVertex();
+            }
+
+            if (referenceVertex != null) {
+                ret = new AtlasObjectId(GraphHelper.getGuid(referenceVertex), GraphHelper.getTypeName(referenceVertex));
+            }
+        }
+
+        return ret;
     }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 404225c..7210799 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.inject.Inject;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index d901731..82692cf 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.common.collect.Sets;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
@@ -149,6 +150,8 @@
         AtlasExportResult exportResult = zipSource.getExportResult();
         List<String> creationOrder = zipSource.getCreationOrder();
 
+        RequestContextV1.clear();
+
         AtlasImportRequest request = getDefaultImportRequest();
         AtlasImportResult result = runImportWithParameters(importService, request, zipSource);
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreHardDeleteV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreHardDeleteV1Test.java
new file mode 100644
index 0000000..2c31140
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreHardDeleteV1Test.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.testng.annotations.Guice;
+
+import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;
+
+/**
+ * Inverse reference update test with {@link HardDeleteHandlerV1}
+ */
+@Guice(modules = TestModules.HardDeleteModule.class)
+public class AtlasRelationshipStoreHardDeleteV1Test extends AtlasRelationshipStoreV1Test {
+
+    @Override
+    protected void verifyRelationshipAttributeUpdate_NonComposite_OneToMany(AtlasEntity jane) throws Exception {
+        // Max should have been removed from the subordinates list, leaving only John.
+        verifyRelationshipAttributeList(jane, "subordinates", ImmutableList.of(employeeNameIdMap.get("John")));
+    }
+
+    @Override
+    protected void verifyRelationshipAttributeUpdate_NonComposite_ManyToOne(AtlasEntity a1, AtlasEntity a2,
+                                                                            AtlasEntity a3, AtlasEntity b) {
+
+        verifyRelationshipAttributeValue(a1, "oneB", null);
+
+        verifyRelationshipAttributeValue(a2, "oneB", null);
+
+        verifyRelationshipAttributeList(b, "manyA", ImmutableList.of(getAtlasObjectId(a3)));
+    }
+
+    @Override
+    protected void verifyRelationshipAttributeUpdate_NonComposite_OneToOne(AtlasEntity a1, AtlasEntity b) {
+        verifyRelationshipAttributeValue(a1, "b", null);
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreSoftDeleteV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreSoftDeleteV1Test.java
new file mode 100644
index 0000000..33ef8c0
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreSoftDeleteV1Test.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.testng.annotations.Guice;
+
+import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;
+
+
+/**
+ * Inverse reference update test with {@link SoftDeleteHandlerV1}
+ */
+@Guice(modules = TestModules.SoftDeleteModule.class)
+public class AtlasRelationshipStoreSoftDeleteV1Test extends AtlasRelationshipStoreV1Test {
+
+    @Override
+    protected void verifyRelationshipAttributeUpdate_NonComposite_OneToMany(AtlasEntity jane) throws Exception {
+        // Max is still in the subordinates list, as the edge still exists with state DELETED
+        verifyRelationshipAttributeList(jane, "subordinates", ImmutableList.of(employeeNameIdMap.get("John"), employeeNameIdMap.get("Max")));
+    }
+
+    @Override
+    protected void verifyRelationshipAttributeUpdate_NonComposite_ManyToOne(AtlasEntity a1, AtlasEntity a2,
+                                                                            AtlasEntity a3, AtlasEntity b) {
+
+        verifyRelationshipAttributeValue(a1, "oneB", b.getGuid());
+
+        verifyRelationshipAttributeValue(a2, "oneB", b.getGuid());
+
+        verifyRelationshipAttributeList(b, "manyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2), getAtlasObjectId(a3)));
+    }
+
+    @Override
+    protected void verifyRelationshipAttributeUpdate_NonComposite_OneToOne(AtlasEntity a1, AtlasEntity b) {
+        verifyRelationshipAttributeValue(a1, "b", b.getGuid());
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
index 6770223..31efe86 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
@@ -17,18 +17,25 @@
  */
 package org.apache.atlas.repository.store.graph.v1;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.TestModules;
-import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
-import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeTest;
@@ -36,11 +43,25 @@
 import org.testng.annotations.Test;
 
 import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.atlas.TestRelationshipUtilsV2.EMPLOYEE_TYPE;
+import static org.apache.atlas.TestRelationshipUtilsV2.getDepartmentEmployeeInstances;
+import static org.apache.atlas.TestRelationshipUtilsV2.getDepartmentEmployeeTypes;
+import static org.apache.atlas.TestRelationshipUtilsV2.getInverseReferenceTestTypes;
+import static org.apache.atlas.TestUtils.NAME;
+import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;
 import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
-public class AtlasRelationshipStoreV1Test {
+public abstract class AtlasRelationshipStoreV1Test {
 
     @Inject
     AtlasTypeRegistry typeRegistry;
@@ -56,32 +77,28 @@
 
     AtlasEntityStore          entityStore;
     AtlasRelationshipStore    relationshipStore;
-    AtlasEntityWithExtInfo    dbEntity;
-    AtlasEntityWithExtInfo    tblEntity;
     AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
 
+    protected Map<String, AtlasObjectId> employeeNameIdMap = new HashMap<>();
+
     @BeforeClass
     public void setUp() throws Exception {
         new GraphBackedSearchIndexer(typeRegistry);
 
-        AtlasTypesDef[] testTypesDefs = new AtlasTypesDef[] { TestUtilsV2.defineDeptEmployeeTypes(),
-                                                              TestUtilsV2.defineHiveTypes() };
+        // create employee relationship types
+        AtlasTypesDef employeeTypes = getDepartmentEmployeeTypes();
+        typeDefStore.createTypesDef(employeeTypes);
 
-        for (AtlasTypesDef typesDef : testTypesDefs) {
-            AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
+        AtlasEntitiesWithExtInfo employeeInstances = getDepartmentEmployeeInstances();
+        EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(employeeInstances), false);
 
-            if (!typesToCreate.isEmpty()) {
-                typeDefStore.createTypesDef(typesToCreate);
-            }
+        for (AtlasEntityHeader entityHeader : response.getCreatedEntities()) {
+            employeeNameIdMap.put((String) entityHeader.getAttribute(NAME), getAtlasObjectId(entityHeader));
         }
 
-        dbEntity   = TestUtilsV2.createDBEntityV2();
-        tblEntity  = TestUtilsV2.createTableEntityV2(dbEntity.getEntity());
-    }
-
-    @AfterClass
-    public void clear() {
-        AtlasGraphProvider.cleanup();
+        init();
+        AtlasTypesDef testTypes = getInverseReferenceTestTypes();
+        typeDefStore.createTypesDef(testTypes);
     }
 
     @BeforeTest
@@ -92,8 +109,384 @@
         RequestContextV1.clear();
     }
 
-    @Test
-    public void testDbTableRelationship() throws Exception  {
-        // Add tests - in progress
+    @AfterClass
+    public void clear() {
+        AtlasGraphProvider.cleanup();
     }
-}
+
+    @Test
+    public void testDepartmentEmployeeEntitiesUsingRelationship() throws Exception  {
+        AtlasObjectId hrId     = employeeNameIdMap.get("hr");
+        AtlasObjectId maxId    = employeeNameIdMap.get("Max");
+        AtlasObjectId johnId   = employeeNameIdMap.get("John");
+        AtlasObjectId juliusId = employeeNameIdMap.get("Julius");
+        AtlasObjectId janeId   = employeeNameIdMap.get("Jane");
+
+        AtlasEntity hrDept = getEntityFromStore(hrId.getGuid());
+        AtlasEntity max    = getEntityFromStore(maxId.getGuid());
+        AtlasEntity john   = getEntityFromStore(johnId.getGuid());
+        AtlasEntity julius = getEntityFromStore(juliusId.getGuid());
+        AtlasEntity jane   = getEntityFromStore(janeId.getGuid());
+
+        // Department relationship attributes
+        List<AtlasObjectId> deptEmployees = toAtlasObjectIds(hrDept.getRelationshipAttribute("employees"));
+        assertNotNull(deptEmployees);
+        assertEquals(deptEmployees.size(), 4);
+        assertObjectIdsContains(deptEmployees, maxId);
+        assertObjectIdsContains(deptEmployees, johnId);
+        assertObjectIdsContains(deptEmployees, juliusId);
+        assertObjectIdsContains(deptEmployees, janeId);
+
+        // Max employee validation
+        AtlasObjectId maxDepartmentId = toAtlasObjectId(max.getRelationshipAttribute("department"));
+        assertNotNull(maxDepartmentId);
+        assertObjectIdEquals(maxDepartmentId, hrId);
+
+        AtlasObjectId maxManagerId = toAtlasObjectId(max.getRelationshipAttribute("manager"));
+        assertNotNull(maxManagerId);
+        assertObjectIdEquals(maxManagerId, janeId);
+
+        AtlasObjectId maxMentorId = toAtlasObjectId(max.getRelationshipAttribute("mentor"));
+        assertNotNull(maxMentorId);
+        assertObjectIdEquals(maxMentorId, juliusId);
+
+        List<AtlasObjectId> maxMenteesId = toAtlasObjectIds(max.getRelationshipAttribute("mentees"));
+        assertNotNull(maxMenteesId);
+        assertEquals(maxMenteesId.size(), 1);
+        assertObjectIdEquals(maxMenteesId.get(0), johnId);
+
+        // John Employee validation
+        AtlasObjectId johnDepartmentId = toAtlasObjectId(john.getRelationshipAttribute("department"));
+        assertNotNull(johnDepartmentId);
+        assertObjectIdEquals(johnDepartmentId, hrId);
+
+        AtlasObjectId johnManagerId = toAtlasObjectId(john.getRelationshipAttribute("manager"));
+        assertNotNull(johnManagerId);
+        assertObjectIdEquals(johnManagerId, janeId);
+
+        AtlasObjectId johnMentorId = toAtlasObjectId(john.getRelationshipAttribute("mentor"));
+        assertNotNull(johnMentorId);
+        assertObjectIdEquals(johnMentorId, maxId);
+
+        List<AtlasObjectId> johnMenteesId = toAtlasObjectIds(john.getRelationshipAttribute("mentees"));
+        assertNull(johnMenteesId);
+
+        // Jane Manager validation
+        AtlasObjectId janeDepartmentId = toAtlasObjectId(jane.getRelationshipAttribute("department"));
+        assertNotNull(janeDepartmentId);
+        assertObjectIdEquals(janeDepartmentId, hrId);
+
+        AtlasObjectId janeManagerId = toAtlasObjectId(jane.getRelationshipAttribute("manager"));
+        assertNull(janeManagerId);
+
+        AtlasObjectId janeMentorId = toAtlasObjectId(jane.getRelationshipAttribute("mentor"));
+        assertNull(janeMentorId);
+
+        List<AtlasObjectId> janeMenteesId = toAtlasObjectIds(jane.getRelationshipAttribute("mentees"));
+        assertNull(janeMenteesId);
+
+        List<AtlasObjectId> janeSubordinateIds = toAtlasObjectIds(jane.getRelationshipAttribute("subordinates"));
+        assertNotNull(janeSubordinateIds);
+        assertEquals(janeSubordinateIds.size(), 2);
+        assertObjectIdsContains(janeSubordinateIds, maxId);
+        assertObjectIdsContains(janeSubordinateIds, johnId);
+
+        // Julius Manager validation
+        AtlasObjectId juliusDepartmentId = toAtlasObjectId(julius.getRelationshipAttribute("department"));
+        assertNotNull(juliusDepartmentId);
+        assertObjectIdEquals(juliusDepartmentId, hrId);
+
+        AtlasObjectId juliusManagerId = toAtlasObjectId(julius.getRelationshipAttribute("manager"));
+        assertNull(juliusManagerId);
+
+        AtlasObjectId juliusMentorId = toAtlasObjectId(julius.getRelationshipAttribute("mentor"));
+        assertNull(juliusMentorId);
+
+        List<AtlasObjectId> juliusMenteesId = toAtlasObjectIds(julius.getRelationshipAttribute("mentees"));
+        assertNotNull(juliusMenteesId);
+        assertEquals(juliusMenteesId.size(), 1);
+        assertObjectIdsContains(juliusMenteesId, maxId);
+
+        List<AtlasObjectId> juliusSubordinateIds = toAtlasObjectIds(julius.getRelationshipAttribute("subordinates"));
+        assertNull(juliusSubordinateIds);
+    }
+
+    @Test
+    public void testRelationshipAttributeUpdate_NonComposite_OneToMany() throws Exception {
+        AtlasObjectId maxId    = employeeNameIdMap.get("Max");
+        AtlasObjectId juliusId = employeeNameIdMap.get("Julius");
+        AtlasObjectId janeId   = employeeNameIdMap.get("Jane");
+
+        // Change Max's Employee.manager reference to Julius and apply the change as a partial update.
+        // This should also update Julius to add Max to the inverse Manager.subordinates reference.
+        AtlasEntity maxEntityForUpdate = new AtlasEntity(EMPLOYEE_TYPE);
+        maxEntityForUpdate.setRelationshipAttribute("manager", juliusId);
+
+        AtlasEntityType        employeeType   = typeRegistry.getEntityTypeByName(EMPLOYEE_TYPE);
+        Map<String, Object>    uniqAttributes = Collections.<String, Object>singletonMap("name", "Max");
+        EntityMutationResponse updateResponse = entityStore.updateByUniqueAttributes(employeeType, uniqAttributes , new AtlasEntityWithExtInfo(maxEntityForUpdate));
+
+        List<AtlasEntityHeader> partialUpdatedEntities = updateResponse.getPartialUpdatedEntities();
+        assertEquals(partialUpdatedEntities.size(), 3);
+        // 3 entities should have been updated:
+        // * Max to change the Employee.manager reference
+        // * Julius to add Max to Manager.subordinates
+        // * Jane to remove Max from Manager.subordinates
+
+        AtlasEntitiesWithExtInfo updatedEntities = entityStore.getByIds(ImmutableList.of(maxId.getGuid(), juliusId.getGuid(), janeId.getGuid()));
+
+        // Max's manager updated as Julius
+        AtlasEntity maxEntity = updatedEntities.getEntity(maxId.getGuid());
+        verifyRelationshipAttributeValue(maxEntity, "manager", juliusId.getGuid());
+
+        // Max added to the subordinate list of Julius
+        AtlasEntity juliusEntity = updatedEntities.getEntity(juliusId.getGuid());
+        verifyRelationshipAttributeList(juliusEntity, "subordinates", ImmutableList.of(maxId));
+
+        // Max removed from the subordinate list of Julius
+        AtlasEntity janeEntity = updatedEntities.getEntity(janeId.getGuid());
+
+        // Jane's subordinates list includes John and Max for soft delete
+        // Jane's subordinates list includes only John for hard delete
+        verifyRelationshipAttributeUpdate_NonComposite_OneToMany(janeEntity);
+    }
+
+    @Test
+    public void testRelationshipAttributeUpdate_NonComposite_ManyToOne() throws Exception {
+        AtlasEntity a1 = new AtlasEntity("A");
+        a1.setAttribute(NAME, "a1_name");
+
+        AtlasEntity a2 = new AtlasEntity("A");
+        a2.setAttribute(NAME, "a2_name");
+
+        AtlasEntity a3 = new AtlasEntity("A");
+        a3.setAttribute(NAME, "a3_name");
+
+        AtlasEntity b = new AtlasEntity("B");
+        b.setAttribute(NAME, "b_name");
+
+        AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo();
+        entitiesWithExtInfo.addEntity(a1);
+        entitiesWithExtInfo.addEntity(a2);
+        entitiesWithExtInfo.addEntity(a3);
+        entitiesWithExtInfo.addEntity(b);
+        entityStore.createOrUpdate(new AtlasEntityStream(entitiesWithExtInfo) , false);
+
+        AtlasEntity bPartialUpdate = new AtlasEntity("B");
+        bPartialUpdate.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+        init();
+        EntityMutationResponse response = entityStore.updateByUniqueAttributes(typeRegistry.getEntityTypeByName("B"),
+                                                                               Collections.singletonMap(NAME, b.getAttribute(NAME)),
+                                                                               new AtlasEntityWithExtInfo(bPartialUpdate));
+        // Verify 3 entities were updated:
+        // * set b.manyA reference to a1 and a2
+        // * set inverse a1.oneB reference to b
+        // * set inverse a2.oneB reference to b
+        assertEquals(response.getPartialUpdatedEntities().size(), 3);
+        AtlasEntitiesWithExtInfo updatedEntities = entityStore.getByIds(ImmutableList.of(a1.getGuid(), a2.getGuid(), b.getGuid()));
+
+        AtlasEntity a1Entity = updatedEntities.getEntity(a1.getGuid());
+        verifyRelationshipAttributeValue(a1Entity, "oneB", b.getGuid());
+
+        AtlasEntity a2Entity = updatedEntities.getEntity(a2.getGuid());
+        verifyRelationshipAttributeValue(a2Entity, "oneB", b.getGuid());
+
+        AtlasEntity bEntity = updatedEntities.getEntity(b.getGuid());
+        verifyRelationshipAttributeList(bEntity, "manyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+
+        bPartialUpdate.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a3)));
+        init();
+        response = entityStore.updateByUniqueAttributes(typeRegistry.getEntityTypeByName("B"),
+                                                        Collections.singletonMap(NAME, b.getAttribute(NAME)),
+                                                        new AtlasEntityWithExtInfo(bPartialUpdate));
+        // Verify 4 entities were updated:
+        // * set b.manyA reference to a3
+        // * set inverse a3.oneB reference to b
+        // * disconnect inverse a1.oneB reference to b
+        // * disconnect inverse a2.oneB reference to b
+        assertEquals(response.getPartialUpdatedEntities().size(), 4);
+        init();
+
+        updatedEntities = entityStore.getByIds(ImmutableList.of(a1.getGuid(), a2.getGuid(), a3.getGuid(), b.getGuid()));
+        a1Entity        = updatedEntities.getEntity(a1.getGuid());
+        a2Entity        = updatedEntities.getEntity(a2.getGuid());
+        bEntity         = updatedEntities.getEntity(b.getGuid());
+
+        AtlasEntity a3Entity = updatedEntities.getEntity(a3.getGuid());
+        verifyRelationshipAttributeValue(a3Entity, "oneB", b.getGuid());
+
+        verifyRelationshipAttributeUpdate_NonComposite_ManyToOne(a1Entity, a2Entity, a3Entity, bEntity);
+    }
+
+    @Test
+    public void testRelationshipAttributeUpdate_NonComposite_OneToOne() throws Exception {
+        AtlasEntity a1 = new AtlasEntity("A");
+        a1.setAttribute(NAME, "a1_name");
+
+        AtlasEntity a2 = new AtlasEntity("A");
+        a2.setAttribute(NAME, "a2_name");
+
+        AtlasEntity b = new AtlasEntity("B");
+        b.setAttribute(NAME, "b_name");
+
+        AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo();
+        entitiesWithExtInfo.addEntity(a1);
+        entitiesWithExtInfo.addEntity(a2);
+        entitiesWithExtInfo.addEntity(b);
+
+        EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesWithExtInfo) , false);
+
+        AtlasEntity partialUpdateB = new AtlasEntity("B");
+        partialUpdateB.setRelationshipAttribute("a", getAtlasObjectId(a1));
+
+        init();
+        AtlasEntityType bType = typeRegistry.getEntityTypeByName("B");
+
+        response = entityStore.updateByUniqueAttributes(bType, Collections.singletonMap(NAME, b.getAttribute(NAME)), new AtlasEntityWithExtInfo(partialUpdateB));
+        List<AtlasEntityHeader> partialUpdatedEntitiesHeader = response.getPartialUpdatedEntities();
+        // Verify 2 entities were updated:
+        // * set b.a reference to a1
+        // * set inverse a1.b reference to b
+        assertEquals(partialUpdatedEntitiesHeader.size(), 2);
+        AtlasEntitiesWithExtInfo partialUpdatedEntities = entityStore.getByIds(ImmutableList.of(a1.getGuid(), b.getGuid()));
+
+        AtlasEntity a1Entity = partialUpdatedEntities.getEntity(a1.getGuid());
+        verifyRelationshipAttributeValue(a1Entity, "b", b.getGuid());
+
+        AtlasEntity bEntity = partialUpdatedEntities.getEntity(b.getGuid());
+        verifyRelationshipAttributeValue(bEntity, "a", a1.getGuid());
+
+        init();
+
+        // Update b.a to reference a2.
+        partialUpdateB.setRelationshipAttribute("a", getAtlasObjectId(a2));
+        response = entityStore.updateByUniqueAttributes(bType, Collections.<String, Object>singletonMap(NAME, b.getAttribute(NAME)), new AtlasEntityWithExtInfo(partialUpdateB));
+        partialUpdatedEntitiesHeader = response.getPartialUpdatedEntities();
+        // Verify 3 entities were updated:
+        // * set b.a reference to a2
+        // * set a2.b reference to b
+        // * disconnect a1.b reference
+        assertEquals(partialUpdatedEntitiesHeader.size(), 3);
+        partialUpdatedEntities = entityStore.getByIds(ImmutableList.of(a1.getGuid(), a2.getGuid(), b.getGuid()));
+
+        bEntity = partialUpdatedEntities.getEntity(b.getGuid());
+        verifyRelationshipAttributeValue(bEntity, "a", a2.getGuid());
+
+        AtlasEntity a2Entity = partialUpdatedEntities.getEntity(a2.getGuid());
+        verifyRelationshipAttributeValue(a2Entity, "b", b.getGuid());
+
+        a1Entity = partialUpdatedEntities.getEntity(a1.getGuid());
+        verifyRelationshipAttributeUpdate_NonComposite_OneToOne(a1Entity, bEntity);
+    }
+
+    @Test
+    public void testRelationshipAttributeUpdate_NonComposite_ManyToMany() throws Exception {
+        AtlasEntity a1 = new AtlasEntity("A");
+        a1.setAttribute(NAME, "a1_name");
+
+        AtlasEntity a2 = new AtlasEntity("A");
+        a2.setAttribute(NAME, "a2_name");
+
+        AtlasEntity a3 = new AtlasEntity("A");
+        a3.setAttribute(NAME, "a3_name");
+
+        AtlasEntity b1 = new AtlasEntity("B");
+        b1.setAttribute(NAME, "b1_name");
+
+        AtlasEntity b2 = new AtlasEntity("B");
+        b2.setAttribute(NAME, "b2_name");
+
+        AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo();
+        entitiesWithExtInfo.addEntity(a1);
+        entitiesWithExtInfo.addEntity(a2);
+        entitiesWithExtInfo.addEntity(a3);
+        entitiesWithExtInfo.addEntity(b1);
+        entitiesWithExtInfo.addEntity(b2);
+        entityStore.createOrUpdate(new AtlasEntityStream(entitiesWithExtInfo) , false);
+
+        AtlasEntity b1PartialUpdate = new AtlasEntity("B");
+        b1PartialUpdate.setRelationshipAttribute("manyToManyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+        init();
+        EntityMutationResponse response = entityStore.updateByUniqueAttributes(typeRegistry.getEntityTypeByName("B"),
+                                                                               Collections.singletonMap(NAME, b1.getAttribute(NAME)),
+                                                                               new AtlasEntityWithExtInfo(b1PartialUpdate));
+
+        List<AtlasEntityHeader> updatedEntityHeaders = response.getPartialUpdatedEntities();
+        assertEquals(updatedEntityHeaders.size(), 3);
+
+        AtlasEntitiesWithExtInfo updatedEntities = entityStore.getByIds(ImmutableList.of(a1.getGuid(), a2.getGuid(), b1.getGuid()));
+
+        AtlasEntity b1Entity = updatedEntities.getEntity(b1.getGuid());
+        verifyRelationshipAttributeList(b1Entity, "manyToManyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+        AtlasEntity a1Entity = updatedEntities.getEntity(a1.getGuid());
+        verifyRelationshipAttributeList(a1Entity, "manyB", ImmutableList.of(getAtlasObjectId(b1)));
+
+        AtlasEntity a2Entity = updatedEntities.getEntity(a2.getGuid());
+        verifyRelationshipAttributeList(a2Entity, "manyB", ImmutableList.of(getAtlasObjectId(b1)));
+    }
+
+    protected abstract void verifyRelationshipAttributeUpdate_NonComposite_OneToOne(AtlasEntity a1, AtlasEntity b);
+
+    protected abstract void verifyRelationshipAttributeUpdate_NonComposite_OneToMany(AtlasEntity entity) throws Exception;
+
+    protected abstract void verifyRelationshipAttributeUpdate_NonComposite_ManyToOne(AtlasEntity a1, AtlasEntity a2, AtlasEntity a3, AtlasEntity b);
+
+    private static void assertObjectIdsContains(List<AtlasObjectId> objectIds, AtlasObjectId objectId) {
+        assertTrue(CollectionUtils.isNotEmpty(objectIds));
+        assertTrue(objectIds.contains(objectId));
+    }
+
+    private static void assertObjectIdEquals(AtlasObjectId objId1, AtlasObjectId objId2) {
+        assertTrue(objId1.equals(objId2));
+    }
+
+    private static List<AtlasObjectId> toAtlasObjectIds(Object objectIds) {
+        if (objectIds instanceof List) {
+            return (List<AtlasObjectId>) objectIds;
+        }
+
+        return null;
+    }
+
+    private static AtlasObjectId toAtlasObjectId(Object objectId) {
+        if (objectId instanceof AtlasObjectId) {
+            return (AtlasObjectId) objectId;
+        }
+
+        return null;
+    }
+
+    private AtlasEntity getEntityFromStore(String guid) throws AtlasBaseException {
+        AtlasEntityWithExtInfo entity = guid != null ? entityStore.getById(guid) : null;
+
+        return entity != null ? entity.getEntity() : null;
+    }
+
+    protected static void verifyRelationshipAttributeList(AtlasEntity entity, String relationshipAttrName, List<AtlasObjectId> expectedValues) {
+        Object refValue = entity.getRelationshipAttribute(relationshipAttrName);
+        assertTrue(refValue instanceof List);
+
+        List<AtlasObjectId> refList = (List<AtlasObjectId>) refValue;
+        assertEquals(refList.size(), expectedValues.size());
+
+        if (expectedValues.size() > 0) {
+            assertTrue(refList.containsAll(expectedValues));
+        }
+    }
+
+    protected static void verifyRelationshipAttributeValue(AtlasEntity entity, String relationshipAttrName, String expectedGuid) {
+        Object refValue = entity.getRelationshipAttribute(relationshipAttrName);
+        if (expectedGuid == null) {
+            assertNull(refValue);
+        }
+        else {
+            assertTrue(refValue instanceof AtlasObjectId);
+            AtlasObjectId referencedObjectId = (AtlasObjectId) refValue;
+            assertEquals(referencedObjectId.getGuid(), expectedGuid);
+        }
+    }
+}
\ No newline at end of file