ATLAS-2481: entity-classification edges - labelled as 'classifiedAs', with additional properties

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 813fc87..605742d 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -116,9 +116,12 @@
     public static final String ATTRIBUTE_NAME_VERSION  = "version";
     public static final String TEMP_STRUCT_NAME_PREFIX = "__tempQueryResultStruct";
 
-    public static final String CLASSIFICATION_ENTITY_GUID          = INTERNAL_PROPERTY_KEY_PREFIX + "entityGuid";
-    public static final String CLASSIFICATION_PROPAGATE_KEY        = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
-    public static final String CLASSIFICATION_VALIDITY_PERIODS_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "validityPeriods";
+    public static final String CLASSIFICATION_ENTITY_GUID                     = INTERNAL_PROPERTY_KEY_PREFIX + "entityGuid";
+    public static final String CLASSIFICATION_VALIDITY_PERIODS_KEY            = INTERNAL_PROPERTY_KEY_PREFIX + "validityPeriods";
+    public static final String CLASSIFICATION_VERTEX_PROPAGATE_KEY            = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
+    public static final String CLASSIFICATION_EDGE_NAME_PROPERTY_KEY          = INTERNAL_PROPERTY_KEY_PREFIX + "name";
+    public static final String CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "isPropagated";
+    public static final String CLASSIFICATION_LABEL                           = "classifiedAs";
 
     private Constants() {
     }
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
index 09f584e..b1e2c5d 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
@@ -105,6 +105,15 @@
     AtlasGraphIndex getGraphIndex(String indexName);
 
     /**
+     * Checks if a vertex-centric edge exists already.
+     *
+     * @param label
+     * @param indexName
+     * @return
+     */
+    boolean edgeIndexExist(String label, String indexName);
+
+    /**
      * Creates a mixed Vertex index for the graph.
      *
      * @param name the name of the index to create
@@ -123,6 +132,16 @@
     void createEdgeMixedIndex(String index, String backingIndex, List<AtlasPropertyKey> propertyKeys);
 
     /**
+     * Creates a vertex-centric edge index for the graph.
+     *
+     * @param label edge label name
+     * @param indexName name of the edge index
+     * @param edgeDirection direction of the edge to index
+     * @param propertyKeys edge property keys to be added to the index
+     */
+    void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys);
+
+    /**
      * Creates a full text index for the given property.
      *
      * @param index the name of the index to create
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
index 73db22e..7bdbeab 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
@@ -52,6 +52,26 @@
      */
     AtlasGraphQuery<V, E> in(String propertyKey, Collection<?> values);
 
+    /**
+     * Executes the query and returns the matching edges.
+     * @return
+     */
+    Iterable<AtlasEdge<V, E>> edges();
+
+    /**
+     * Executes the query and returns the matching edges till the max limit
+     * @param limit max number of vertices
+     * @return
+     */
+    Iterable<AtlasEdge<V, E>> edges(int limit);
+
+    /**
+     * Executes the query and returns the matching edges from given offset till the max limit
+     * @param offset starting offset
+     * @param limit max number of vertices
+     * @return
+     */
+    Iterable<AtlasEdge<V, E>> edges(int offset, int limit);
 
     /**
      * Executes the query and returns the matching vertices.
@@ -60,12 +80,6 @@
     Iterable<AtlasVertex<V, E>> vertices();
 
     /**
-     * Executes the query and returns the matching edges.
-     * @return
-     */
-    Iterable<AtlasEdge<V, E>> edges();
-
-    /**
      * Executes the query and returns the matching vertices from given offset till the max limit
      * @param limit max number of vertices
      * @return
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java
index 9beb6a3..1b69f74 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java
@@ -67,4 +67,21 @@
      * @return
      */
     long count();
+
+    /**
+     * Specifies the edge label that should be queried.
+     *
+     * @param label
+     * @return
+     */
+    AtlasVertexQuery<V, E> label(String label);
+
+    /**
+     * Returns edges that matches property key and value.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    AtlasVertexQuery<V, E> has(String key, Object value);
 }
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
index 5c57d6f..7566559 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
@@ -48,6 +48,21 @@
 
     /**
      * Executes graph query
+     * @param limit Max edges to return
+     * @return
+     */
+    Iterable<AtlasEdge<V, E>> edges(int limit);
+
+    /**
+     * Executes graph query
+     * @param offset Starting offset
+     * @param limit Max edges to return
+     * @return
+     */
+    Iterable<AtlasEdge<V, E>> edges(int offset, int limit);
+
+    /**
+     * Executes graph query
      * @param limit Max vertices to return
      * @return
      */
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
index 163bf48..96b9705 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
@@ -157,6 +157,45 @@
     }
 
     @Override
+    public Iterable<AtlasEdge<V, E>> edges(int limit) {
+        return edges(0, limit);
+    }
+
+    @Override
+    public Iterable<AtlasEdge<V, E>> edges(int offset, int limit) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Executing: " + queryCondition);
+        }
+
+        Preconditions.checkArgument(offset >= 0, "Offset must be non-negative");
+        Preconditions.checkArgument(limit >= 0, "Limit must be non-negative");
+
+        // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
+        Set<AtlasEdge<V, E>> result = new HashSet<>();
+        long resultIdx = 0;
+        for(AndCondition andExpr : queryCondition.getAndTerms()) {
+            if (result.size() == limit) {
+                break;
+            }
+
+            NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
+            for(AtlasEdge<V, E> edge : andQuery.edges(offset + limit)) {
+                if (resultIdx >= offset) {
+                    result.add(edge);
+
+                    if (result.size() == limit) {
+                        break;
+                    }
+                }
+
+                resultIdx++;
+            }
+        }
+
+        return result;
+    }
+
+    @Override
     public Iterable<AtlasVertex<V, E>> vertices(int limit) {
         return vertices(0, limit);
     }
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index cf1965b..4e18432 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -18,6 +18,8 @@
 package org.apache.atlas.repository.graphdb.janus;
 
 import com.google.common.base.Preconditions;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.janusgraph.core.Cardinality;
 import org.janusgraph.core.EdgeLabel;
 import org.janusgraph.core.PropertyKey;
@@ -83,6 +85,22 @@
     }
 
     @Override
+    public void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys) {
+        EdgeLabel edgeLabel = management.getEdgeLabel(label);
+
+        if (edgeLabel == null) {
+            edgeLabel = management.makeEdgeLabel(label).make();
+        }
+
+        Direction     direction = AtlasJanusObjectFactory.createDirection(edgeDirection);
+        PropertyKey[] keys      = AtlasJanusObjectFactory.createPropertyKeys(propertyKeys);
+
+        if (management.getRelationIndex(edgeLabel, indexName) == null) {
+            management.buildEdgeIndex(edgeLabel, indexName, direction, keys);
+        }
+    }
+
+    @Override
     public void createFullTextMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
         IndexBuilder indexBuilder = management.buildIndex(indexName, Vertex.class);
 
@@ -192,6 +210,13 @@
     }
 
     @Override
+    public boolean edgeIndexExist(String label, String indexName) {
+        EdgeLabel edgeLabel = management.getEdgeLabel(label);
+
+        return edgeLabel != null && management.getRelationIndex(edgeLabel, indexName) != null;
+    }
+
+    @Override
     public void createVertexCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys) {
         IndexBuilder indexBuilder = management.buildIndex(propertyName, Vertex.class);
 
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusObjectFactory.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusObjectFactory.java
index 6d31c7b..b631510 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusObjectFactory.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusObjectFactory.java
@@ -25,6 +25,8 @@
 import org.janusgraph.core.Cardinality;
 import org.janusgraph.core.PropertyKey;
 
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Factory that serves up instances of Janus/Tinkerpop classes that correspond to
@@ -61,7 +63,7 @@
     /**
      * Converts a Multiplicity to a Cardinality.
      *
-     * @param multiplicity
+     * @param cardinality
      * @return
      */
     public static Cardinality createCardinality(AtlasCardinality cardinality) {
@@ -82,4 +84,16 @@
         return ((AtlasJanusPropertyKey)key).getWrappedPropertyKey();
     }
 
+    public static PropertyKey[] createPropertyKeys(List<AtlasPropertyKey> keys) {
+        PropertyKey[] ret = new PropertyKey[keys.size()];
+
+        int i = 0;
+        for (AtlasPropertyKey key : keys) {
+            ret[i] = createPropertyKey(key);
+
+            i++;
+        }
+
+        return ret;
+    }
 }
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertexQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertexQuery.java
index 30c6e6e..9b34c0f 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertexQuery.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertexQuery.java
@@ -76,5 +76,15 @@
         return query.count();
     }
 
+    @Override
+    public AtlasVertexQuery<AtlasJanusVertex, AtlasJanusEdge> label(String label) {
+        query.labels(label);
+        return this;
+    }
 
+    @Override
+    public AtlasVertexQuery<AtlasJanusVertex, AtlasJanusEdge> has(String key, Object value) {
+        query.has(key, value);
+        return this;
+    }
 }
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
index 679e3dc..d3c976d 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.graphdb.janus.query;
 
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.janusgraph.core.JanusGraphEdge;
 import org.janusgraph.core.JanusGraphQuery;
 import org.janusgraph.core.JanusGraphVertex;
@@ -64,6 +65,28 @@
     }
 
     @Override
+    public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> edges(int limit) {
+        Iterable<JanusGraphEdge> it = query.limit(limit).edges();
+        return graph.wrapEdges(it);
+    }
+
+    @Override
+    public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> edges(int offset, int limit) {
+        List<Edge>               result = new ArrayList<>(limit);
+        Iterator<? extends Edge> iter   = query.limit(offset + limit).edges().iterator();
+
+        for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
+            if (resultIdx < offset) {
+                continue;
+            }
+
+            result.add(iter.next());
+        }
+
+        return graph.wrapEdges(result);
+    }
+
+    @Override
     public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> vertices(int limit) {
         Iterable<JanusGraphVertex> it = query.limit(limit).vertices();
         return graph.wrapVertices(it);
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0GraphManagement.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0GraphManagement.java
index 32e4d02..fadd596 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0GraphManagement.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0GraphManagement.java
@@ -20,14 +20,15 @@
 import com.thinkaurelius.titan.core.Cardinality;
 import com.thinkaurelius.titan.core.EdgeLabel;
 import com.thinkaurelius.titan.core.PropertyKey;
-import com.thinkaurelius.titan.core.schema.Mapping;
 import com.thinkaurelius.titan.core.schema.PropertyKeyMaker;
 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
 import com.thinkaurelius.titan.core.schema.TitanManagement;
+import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Element;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.atlas.repository.graphdb.AtlasCardinality;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
 import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
@@ -62,6 +63,20 @@
     }
 
     @Override
+    public void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys) {
+        EdgeLabel edgeLabel = management.getEdgeLabel(label);
+
+        if (edgeLabel == null) {
+            edgeLabel = management.makeEdgeLabel(label).make();
+        }
+
+        Direction     direction = TitanObjectFactory.createDirection(edgeDirection);
+        PropertyKey[] keys      = TitanObjectFactory.createPropertyKeys(propertyKeys);
+
+        management.buildEdgeIndex(edgeLabel, indexName, direction, keys);
+    }
+
+    @Override
     public void createFullTextMixedIndex(String index, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
     }
 
@@ -201,4 +216,10 @@
         return GraphDbObjectFactory.createGraphIndex(index);
     }
 
+    @Override
+    public boolean edgeIndexExist(String label, String indexName) {
+        EdgeLabel edgeLabel = management.getEdgeLabel(label);
+
+        return edgeLabel != null && management.getRelationIndex(edgeLabel, indexName) != null;
+    }
 }
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java
index 091e7d4..39e9d6d 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java
@@ -77,4 +77,16 @@
     public long count() {
         return vertexQuery.count();
     }
+
+    @Override
+    public AtlasVertexQuery<Titan0Vertex, Titan0Edge> label(String label) {
+        vertexQuery.labels(label);
+        return this;
+    }
+
+    @Override
+    public AtlasVertexQuery<Titan0Vertex, Titan0Edge> has(String key, Object value) {
+        vertexQuery.has(key, value);
+        return this;
+    }
 }
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/TitanObjectFactory.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/TitanObjectFactory.java
index 8dff1e4..b7c25b8 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/TitanObjectFactory.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/TitanObjectFactory.java
@@ -25,6 +25,9 @@
 import com.thinkaurelius.titan.core.PropertyKey;
 import com.tinkerpop.blueprints.Direction;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Factory that serves up instances of Titan/Tinkerpop classes that correspond to
  * graph database abstraction layer/Atlas classes.
@@ -60,7 +63,7 @@
     /**
      * Converts a Multiplicity to a Cardinality.
      *
-     * @param multiplicity
+     * @param cardinality
      * @return
      */
     public static Cardinality createCardinality(AtlasCardinality cardinality) {
@@ -81,4 +84,16 @@
         return ((Titan0PropertyKey)key).getWrappedPropertyKey();
     }
 
+    public static PropertyKey[] createPropertyKeys(List<AtlasPropertyKey> keys) {
+        PropertyKey[] ret = new PropertyKey[keys.size()];
+
+        int i = 0;
+        for (AtlasPropertyKey key : keys) {
+            ret[i] = createPropertyKey(key);
+
+            i++;
+        }
+
+        return ret;
+    }
 }
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java
index 60e5107..2903ae2 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java
@@ -23,6 +23,7 @@
 import com.thinkaurelius.titan.core.attribute.Text;
 import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
 import com.tinkerpop.blueprints.Compare;
+import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
@@ -64,6 +65,29 @@
         Iterable it = query.edges();
         return graph.wrapEdges(it);
     }
+
+    @Override
+    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int limit) {
+        Iterable it = query.limit(limit).edges();
+        return graph.wrapEdges(it);
+    }
+
+    @Override
+    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int offset, int limit) {
+        List<Edge>     result = new ArrayList<>(limit);
+        Iterator<Edge> iter   = query.limit(offset + limit).edges().iterator();
+
+        for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
+            if (resultIdx < offset) {
+                continue;
+            }
+
+            result.add(iter.next());
+        }
+
+        return graph.wrapEdges(result);
+    }
+
     @Override
     public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
         Iterable it = query.limit(limit).vertices();
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 6ac7786..4a86670 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -125,6 +125,8 @@
     CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06B", "Update to classification {0} is not allowed from propagated entity"),
     CLASSIFICATION_DELETE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06C", "Delete of classification {0} is not allowed from propagated entity"),
     CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY(400, "ATLAS-400-00-06D", "Classification {0} is not associated with entity"),
+    NO_CLASSIFICATIONS_FOUND_FOR_ENTITY(400, "ATLAS-400-00-06E", "No classifications associated with entity: {0}"),
+    INVALID_CLASSIFICATION_PARAMS(400, "ATLAS-400-00-06F", "Invalid classification parameters passed for {0} operation for entity: {1}"),
 
     UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
 
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 4a18ed1..36fb0bc 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -462,7 +462,7 @@
                 AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, resultAttributes);
 
                 if(searchParameters.getIncludeClassificationAttributes()) {
-                    entity.setClassifications(entityRetriever.getClassifications(atlasVertex));
+                    entity.setClassifications(entityRetriever.getAllClassifications(atlasVertex));
                 }
 
                 ret.addEntity(entity);
diff --git a/repository/src/main/java/org/apache/atlas/query/GremlinClause.java b/repository/src/main/java/org/apache/atlas/query/GremlinClause.java
index 77212bc..454b343 100644
--- a/repository/src/main/java/org/apache/atlas/query/GremlinClause.java
+++ b/repository/src/main/java/org/apache/atlas/query/GremlinClause.java
@@ -46,7 +46,7 @@
     TEXT_CONTAINS("has('%s', org.janusgraph.core.attribute.Text.textRegex(%s))"),
     TEXT_PREFIX("has('%s', org.janusgraph.core.attribute.Text.textPrefix(%s))"),
     TEXT_SUFFIX("has('%s', org.janusgraph.core.attribute.Text.textRegex(\".*\" + %s))"),
-    TRAIT("or(has('__traitNames', within('%s')), has('__propagatedTraitNames', within('%s')))"),
+    TRAIT("outE('classifiedAs').has('__name', within('%s')).outV()"),
     SELECT_NOOP_FN("def f(r){ r }; "),
     SELECT_FN("def f(r){ t=[[%s]]; %s r.each({t.add([%s])}); t.unique(); }; "),
     SELECT_ONLY_AGG_FN("def f(r){ t=[[%s]]; %s t.add([%s]); t;}; "),
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index e609366..5672d9d 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -37,6 +37,7 @@
 import org.apache.atlas.repository.IndexException;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graphdb.AtlasCardinality;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
@@ -59,6 +60,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -66,6 +68,9 @@
 
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
 import static org.apache.atlas.repository.Constants.BACKING_INDEX;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
 import static org.apache.atlas.repository.Constants.EDGE_INDEX;
 import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
@@ -280,6 +285,11 @@
             createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, true);
             createVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);
 
+            // create vertex-centric index
+            createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
+            createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
+            createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, Arrays.asList(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY));
+
             // create edge indexes
             createEdgeIndex(management, RELATIONSHIP_GUID_PROPERTY_KEY, String.class, SINGLE, true);
 
@@ -475,6 +485,55 @@
         return propertyKey;
     }
 
+    private void createVertexCentricIndex(AtlasGraphManagement management, String edgeLabel, AtlasEdgeDirection edgeDirection,
+                                          String propertyName, Class propertyClass, AtlasCardinality cardinality) {
+        AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
+
+        if (propertyKey == null) {
+            propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating vertex-centric index for edge label: {} direction: {} for property: {} of type: {} ",
+                        edgeLabel, edgeDirection.name(), propertyName, propertyClass.getName());
+        }
+
+        final String indexName = edgeLabel + propertyKey.getName();
+
+        if (!management.edgeIndexExist(edgeLabel, indexName)) {
+            management.createEdgeIndex(edgeLabel, indexName, edgeDirection, Collections.singletonList(propertyKey));
+
+            LOG.info("Created vertex-centric index for edge label: {} direction: {} for property: {} of type: {}",
+                    edgeLabel, edgeDirection.name(), propertyName, propertyClass.getName());
+        }
+    }
+
+    private void createVertexCentricIndex(AtlasGraphManagement management, String edgeLabel, AtlasEdgeDirection edgeDirection, List<String> propertyNames) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating vertex-centric index for edge label: {} direction: {} for properties: {}",
+                    edgeLabel, edgeDirection.name(), propertyNames);
+        }
+
+        String                 indexName    = edgeLabel;
+        List<AtlasPropertyKey> propertyKeys = new ArrayList<>();
+
+        for (String propertyName : propertyNames) {
+            AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
+
+            if (propertyKey != null) {
+                propertyKeys.add(propertyKey);
+                indexName = indexName + propertyKey.getName();
+            }
+        }
+
+        if (!management.edgeIndexExist(edgeLabel, indexName) && CollectionUtils.isNotEmpty(propertyKeys)) {
+            management.createEdgeIndex(edgeLabel, indexName, edgeDirection, propertyKeys);
+
+            LOG.info("Created vertex-centric index for edge label: {} direction: {} for properties: {}", edgeLabel, edgeDirection.name(), propertyNames);
+        }
+    }
+
+
     private AtlasPropertyKey createEdgeIndex(AtlasGraphManagement management, String propertyName, Class propertyClass,
                                              AtlasCardinality cardinality, boolean createCompositeIndex) {
         AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 97e4941..9e8077c 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -26,11 +26,11 @@
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
@@ -74,8 +74,11 @@
 import java.util.UUID;
 
 import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
 import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
@@ -181,24 +184,39 @@
         return vertexWithoutIdentity;
     }
 
+    public AtlasEdge addClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex, boolean isPropagated) {
+        AtlasEdge ret = addEdge(entityVertex, classificationVertex, CLASSIFICATION_LABEL);
+
+        if (ret != null) {
+            AtlasGraphUtilsV1.setProperty(ret, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, getTypeName(classificationVertex));
+            AtlasGraphUtilsV1.setProperty(ret, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, isPropagated);
+        }
+
+        return ret;
+    }
+
     public AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) {
+        AtlasEdge ret;
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding edge for {} -> label {} -> {}", string(fromVertex), edgeLabel, string(toVertex));
         }
 
-        AtlasEdge edge = graph.addEdge(fromVertex, toVertex, edgeLabel);
+        ret = graph.addEdge(fromVertex, toVertex, edgeLabel);
 
-        setProperty(edge, Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name());
-        setProperty(edge, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
-        setProperty(edge, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
-        setProperty(edge, Constants.CREATED_BY_KEY, RequestContextV1.get().getUser());
-        setProperty(edge, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
+        if (ret != null) {
+            setProperty(ret, Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name());
+            setProperty(ret, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
+            setProperty(ret, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
+            setProperty(ret, Constants.CREATED_BY_KEY, RequestContextV1.get().getUser());
+            setProperty(ret, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Added {}", string(edge));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Added {}", string(ret));
+            }
         }
 
-        return edge;
+        return ret;
     }
 
     public AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) throws RepositoryException {
@@ -364,6 +382,91 @@
         return null;
     }
 
+    public static boolean isPropagationEnabled(AtlasVertex classificationVertex) {
+        boolean ret = false;
+
+        if (classificationVertex != null) {
+            Boolean enabled = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_VERTEX_PROPAGATE_KEY, Boolean.class);
+
+            ret = (enabled == null) ? true : enabled;
+        }
+
+        return ret;
+    }
+
+    public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, String classificationName) {
+        AtlasVertex ret  = null;
+        AtlasEdge   edge = getClassificationEdge(entityVertex, classificationName);
+
+        if (edge != null) {
+            ret = edge.getInVertex();
+        }
+
+        return ret;
+    }
+
+    public static AtlasEdge getClassificationEdge(AtlasVertex entityVertex, String classificationName) {
+        AtlasEdge ret   = null;
+        Iterable  edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
+                                              .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
+                                              .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, classificationName).edges();
+        if (edges != null) {
+            Iterator<AtlasEdge> iterator = edges.iterator();
+
+            if (iterator.hasNext()) {
+                AtlasEdge edge = iterator.next();
+
+                ret = (edge != null) ? edge : null;
+            }
+        }
+
+        return ret;
+    }
+
+    public static List<String> getPropagatedEntities(AtlasVertex classificationVertex) {
+        List<String>      ret            = new ArrayList<>();
+        List<AtlasVertex> entityVertices = getPropagatedEntityVertices(classificationVertex);
+
+        if (CollectionUtils.isNotEmpty(entityVertices)) {
+            for (AtlasVertex entityVertex : entityVertices) {
+                ret.add(getGuid(entityVertex));
+            }
+        }
+
+        return ret;
+    }
+
+    public static List<AtlasVertex> getPropagatedEntityVertices(AtlasVertex classificationVertex) {
+        List<AtlasVertex> ret             = new ArrayList<>();
+        List<AtlasEdge>   propagatedEdges = getPropagatedEdges(classificationVertex);
+
+        if (CollectionUtils.isNotEmpty(propagatedEdges)) {
+            for (AtlasEdge propagatedEdge : propagatedEdges) {
+                ret.add(propagatedEdge.getOutVertex());
+            }
+        }
+
+        return ret;
+    }
+
+    public static List<AtlasEdge> getPropagatedEdges(AtlasVertex classificationVertex) {
+        List<AtlasEdge> ret   = new ArrayList<>();
+        Iterable        edges = classificationVertex.query().direction(AtlasEdgeDirection.IN).label(CLASSIFICATION_LABEL)
+                                                    .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, true)
+                                                    .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, getTypeName(classificationVertex)).edges();
+        if (edges != null) {
+            Iterator<AtlasEdge> iterator = edges.iterator();
+
+            while (iterator.hasNext()) {
+                AtlasEdge edge = iterator.next();
+
+                ret.add(edge);
+            }
+        }
+
+        return ret;
+    }
+
     public static Iterator<AtlasEdge> getIncomingEdgesByLabel(AtlasVertex instanceVertex, String edgeLabel) {
         return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel);
     }
@@ -783,52 +886,31 @@
         return traitName;
     }
 
-    public static String getPropagatedEdgeLabel(String classificationName) {
-        return "propagated:" + classificationName;
+    public static List<String> getTraitNames(AtlasVertex entityVertex) {
+        return getTraitNames(entityVertex, false);
     }
 
-    public static List<String> getAllTraitNames(AtlasVertex<?, ?> entityVertex) {
-        ArrayList<String> ret = new ArrayList<>();
+    public static List<String> getAllTraitNames(AtlasVertex entityVertex) {
+        return getTraitNames(entityVertex, null);
+    }
 
-        if (entityVertex != null) {
-            Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class);
+    public static List<String> getTraitNames(AtlasVertex entityVertex, Boolean propagated) {
+        List<String>     ret   = new ArrayList<>();
+        AtlasVertexQuery query = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL);
 
-            if (CollectionUtils.isNotEmpty(traitNames)) {
-                ret.addAll(traitNames);
-            }
-
-            traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class);
-
-            if (CollectionUtils.isNotEmpty(traitNames)) {
-                ret.addAll(traitNames);
-            }
+        if (propagated != null) {
+            query = query.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, propagated);
         }
 
-        return ret;
-    }
+        Iterable edges = query.edges();
 
-    public static List<String> getTraitNames(AtlasVertex<?,?> entityVertex) {
-        ArrayList<String> ret = new ArrayList<>();
+        if (edges != null) {
+            Iterator<AtlasEdge> iterator = edges.iterator();
 
-        if (entityVertex != null) {
-            Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class);
+            while (iterator.hasNext()) {
+                AtlasEdge edge = iterator.next();
 
-            if (CollectionUtils.isNotEmpty(traitNames)) {
-                ret.addAll(traitNames);
-            }
-        }
-
-        return ret;
-    }
-
-    public static List<String> getPropagatedTraitNames(AtlasVertex<?,?> entityVertex) {
-        ArrayList<String> ret = new ArrayList<>();
-
-        if (entityVertex != null) {
-            Collection<String> traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class);
-
-            if (CollectionUtils.isNotEmpty(traitNames)) {
-                ret.addAll(traitNames);
+                ret.add(AtlasGraphUtilsV1.getProperty(edge, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class));
             }
         }
 
@@ -918,25 +1000,6 @@
         return element.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
     }
 
-    public List<AtlasVertex> getPropagatedEntityVerticesFromClassification(AtlasVertex classificationVertex) {
-        List<AtlasVertex> ret = new ArrayList<>();
-
-        if (classificationVertex != null) {
-            String              classificationName = getTypeName(classificationVertex);
-            Iterator<AtlasEdge> iterator           = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName));
-
-            while (iterator != null && iterator.hasNext()) {
-                AtlasEdge propagatedEdge = iterator.next();
-
-                if (propagatedEdge != null) {
-                    ret.add(propagatedEdge.getOutVertex());
-                }
-            }
-        }
-
-        return ret;
-    }
-
     /**
      * For the given type, finds an unique attribute and checks if there is an existing instance with the same
      * unique value
@@ -1092,18 +1155,17 @@
     }
 
     public static AtlasVertex getAssociatedEntityVertex(AtlasVertex classificationVertex) {
-        AtlasVertex ret = null;
+        AtlasVertex ret   = null;
+        Iterable    edges = classificationVertex.query().direction(AtlasEdgeDirection.IN).label(CLASSIFICATION_LABEL)
+                                                .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
+                                                .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, getTypeName(classificationVertex)).edges();
+        if (edges != null) {
+            Iterator<AtlasEdge> iterator = edges.iterator();
 
-        if (classificationVertex != null) {
-            Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getTypeName(classificationVertex));
-
-            while (iterator != null && iterator.hasNext()) {
+            if (iterator != null && iterator.hasNext()) {
                 AtlasEdge edge = iterator.next();
 
-                if (edge != null) {
-                    ret = edge.getOutVertex();
-                    break;
-                }
+                ret = edge.getOutVertex();
             }
         }
 
@@ -1564,7 +1626,7 @@
     }
 
     public static void removePropagatedTraitNameFromVertex(AtlasVertex entityVertex, String propagatedTraitName) {
-        List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex);
+        List<String> propagatedTraitNames = getTraitNames(entityVertex, true);
 
         if (CollectionUtils.isNotEmpty(propagatedTraitNames) && propagatedTraitNames.contains(propagatedTraitName)) {
             propagatedTraitNames.remove(propagatedTraitName);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 4056eb1..18ed533 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -40,6 +40,7 @@
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,12 +48,12 @@
 import java.util.*;
 
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
 import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
 import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
-import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
 import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
 import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
 import static org.apache.atlas.repository.graph.GraphHelper.string;
@@ -350,21 +351,20 @@
         }
     }
 
-    public void removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
+    public List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
+        List<AtlasVertex> ret = new ArrayList<>();
+
         if (classificationVertex != null) {
-            String              classificationName = getTypeName(classificationVertex);
-            Iterator<AtlasEdge> iterator           = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName));
+            String          classificationName = getTypeName(classificationVertex);
+            List<AtlasEdge> propagatedEdges    = getPropagatedEdges(classificationVertex);
 
-            // remove classification from propagated entity vertices
-            while (iterator != null && iterator.hasNext()) {
-                AtlasEdge propagatedEdge = iterator.next();
-
-                if (propagatedEdge != null) {
+            if (CollectionUtils.isNotEmpty(propagatedEdges)) {
+                for (AtlasEdge propagatedEdge : propagatedEdges) {
                     AtlasVertex propagatedEntityVertex = propagatedEdge.getOutVertex();
 
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
-                                getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName));
+                                getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
                     }
 
                     removePropagatedTraitName(propagatedEntityVertex, classificationName);
@@ -372,14 +372,18 @@
                     deleteEdge(propagatedEdge, true);
 
                     updateModificationMetadata(propagatedEntityVertex);
+
+                    ret.add(propagatedEntityVertex);
                 }
             }
         }
+
+        return ret;
     }
 
     private void removePropagatedTraitName(AtlasVertex entityVertex, String classificationName) {
         if (entityVertex != null && StringUtils.isNotEmpty(classificationName)) {
-            List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex);
+            List<String> propagatedTraitNames = getTraitNames(entityVertex, true);
 
             propagatedTraitNames.remove(classificationName);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 73e5aa2..57aa41b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -22,7 +22,6 @@
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.TimeBoundary;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -74,15 +73,19 @@
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
 import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEntities;
 import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
 import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
 import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
 import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
+import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
 import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
 import static org.apache.atlas.repository.graph.GraphHelper.string;
 import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
@@ -260,7 +263,6 @@
 
         AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, classificationType.getAllSuperTypes());
         AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_ENTITY_GUID, classification.getEntityGuid());
-        AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_PROPAGATE_KEY, classification.isPropagate());
 
         return ret;
     }
@@ -1308,10 +1310,10 @@
                 throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
             }
 
-            String                    entityTypeName            = AtlasGraphUtilsV1.getTypeName(entityVertex);
-            final AtlasEntityType     entityType                = typeRegistry.getEntityTypeByName(entityTypeName);
-            List<AtlasVertex>         propagatedEntityVertices  = null;
-            List<AtlasClassification> propagagedClassifications = null;
+            final String                                entityTypeName        = AtlasGraphUtilsV1.getTypeName(entityVertex);
+            final AtlasEntityType                       entityType            = typeRegistry.getEntityTypeByName(entityTypeName);
+            List<AtlasVertex>                           entitiesToPropagateTo = null;
+            Map<AtlasVertex, List<AtlasClassification>> propagations          = null;
 
             for (AtlasClassification classification : classifications) {
                 String  classificationName = classification.getTypeName();
@@ -1338,22 +1340,30 @@
 
                 if (propagateTags) {
                     // compute propagatedEntityVertices only once
-                    if (propagatedEntityVertices == null) {
-                        propagatedEntityVertices = graphHelper.getImpactedVertices(guid);
+                    if (entitiesToPropagateTo == null) {
+                        entitiesToPropagateTo = graphHelper.getImpactedVertices(guid);
                     }
 
-                    if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                    if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+                        if (propagations == null) {
+                            propagations = new HashMap<>(entitiesToPropagateTo.size());
+
+                            for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
+                                propagations.put(entityToPropagateTo, new ArrayList<>());
+                            }
+                        }
+
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(propagatedEntityVertices));
+                            LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(entitiesToPropagateTo));
                         }
 
-                        if (propagagedClassifications == null) {
-                            propagagedClassifications = new ArrayList<>();
+                        List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo);
+
+                        if (entitiesPropagatedTo != null) {
+                            for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
+                                propagations.get(entityPropagatedTo).add(classification);
+                            }
                         }
-
-                        propagagedClassifications.add(classification);
-
-                        addTagPropagation(classificationVertex, propagatedEntityVertices);
                     } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityTypeName);
@@ -1369,171 +1379,235 @@
             // notify listeners on classification addition
             List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
 
-            if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
-                notificationVertices.addAll(propagatedEntityVertices);
+            if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+                notificationVertices.addAll(entitiesToPropagateTo);
             }
 
             for (AtlasVertex vertex : notificationVertices) {
                 String                    entityGuid           = GraphHelper.getGuid(vertex);
                 AtlasEntityWithExtInfo    entityWithExtInfo    = instanceConverter.getAndCacheEntity(entityGuid);
                 AtlasEntity               entity               = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
-                List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? classifications : Collections.emptyList();
+                List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? classifications : propagations.get(vertex);
 
-                entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
+                if (CollectionUtils.isNotEmpty(addedClassifications)) {
+                    entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
+                }
             }
         }
     }
 
-    public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException {
-        if (CollectionUtils.isNotEmpty(classificationNames)) {
-            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
+    public void deleteClassifications(String entityGuid, List<String> classificationNames) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(classificationNames)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "delete", entityGuid);
+        }
 
-            if (entityVertex == null) {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
-            }
+        AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(entityGuid);
 
-            List<String> traitNames = getTraitNames(entityVertex);
+        if (entityVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
+        }
 
-            validateClassificationExists(traitNames, classificationNames);
+        List<String> traitNames = getTraitNames(entityVertex);
 
-            Set<String> impactedEntities = new HashSet<String>() {{ add(guid); }};
+        if (CollectionUtils.isEmpty(traitNames)) {
+            throw new AtlasBaseException(AtlasErrorCode.NO_CLASSIFICATIONS_FOUND_FOR_ENTITY, entityGuid);
+        }
 
-            for (String classificationName : classificationNames) {
-                AtlasEdge   classificationEdge   = graphHelper.getEdgeForLabel(entityVertex, getTraitLabel(classificationName));
-                AtlasVertex classificationVertex = classificationEdge.getInVertex();
+        validateClassificationExists(traitNames, classificationNames);
 
-                // remove classification from propagated entity vertices
-                boolean propagationEnabled = entityRetriever.isPropagationEnabled(classificationVertex);
+        Map<AtlasVertex, List<String>> removedClassifications = new HashMap<>();
 
-                if (propagationEnabled) {
-                    List<AtlasVertex> impactedEntityVertices = graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex);
+        for (String classificationName : classificationNames) {
+            AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
 
-                    if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
-                        removeTagPropagation(classificationVertex);
+            // remove classification from propagated entities if propagation is turned on
+            if (isPropagationEnabled(classificationVertex)) {
+                List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex);
 
-                        for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
-                            impactedEntities.add(GraphHelper.getGuid(impactedEntityVertex));
+                if (CollectionUtils.isNotEmpty(impactedVertices)) {
+                    for (AtlasVertex impactedVertex : impactedVertices) {
+                        List<String> classifications = removedClassifications.get(impactedVertex);
+
+                        if (classifications == null) {
+                            classifications = new ArrayList<>();
+
+                            removedClassifications.put(impactedVertex, classifications);
                         }
+
+                        classifications.add(classificationName);
                     }
                 }
-
-                // remove classification from associated entity vertex
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, getTypeName(entityVertex), guid, getTraitLabel(classificationName));
-                }
-
-                deleteHandler.deleteEdgeReference(classificationEdge, CLASSIFICATION, false, true, entityVertex);
-
-                traitNames.remove(classificationName);
             }
 
-            updateTraitNamesProperty(entityVertex, traitNames);
-
-            updateModificationMetadata(entityVertex);
-
-            for (String entityGuid : impactedEntities) {
-                AtlasEntityWithExtInfo    entityWithExtInfo          = instanceConverter.getAndCacheEntity(entityGuid);
-                AtlasEntity               entity                     = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
-                List<String>              deletedClassificationNames = StringUtils.equals(entityGuid, guid) ? classificationNames : Collections.emptyList();
-
-                entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
+            // remove classifications from associated entity
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
+                            getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL);
             }
+
+            AtlasEdge edge = getClassificationEdge(entityVertex, classificationName);
+
+            deleteHandler.deleteEdgeReference(edge, CLASSIFICATION, false, true, entityVertex);
+
+            traitNames.remove(classificationName);
+        }
+
+        removedClassifications.put(entityVertex, classificationNames);
+
+        updateTraitNamesProperty(entityVertex, traitNames);
+
+        updateModificationMetadata(entityVertex);
+
+        for (Map.Entry<AtlasVertex, List<String>> entry : removedClassifications.entrySet()) {
+            String                 guid                       = GraphHelper.getGuid(entry.getKey());
+            List<String>           deletedClassificationNames = entry.getValue();
+            AtlasEntityWithExtInfo entityWithExtInfo          = instanceConverter.getAndCacheEntity(guid);
+            AtlasEntity            entity                     = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+
+            entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
         }
     }
 
     public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
-        if (CollectionUtils.isNotEmpty(classifications)) {
-            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
+        if (CollectionUtils.isEmpty(classifications)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "update", guid);
+        }
 
-            if (entityVertex == null) {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
+
+        if (entityVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        }
+
+        String                    entityTypeName           = AtlasGraphUtilsV1.getTypeName(entityVertex);
+        AtlasEntityType           entityType               = typeRegistry.getEntityTypeByName(entityTypeName);
+        List<AtlasClassification> updatedClassifications   = new ArrayList<>();
+        List<AtlasVertex>         entitiesToPropagateTo    = new ArrayList<>();
+        Map<AtlasVertex, List<AtlasClassification>> addedPropagations   = null;
+        Map<AtlasVertex, List<String>>              removedPropagations = null;
+
+        for (AtlasClassification classification : classifications) {
+            String classificationName       = classification.getTypeName();
+            String classificationEntityGuid = classification.getEntityGuid();
+
+            if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
+                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
             }
 
-            String                    entityTypeName           = AtlasGraphUtilsV1.getTypeName(entityVertex);
-            AtlasEntityType           entityType               = typeRegistry.getEntityTypeByName(entityTypeName);
-            List<AtlasClassification> updatedClassifications   = new ArrayList<>();
-            List<AtlasVertex>         propagatedEntityVertices = new ArrayList<>();
+            AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
 
-            for (AtlasClassification classification : classifications) {
-                String classificationName       = classification.getTypeName();
-                String classificationEntityGuid = classification.getEntityGuid();
+            if (classificationVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
+            }
 
-                if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
-                    throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Updating classification {} for entity {}", classification, guid);
+            }
+
+            AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex);
+
+            validateAndNormalizeForUpdate(classification);
+
+            Map<String, Object> classificationAttributes = classification.getAttributes();
+
+            if (MapUtils.isNotEmpty(classificationAttributes)) {
+                for (String attributeName : classificationAttributes.keySet()) {
+                    currentClassification.setAttribute(attributeName, classificationAttributes.get(attributeName));
                 }
+            }
 
-                String    relationshipLabel  = getTraitLabel(entityTypeName, classificationName);
-                AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(entityVertex, relationshipLabel);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
+            }
 
-                if (classificationEdge == null) {
-                    throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
-                }
+            mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex);
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Updating classification {} for entity {}", classification, guid);
-                }
+            // handle update of 'propagate' flag
+            boolean currentTagPropagation = currentClassification.isPropagate();
+            boolean updatedTagPropagation = classification.isPropagate();
 
-                AtlasVertex         classificationVertex  = classificationEdge.getInVertex();
-                AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex);
-
-                validateAndNormalizeForUpdate(classification);
-
-                Map<String, Object> classificationAttributes = classification.getAttributes();
-
-                if (MapUtils.isNotEmpty(classificationAttributes)) {
-                    for (String attributeName : classificationAttributes.keySet()) {
-                        currentClassification.setAttribute(attributeName, classificationAttributes.get(attributeName));
-                    }
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
-                }
-
-                mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex);
-
-                // handle update of 'propagate' flag
-                boolean currentTagPropagation = currentClassification.isPropagate();
-                boolean updatedTagPropagation = classification.isPropagate();
-
-                // compute propagatedEntityVertices once and use it for subsequent iterations and notifications
-                if (CollectionUtils.isEmpty(propagatedEntityVertices)) {
-                    propagatedEntityVertices = (currentTagPropagation) ? graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex) :
-                            graphHelper.getImpactedVertices(guid);
-                }
-
-                if (currentTagPropagation != updatedTagPropagation) {
-                    if (updatedTagPropagation) {
-                        addTagPropagation(classificationVertex, propagatedEntityVertices);
-                    } else {
-                        removeTagPropagation(classificationVertex);
+            // compute propagatedEntityVertices once and use it for subsequent iterations and notifications
+            if (currentTagPropagation != updatedTagPropagation) {
+                if (updatedTagPropagation) {
+                    if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
+                        entitiesToPropagateTo = graphHelper.getImpactedVertices(guid);
                     }
 
-                    AtlasGraphUtilsV1.setProperty(classificationVertex, Constants.CLASSIFICATION_PROPAGATE_KEY, updatedTagPropagation);
+                    if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+                        if (addedPropagations == null) {
+                            addedPropagations = new HashMap<>(entitiesToPropagateTo.size());
+
+                            for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
+                                addedPropagations.put(entityToPropagateTo, new ArrayList<>());
+                            }
+                        }
+
+                        List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo);
+
+                        if (entitiesPropagatedTo != null) {
+                            for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
+                                addedPropagations.get(entityPropagatedTo).add(classification);
+                            }
+                        }
+                    }
+                } else {
+                    List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex);
+
+                    if (CollectionUtils.isNotEmpty(impactedVertices)) {
+                        if (removedPropagations == null) {
+                            removedPropagations = new HashMap<>();
+
+                            for (AtlasVertex impactedVertex : impactedVertices) {
+                                List<String> removedClassifications = removedPropagations.get(impactedVertex);
+
+                                if (removedClassifications == null) {
+                                    removedClassifications = new ArrayList<>();
+
+                                    removedPropagations.put(impactedVertex, removedClassifications);
+                                }
+
+                                removedClassifications.add(classification.getTypeName());
+                            }
+                        }
+                    }
                 }
-
-                updatedClassifications.add(currentClassification);
             }
 
-            // notify listeners on classification update
-            List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
+            updatedClassifications.add(currentClassification);
+        }
 
-            if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
-                notificationVertices.addAll(propagatedEntityVertices);
-            }
+        // notify listeners on classification update
+        List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
 
-            for (AtlasVertex vertex : notificationVertices) {
-                String                    entityGuid                = GraphHelper.getGuid(vertex);
-                AtlasEntityWithExtInfo    entityWithExtInfo         = instanceConverter.getAndCacheEntity(entityGuid);
-                AtlasEntity               entity                    = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
-                List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList();
+        if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+            notificationVertices.addAll(entitiesToPropagateTo);
+        }
 
-                entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList);
+        for (AtlasVertex vertex : notificationVertices) {
+            String                    entityGuid                = GraphHelper.getGuid(vertex);
+            AtlasEntityWithExtInfo    entityWithExtInfo         = instanceConverter.getAndCacheEntity(entityGuid);
+            AtlasEntity               entity                    = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+            List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList();
+
+            entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList);
+        }
+
+        if (removedPropagations != null) {
+            for (Map.Entry<AtlasVertex, List<String>> entry : removedPropagations.entrySet()) {
+                AtlasVertex            vertex                 = entry.getKey();
+                List<String>           removedClassifications = entry.getValue();
+                String                 entityGuid             = GraphHelper.getGuid(vertex);
+                AtlasEntityWithExtInfo entityWithExtInfo      = instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity            entity                 = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+
+                entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications);
             }
         }
     }
 
-    private void addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
+    private List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
+        List<AtlasVertex> ret = null;
+
         if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) {
             String                  classificationName = getTypeName(classificationVertex);
             AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
@@ -1545,19 +1619,27 @@
                 if (classificationType.canApplyToEntityType(entityType)) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex),
-                                GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName));
+                                   GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
                     }
 
-                    graphHelper.addEdge(propagatedEntityVertex, classificationVertex, getPropagatedEdgeLabel(classificationName));
+                    if (ret == null) {
+                        ret = new ArrayList<>();
+                    }
+
+                    ret.add(propagatedEntityVertex);
+
+                    graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true);
 
                     addListProperty(propagatedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
                 }
             }
         }
+
+        return ret;
     }
 
-    private void removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
-        deleteHandler.removeTagPropagation(classificationVertex);
+    private List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
+        return deleteHandler.removeTagPropagation(classificationVertex);
     }
 
     private AtlasEdge mapClassification(EntityOperation operation,  final EntityMutationContext context, AtlasClassification classification,
@@ -1571,17 +1653,18 @@
             // if 'null', don't update existing value in the classification
         }
 
+        AtlasGraphUtilsV1.setProperty(traitInstanceVertex, Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY, classification.isPropagate());
+
         // map all the attributes to this newly created AtlasVertex
         mapAttributes(classification, traitInstanceVertex, operation, context);
 
-        // add an edge to the newly created AtlasVertex from the parent
-        String relationshipLabel = getTraitLabel(entityType.getTypeName(), classification.getTypeName());
+        AtlasEdge ret = getClassificationEdge(parentInstanceVertex, getTypeName(traitInstanceVertex));
 
-        try {
-           return graphHelper.getOrCreateEdge(parentInstanceVertex, traitInstanceVertex, relationshipLabel);
-        } catch (RepositoryException e) {
-            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        if (ret == null) {
+            ret = graphHelper.addClassificationEdge(parentInstanceVertex, traitInstanceVertex, false);
         }
+
+        return ret;
     }
 
     public void deleteClassifications(String guid) throws AtlasBaseException {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index 73ed6b4..3885157 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -63,7 +63,6 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +71,7 @@
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
 import static org.apache.atlas.repository.Constants.*;
 import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
 import static org.apache.atlas.repository.graph.GraphHelper.edgeExists;
 import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
 import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
@@ -80,12 +80,9 @@
 import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
 import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
 import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
 import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
-import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
 import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
 import static org.apache.atlas.repository.graph.GraphHelper.removePropagatedTraitNameFromVertex;
 import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@@ -181,7 +178,7 @@
     public AtlasEntityHeader toAtlasEntityHeaderWithClassifications(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
         AtlasEntityHeader ret = toAtlasEntityHeader(entityVertex, attributes);
 
-        ret.setClassifications(getClassifications(entityVertex));
+        ret.setClassifications(getAllClassifications(entityVertex));
 
         return ret;
     }
@@ -446,120 +443,18 @@
         }
     }
 
-    public boolean isPropagationEnabled(AtlasVertex classificationVertex) {
-        boolean ret = false;
-
-        if (classificationVertex != null) {
-            Boolean enabled = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_PROPAGATE_KEY, Boolean.class);
-
-            ret = enabled == null ? true : enabled;
-        }
-
-        return ret;
-    }
-
-    public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
-        AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
-
-        if (instanceVertex == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
-        }
-
-        return getClassifications(instanceVertex);
-    }
-
-    public List<AtlasClassification> getClassifications(AtlasVertex instanceVertex) throws AtlasBaseException {
-        final List<AtlasClassification> classifications           = getClassifications(instanceVertex, null);
-        final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(instanceVertex, null);
-
-        classifications.addAll(propagatedClassifications);
-
-        return classifications;
-    }
-
-    public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException {
-        AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
-
-        if (instanceVertex == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
-        }
-
-        List<AtlasClassification> classifications = null;
-
-        try {
-            classifications = getClassifications(instanceVertex, classificationName);
-        } catch (AtlasBaseException excp) {
-            // ignore and look for propagated classifications
-            classifications = getPropagatedClassifications(instanceVertex, classificationName);
-        }
-
-        if(CollectionUtils.isEmpty(classifications)) {
-            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
-        }
-
-        return classifications.get(0);
-    }
-
-
-    private List<AtlasClassification> getClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException {
-        List<AtlasClassification> ret                 = new ArrayList<>();
-        List<String>              classificationNames = getTraitNames(instanceVertex);
-
-        if (CollectionUtils.isNotEmpty(classificationNames)) {
-            if (StringUtils.isNotEmpty(classificationNameFilter)) {
-                if (classificationNames.contains(classificationNameFilter)) {
-                    ret.add(getClassification(instanceVertex, classificationNameFilter));
-                }
-            } else {
-                for (String classificationName : classificationNames) {
-                    ret.add(getClassification(instanceVertex, classificationName));
-                }
-            }
-        }
-
-        if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) {
-            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter);
-        }
-
-        return ret;
-    }
-
-    private List<AtlasClassification> getPropagatedClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException {
-        List<AtlasClassification> ret                 = new ArrayList<>();
-        List<String>              classificationNames = getPropagatedTraitNames(instanceVertex);
-
-        if (CollectionUtils.isNotEmpty(classificationNames)) {
-            if (StringUtils.isNotEmpty(classificationNameFilter)) {
-                if (classificationNames.contains(classificationNameFilter)) {
-                    ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationNameFilter));
-                }
-            } else {
-                for (String classificationName : new HashSet<>(classificationNames)) {
-                    ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationName));
-                }
-            }
-        }
-
-        if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) {
-            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter);
-        }
-
-        return ret;
-    }
-
-    private List<AtlasClassification> getAllPropagatedClassifications(AtlasVertex vertex, String classificationName) throws AtlasBaseException {
-        List<AtlasClassification> ret       = new ArrayList<>();
-        String                    edgeLabel = getPropagatedEdgeLabel(classificationName);
-        Iterable<AtlasEdge>       edges     = vertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel);
+    public List<AtlasClassification> getAllClassifications(AtlasVertex entityVertex) throws AtlasBaseException {
+        List<AtlasClassification> ret   = new ArrayList<>();
+        Iterable                  edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
 
         if (edges != null) {
-            for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) {
+            Iterator<AtlasEdge> iterator = edges.iterator();
+
+            while (iterator.hasNext()) {
                 AtlasEdge edge = iterator.next();
 
                 if (edge != null) {
-                    AtlasClassification classification = toAtlasClassification(edge.getInVertex());
-
-                    ret.add(classification);
+                    ret.add(toAtlasClassification(edge.getInVertex()));
                 }
             }
         }
@@ -567,42 +462,21 @@
         return ret;
     }
 
-    protected List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex instanceVertex) {
-        List<AtlasVertex> ret                              = new ArrayList<>();
-        List<AtlasVertex> classificationVertices           = getPropagationEnabledClassificationVertices(instanceVertex, false);
-        List<AtlasVertex> propagatedClassificationVertices = getPropagationEnabledClassificationVertices(instanceVertex, true);
+    protected List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
+        List<AtlasVertex> ret   = new ArrayList<>();
+        Iterable          edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
 
-        if (CollectionUtils.isNotEmpty(classificationVertices)) {
-            ret.addAll(classificationVertices);
-        }
+        if (edges != null) {
+            Iterator<AtlasEdge> iterator = edges.iterator();
 
-        if (CollectionUtils.isNotEmpty(propagatedClassificationVertices)) {
-            ret.addAll(propagatedClassificationVertices);
-        }
+            while (iterator.hasNext()) {
+                AtlasEdge edge = iterator.next();
 
-        return ret;
-    }
+                if (edge != null) {
+                    AtlasVertex classificationVertex = edge.getInVertex();
 
-    private List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex vertex, boolean propagated) {
-        List<AtlasVertex> ret                 = new ArrayList<>();
-        List<String>      classificationNames = (propagated) ? getPropagatedTraitNames(vertex) : getTraitNames(vertex);
-
-        if (CollectionUtils.isNotEmpty(classificationNames)) {
-            for (String classificationName : classificationNames) {
-                String              traitLabel   = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
-                Iterable<AtlasEdge> edges        = vertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
-
-                if (edges != null) {
-                    for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) {
-                        AtlasEdge edge = iterator.next();
-
-                        if (edge != null) {
-                            AtlasVertex classificationVertex = edge.getInVertex();
-
-                            if (isPropagationEnabled(classificationVertex)) {
-                                ret.add(classificationVertex);
-                            }
-                        }
+                    if (isPropagationEnabled(classificationVertex)) {
+                        ret.add(classificationVertex);
                     }
                 }
             }
@@ -611,32 +485,10 @@
         return ret;
     }
 
-    public AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException {
-        AtlasClassification ret = getClassification(instanceVertex, classificationName, false);
-
-        // if no classification with the given name was directly associated, look for a propagated classification
-        if (ret == null) {
-            ret = getClassification(instanceVertex, classificationName, true);
-        }
-
-        return ret;
-    }
-
-    private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName, boolean propagated) throws AtlasBaseException {
-        String              traitLabel = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
-        Iterable<AtlasEdge> edges      = instanceVertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
-        AtlasEdge           edge       = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null;
-        AtlasClassification ret        = edge != null ? toAtlasClassification(edge.getInVertex()) : null;
-
-        return ret;
-    }
-
     private void mapClassifications(AtlasVertex entityVertex, AtlasEntity entity) throws AtlasBaseException {
-        final List<AtlasClassification> classifications           = getClassifications(entityVertex, null);
-        final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(entityVertex, null);
+        final List<AtlasClassification> classifications = getAllClassifications(entityVertex);
 
         entity.setClassifications(classifications);
-        entity.addClassifications(propagatedClassifications);
     }
 
     private Object mapVertexToAttribute(AtlasVertex entityVertex, AtlasAttribute attribute, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
@@ -1107,7 +959,6 @@
 
             for (AtlasVertex classificationVertex : classificationVertices) {
                 String                  classificationName     = getTypeName(classificationVertex);
-                String                  propagatedEdgeLabel    = getPropagatedEdgeLabel(classificationName);
                 AtlasVertex             associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
                 AtlasClassificationType classificationType     = typeRegistry.getClassificationTypeByName(classificationName);
 
@@ -1119,10 +970,10 @@
                         }
 
                         continue;
-                    } else if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) {
+                    } else if (edgeExists(impactedEntityVertex, classificationVertex, CLASSIFICATION_LABEL)) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
-                                    getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                                    getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
                         }
 
                         continue;
@@ -1142,12 +993,12 @@
 
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(" --> Creating propagated classification edge from [{}] --> [{}][{}] using edge label: [{}]",
-                                  getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                                  getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
                     }
 
-                    graphHelper.addEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel);
+                    graphHelper.addClassificationEdge(impactedEntityVertex, classificationVertex, true);
 
-                    GraphHelper.addListProperty(impactedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
+                    addListProperty(impactedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
                 }
             }
         }
@@ -1164,19 +1015,18 @@
 
             for (AtlasVertex classificationVertex : classificationVertices) {
                 String            classificationName     = getTypeName(classificationVertex);
-                String            propagatedEdgeLabel    = getPropagatedEdgeLabel(classificationName);
                 AtlasVertex       associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
-                List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge));
+                List<AtlasVertex> referrals              = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge));
 
                 for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
                     if (referrals.contains(impactedEntityVertex)) {
                         if (LOG.isDebugEnabled()) {
                             if (StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) {
                                 LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]",
-                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName, getTypeName(associatedEntityVertex));
+                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex));
                             } else {
                                 LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path",
-                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName);
+                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName);
                             }
                         }
 
@@ -1184,14 +1034,14 @@
                     }
 
                     // remove propagated classification edge and classificationName from propagatedTraitNames vertex property
-                    if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) {
+                    if (edgeExists(impactedEntityVertex, classificationVertex, CLASSIFICATION_LABEL)) {
                         try {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]",
-                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                                          getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
                             }
 
-                            AtlasEdge propagatedEdge = graphHelper.getOrCreateEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel);
+                            AtlasEdge propagatedEdge = graphHelper.getOrCreateEdge(impactedEntityVertex, classificationVertex, CLASSIFICATION_LABEL);
 
                             graphHelper.removeEdge(propagatedEdge);
 
@@ -1202,7 +1052,7 @@
                     } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist",
-                                      getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
+                                      getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
                         }
                     }
                 }
diff --git a/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java b/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java
index ff23436..9b4c91d 100644
--- a/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java
+++ b/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java
@@ -36,13 +36,13 @@
 public class GremlinQueryComposerTest {
     @Test
     public void classification() {
-        String expected = "g.V().or(has('__traitNames', within('PII')), has('__propagatedTraitNames', within('PII'))).dedup().limit(25).toList()";
+        String expected = "g.V().outE('classifiedAs').has('__name', within('PII')).outV().dedup().limit(25).toList()";
         verify("PII", expected);
     }
 
     @Test()
     public void dimension() {
-        String expected = "g.V().has('__typeName', 'Table').or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))).dedup().limit(25).toList()";
+        String expected = "g.V().has('__typeName', 'Table').outE('classifiedAs').has('__name', within('Dimension')).outV().dedup().limit(25).toList()";
         verify("Table isa Dimension", expected);
         verify("Table is Dimension", expected);
         verify("Table where Table is Dimension", expected);
@@ -295,14 +295,14 @@
     @Test
     public void keywordsInWhereClause() {
         verify("Table as t where t has name and t isa Dimension",
-                "g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension')))).dedup().limit(25).toList()");
+                "g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.outE('classifiedAs').has('__name', within('Dimension')).outV()).dedup().limit(25).toList()");
         verify("Table as t where t has name and t.name = 'sales_fact'",
                 "g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
         verify("Table as t where t is Dimension and t.name = 'sales_fact'",
-                "g.V().has('__typeName', 'Table').as('t').and(__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
-        verify("Table isa 'Dimension' and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
+                "g.V().has('__typeName', 'Table').as('t').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
+        verify("Table isa 'Dimension' and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
         verify("Table has name and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.has('Table.name'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
-        verify("Table is 'Dimension' and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
+        verify("Table is 'Dimension' and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
         verify("Table has name and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.has('Table.name'),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
     }