ATLAS-2932: DSL refactoring to use Traversal
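
Replaces ScriptEngine-based execution of DSL queries with a pluggable
DSLQueryExecutor: TraversalBasedExecutor runs the translated query as a
native graph traversal, while ScriptEngineBasedExecutor retains the
previous Gremlin-script behavior. The executor is selected at startup
via the new configuration flag atlas.dsl.executor.traversal (default:
true). To fall back to the ScriptEngine-based path, set the flag to
false (a minimal sketch, assuming the standard
atlas-application.properties deployment file):

    atlas.dsl.executor.traversal=false

To support traversal-based execution, GremlinQuery now carries the
parsed QueryMetadata, the composed GremlinClauseList and the
SelectClauseComposer, and AtlasGraphTraversal gains textRegEx and
textContainsRegEx predicates.
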
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphTraversal.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphTraversal.java
index 881bb1e..71e327d 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphTraversal.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphTraversal.java
@@ -54,14 +54,16 @@
 
     public abstract Map<String, Collection<V>> getAtlasVertexMap();
 
-    public abstract List<E> getAtlasEdgeList();
-
     public abstract Set<E> getAtlasEdgeSet();
 
     public abstract Map<String, E> getAtlasEdgeMap();
 
     public abstract TextPredicate textPredicate();
 
+    public abstract AtlasGraphTraversal textRegEx(String key, String value);
+
+    public abstract AtlasGraphTraversal textContainsRegEx(String key, String value);
+
     public interface TextPredicate {
 
         /**
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphTraversal.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphTraversal.java
index c33c4f4..946610c 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphTraversal.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphTraversal.java
@@ -25,6 +25,8 @@
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.janusgraph.core.attribute.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,6 +39,7 @@
 import java.util.function.BiPredicate;
 
 public class AtlasJanusGraphTraversal extends AtlasGraphTraversal<AtlasJanusVertex, AtlasJanusEdge> {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphTraversal.class);
 
     private List resultList;
     private Set  resultSet;
@@ -103,56 +106,30 @@
 
     @Override
     public Map<String, Collection<AtlasJanusVertex>> getAtlasVertexMap() {
-        List                                      list = getResultList();
-        Map<String, Collection<AtlasJanusVertex>> ret;
-
-        if (CollectionUtils.isNotEmpty(list)) {
-            ret = new HashMap<>(list.size());
-            if (list.size() == 1 && list.get(0) instanceof Map) {
-                Map aMap = (Map) list.get(0);
-                for (Object key : aMap.keySet()) {
-
-                    if (!(key instanceof String)) {
-                        continue;
-                    }
-
-                    Object value = aMap.get(key);
-                    if (value instanceof List) {
-                        Collection<AtlasJanusVertex> values = new ArrayList<>();
-                        for (Object o : (List) value) {
-                            if (o instanceof Vertex) {
-                                values.add(GraphDbObjectFactory.createVertex((AtlasJanusGraph) atlasGraph, (Vertex) o));
-                            }
-                        }
-                        ret.put((String) key, values);
-                    }
-                }
-            }
-        } else {
-            ret = Collections.emptyMap();
+        List list = getResultList();
+        if (CollectionUtils.isEmpty(list) || !(list.get(0) instanceof Map)) {
+            return Collections.emptyMap();
         }
 
-        return ret;
-    }
+        Map<String, Collection<AtlasJanusVertex>> ret = new HashMap<>();
+        Map map = (Map) list.get(0);
+        for (Object key : map.keySet()) {
+            if (!(key instanceof String)) {
+                continue;
+            }
 
-    @Override
-    public List<AtlasJanusEdge> getAtlasEdgeList() {
-        List                 list = getResultList();
-        List<AtlasJanusEdge> ret;
-
-        if (CollectionUtils.isNotEmpty(list)) {
-            if (list.size() == 1 && list.get(0) instanceof Map) {
-                ret = Collections.emptyList();
-            } else {
-                ret = new ArrayList<>(list.size());
-                for (Object o : list) {
-                    if (o instanceof Edge) {
-                        ret.add(GraphDbObjectFactory.createEdge((AtlasJanusGraph) atlasGraph, (Edge) o));
+            Object value = map.get(key);
+            if (value instanceof List) {
+                Collection<AtlasJanusVertex> values = new ArrayList<>();
+                for (Object o : (List) value) {
+                    if (o instanceof Vertex) {
+                        values.add(GraphDbObjectFactory.createVertex((AtlasJanusGraph) atlasGraph, (Vertex) o));
+                    } else {
+                        LOG.warn("{} is not a vertex.", o.getClass().getSimpleName());
                     }
                 }
+                ret.put((String) key, values);
             }
-        } else {
-            ret = Collections.emptyList();
         }
 
         return ret;
@@ -187,6 +164,15 @@
         return new JanusGraphPredicate();
     }
 
+    @Override
+    public AtlasGraphTraversal textRegEx(String key, String value) {
+        return (AtlasGraphTraversal) this.has(key, Text.textRegex(value));
+    }
+
+    @Override
+    public AtlasGraphTraversal textContainsRegEx(String key, String value) {
+        return (AtlasGraphTraversal) this.has(key, Text.textContainsRegex(value));
+    }
 
     public static class JanusGraphPredicate implements TextPredicate {
         @Override
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index ea9f26d..396aad0 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -74,7 +74,8 @@
 
     HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
     STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
-    REBUILD_INDEX("atlas.rebuild.index", false);
+    REBUILD_INDEX("atlas.rebuild.index", false),
+    DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true);
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 01a6c30..4ae2d12 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -19,6 +19,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.SortOrder;
@@ -26,34 +27,46 @@
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasSearchResultScrubRequest;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.discovery.*;
+import org.apache.atlas.model.discovery.AtlasAggregationEntry;
+import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
 import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
 import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
-import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
+import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
+import org.apache.atlas.model.discovery.QuickSearchParameters;
+import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.profile.AtlasUserSavedSearch;
-import org.apache.atlas.query.AtlasDSL;
-import org.apache.atlas.query.GremlinQuery;
 import org.apache.atlas.query.QueryParams;
+import org.apache.atlas.query.executors.DSLQueryExecutor;
+import org.apache.atlas.query.executors.ScriptEngineBasedExecutor;
+import org.apache.atlas.query.executors.TraversalBasedExecutor;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.repository.userprofile.UserProfileService;
-import org.apache.atlas.type.*;
+import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
 import org.apache.atlas.util.SearchPredicateUtil;
 import org.apache.atlas.util.SearchTracker;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections.Predicate;
 import org.apache.commons.collections4.IteratorUtils;
 import org.apache.commons.lang.StringUtils;
@@ -67,7 +80,15 @@
 import javax.inject.Inject;
 import javax.script.ScriptEngine;
 import javax.script.ScriptException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.atlas.AtlasErrorCode.*;
 import static org.apache.atlas.SortOrder.ASCENDING;
@@ -82,7 +103,6 @@
 public class EntityDiscoveryService implements AtlasDiscoveryService {
     private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
     private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
-    private static final String SORT_ATTRIBUTE_NAME = "sortAttributeName";
 
     private final AtlasGraph                      graph;
     private final EntityGraphRetriever            entityRetriever;
@@ -96,6 +116,7 @@
     private final String                          indexSearchPrefix;
     private final UserProfileService              userProfileService;
     private final SuggestionsProvider             suggestionsProvider;
+    private final DSLQueryExecutor                dslQueryExecutor;
 
     @Inject
     EntityDiscoveryService(AtlasTypeRegistry typeRegistry,
@@ -115,58 +136,16 @@
         this.indexSearchPrefix        = AtlasGraphUtilsV2.getIndexSearchPrefix();
         this.userProfileService       = userProfileService;
         this.suggestionsProvider      = new SuggestionsProviderImpl(graph, typeRegistry);
+        this.dslQueryExecutor         = AtlasConfiguration.DSL_EXECUTOR_TRAVERSAL.getBoolean()
+                                            ? new TraversalBasedExecutor(typeRegistry, graph, entityRetriever)
+                                            : new ScriptEngineBasedExecutor(typeRegistry, graph, entityRetriever);
+        LOG.info("DSL Executor: {}", this.dslQueryExecutor.getClass().getSimpleName());
     }
 
     @Override
     @GraphTransaction
     public AtlasSearchResult searchUsingDslQuery(String dslQuery, int limit, int offset) throws AtlasBaseException {
-        AtlasSearchResult ret          = new AtlasSearchResult(dslQuery, AtlasQueryType.DSL);
-        GremlinQuery      gremlinQuery = toGremlinQuery(dslQuery, limit, offset);
-        String            queryStr     = gremlinQuery.queryStr();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Executing DSL: query={}, gremlinQuery={}", dslQuery, queryStr);
-        }
-
-        Object result = graph.executeGremlinScript(queryStr, false);
-
-        if (result instanceof List && CollectionUtils.isNotEmpty((List)result)) {
-            List   queryResult  = (List) result;
-            Object firstElement = queryResult.get(0);
-
-            if (firstElement instanceof AtlasVertex) {
-                for (Object element : queryResult) {
-                    if (element instanceof AtlasVertex) {
-                        ret.addEntity(entityRetriever.toAtlasEntityHeaderWithClassifications((AtlasVertex)element));
-                    } else {
-                        LOG.warn("searchUsingDslQuery({}): expected an AtlasVertex; found unexpected entry in result {}", dslQuery, element);
-                    }
-                }
-            } else if (gremlinQuery.hasSelectList()) {
-                ret.setAttributes(toAttributesResult(queryResult, gremlinQuery));
-            } else if (firstElement instanceof Map) {
-                for (Object element : queryResult) {
-                    if (element instanceof Map) {
-                        Map map = (Map)element;
-
-                        for (Object key : map.keySet()) {
-                            Object value = map.get(key);
-
-                            if (value instanceof List && CollectionUtils.isNotEmpty((List)value)) {
-                                for (Object o : (List) value) {
-                                    Object entry = o;
-                                    if (entry instanceof AtlasVertex) {
-                                        ret.addEntity(entityRetriever.toAtlasEntityHeader((AtlasVertex) entry));
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            } else {
-                LOG.warn("searchUsingDslQuery({}/{}): found unexpected entry in result {}", dslQuery, dslQuery, gremlinQuery.queryStr());
-            }
-        }
+        AtlasSearchResult ret = dslQueryExecutor.execute(dslQuery, limit, offset);
 
         scrubSearchResults(ret);
 
@@ -764,86 +743,11 @@
         return ret;
     }
 
-    private GremlinQuery toGremlinQuery(String query, int limit, int offset) throws AtlasBaseException {
-        QueryParams                 params       = QueryParams.getNormalizedParams(limit, offset);
-        GremlinQuery                gremlinQuery = new AtlasDSL.Translator(query, typeRegistry, params.offset(), params.limit()).translate();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Translated Gremlin Query: {}", gremlinQuery.queryStr());
-        }
-
-        return gremlinQuery;
-    }
-
     private AtlasIndexQuery toAtlasIndexQuery(String fullTextQuery) {
         String graphQuery = String.format(indexSearchPrefix + "\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, fullTextQuery);
         return graph.indexQuery(Constants.FULLTEXT_INDEX, graphQuery);
     }
 
-    private AttributeSearchResult toAttributesResult(List results, GremlinQuery query) {
-        AttributeSearchResult ret = new AttributeSearchResult();
-        List<String> names = (List<String>) results.get(0);
-        List<List<Object>> values = extractValues(results.subList(1, results.size()));
-
-        ret.setName(names);
-        ret.setValues(values);
-        return ret;
-    }
-
-    private List<String> extractNames(List results) {
-        List<String> names = new ArrayList<>();
-        for (Object obj : results) {
-            if (obj instanceof Map) {
-                Map map = (Map) obj;
-                if (MapUtils.isNotEmpty(map)) {
-                    for (Object key : map.keySet()) {
-                        names.add((String) key);
-                    }
-                    return names;
-                }
-            } else if (obj instanceof List) {
-                List list = (List) obj;
-                if (CollectionUtils.isNotEmpty(list)) {
-                    for(Object o : list) {
-                        names.add((String) o);
-                    }
-                }
-            }
-        }
-
-        return names;
-    }
-
-    private List<List<Object>> extractValues(List results) {
-        List<List<Object>> values = new ArrayList<>();
-
-        for (Object obj : results) {
-            if (obj instanceof Map) {
-                Map map = (Map) obj;
-                List<Object> list = new ArrayList<>();
-                if (MapUtils.isNotEmpty(map)) {
-                    for (Object key : map.keySet()) {
-                       Object vals = map.get(key);
-                       if(vals instanceof List) {
-                           List l = (List) vals;
-                           list.addAll(l);
-                       }
-
-                    }
-
-                    values.add(list);
-                }
-            } else if (obj instanceof List) {
-                List list = (List) obj;
-                if (CollectionUtils.isNotEmpty(list)) {
-                    values.add(list);
-                }
-            }
-        }
-
-        return values;
-    }
-
     private boolean skipDeletedEntities(boolean excludeDeletedEntities, AtlasVertex<?, ?> vertex) {
         return excludeDeletedEntities && GraphHelper.getStatus(vertex) == DELETED;
     }
diff --git a/repository/src/main/java/org/apache/atlas/query/AtlasDSL.java b/repository/src/main/java/org/apache/atlas/query/AtlasDSL.java
index b8a744b..777d194 100644
--- a/repository/src/main/java/org/apache/atlas/query/AtlasDSL.java
+++ b/repository/src/main/java/org/apache/atlas/query/AtlasDSL.java
@@ -113,7 +113,7 @@
         private final String                      query;
 
         public Translator(String query, AtlasTypeRegistry typeRegistry, int offset, int limit) throws AtlasBaseException {
-            this.query = query;
+            this.query        = query;
             this.queryContext = Parser.parse(query);
             this.typeRegistry = typeRegistry;
             this.offset       = offset;
@@ -121,41 +121,52 @@
         }
 
         public GremlinQuery translate() throws AtlasBaseException {
-            QueryMetadata queryMetadata = new QueryMetadata(queryContext);
-            GremlinQueryComposer gremlinQueryComposer = new GremlinQueryComposer(typeRegistry, queryMetadata, limit, offset);
-            DSLVisitor dslVisitor = new DSLVisitor(gremlinQueryComposer);
+            QueryMetadata        queryMetadata = new QueryMetadata(queryContext);
+            GremlinQueryComposer queryComposer = new GremlinQueryComposer(typeRegistry, queryMetadata, limit, offset);
 
-            queryContext.accept(dslVisitor);
+            queryContext.accept(new DSLVisitor(queryComposer));
 
-            processErrorList(gremlinQueryComposer);
+            processErrorList(queryComposer);
 
-            String gremlinQuery = gremlinQueryComposer.get();
-
-            return new GremlinQuery(gremlinQuery, queryMetadata.hasSelect());
+            return new GremlinQuery(queryComposer.get(), queryMetadata, queryComposer.clauses(), queryComposer.getSelectComposer());
         }
 
         private void processErrorList(GremlinQueryComposer gremlinQueryComposer) throws AtlasBaseException {
-            final String errorMessage;
-
-            if (CollectionUtils.isNotEmpty(gremlinQueryComposer.getErrorList())) {
-                errorMessage = StringUtils.join(gremlinQueryComposer.getErrorList(), ", ");
-                LOG.warn("DSL Errors: {}", errorMessage);
-                throw new AtlasBaseException(AtlasErrorCode.INVALID_DSL_QUERY, this.query, errorMessage);
+            if (CollectionUtils.isEmpty(gremlinQueryComposer.getErrorList())) {
+                return;
             }
+
+            final String errorMessage = StringUtils.join(gremlinQueryComposer.getErrorList(), ", ");
+            LOG.warn("DSL Errors: {}", errorMessage);
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_DSL_QUERY, this.query, errorMessage);
         }
     }
 
     public static class QueryMetadata {
-        private boolean hasSelect;
-        private boolean hasGroupBy;
-        private boolean hasOrderBy;
-        private boolean hasLimitOffset;
+        private final boolean hasSelect;
+        private final boolean hasGroupBy;
+        private final boolean hasOrderBy;
+        private final boolean hasLimitOffset;
+        private final int     resolvedLimit;
+        private final int     resolvedOffset;
 
         public QueryMetadata(AtlasDSLParser.QueryContext queryContext) {
-            hasSelect  = queryContext != null && queryContext.selectClause() != null;
-            hasGroupBy = queryContext != null && queryContext.groupByExpression() != null;
-            hasOrderBy = queryContext != null && queryContext.orderByExpr() != null;
+            hasSelect      = queryContext != null && queryContext.selectClause() != null;
+            hasGroupBy     = queryContext != null && queryContext.groupByExpression() != null;
+            hasOrderBy     = queryContext != null && queryContext.orderByExpr() != null;
             hasLimitOffset = queryContext != null && queryContext.limitOffset() != null;
+
+            if (hasLimitOffset) {
+                AtlasDSLParser.LimitOffsetContext  limitOffsetContext = queryContext.limitOffset();
+                AtlasDSLParser.LimitClauseContext  limitClause        = limitOffsetContext.limitClause();
+                AtlasDSLParser.OffsetClauseContext offsetClause       = limitOffsetContext.offsetClause();
+
+                resolvedLimit  = (limitClause != null) ? Integer.parseInt(limitClause.NUMBER().getText()) : 0;
+                resolvedOffset = (offsetClause != null) ? Integer.parseInt(offsetClause.NUMBER().getText()) : 0;
+            } else {
+                resolvedLimit  = 0;
+                resolvedOffset = 0;
+            }
         }
 
         public boolean hasSelect() {
@@ -177,5 +188,13 @@
         public boolean needTransformation() {
             return (hasGroupBy && hasSelect && hasOrderBy) || hasSelect;
         }
+
+        public int getResolvedLimit() {
+            return resolvedLimit;
+        }
+
+        public int getResolvedOffset() {
+            return resolvedOffset;
+        }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/query/DSLVisitor.java b/repository/src/main/java/org/apache/atlas/query/DSLVisitor.java
index ca0b4be..ce95b76 100644
--- a/repository/src/main/java/org/apache/atlas/query/DSLVisitor.java
+++ b/repository/src/main/java/org/apache/atlas/query/DSLVisitor.java
@@ -18,14 +18,36 @@
 
 package org.apache.atlas.query;
 
-import org.apache.atlas.query.antlr4.AtlasDSLParser.*;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.AliasExprContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.AtomEContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.CompEContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.ComparisonClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.CountClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.ExprContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.ExprRightContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.FromExpressionContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.FromSrcContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.GroupByExpressionContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.HasClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.HasTermClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.IdentifierContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.IsClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.LimitOffsetContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.MaxClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.MinClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.OrderByExprContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.SelectExprContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.SelectExpressionContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.SingleQrySrcContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.SumClauseContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.ValueArrayContext;
+import org.apache.atlas.query.antlr4.AtlasDSLParser.WhereClauseContext;
 import org.apache.atlas.query.antlr4.AtlasDSLParserBaseVisitor;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -65,8 +87,10 @@
         if (!(ctx.getParent() instanceof GroupByExpressionContext)) {
             String[] items  = new String[ctx.selectExpression().size()];
             String[] labels = new String[ctx.selectExpression().size()];
-
-            SelectClauseComposer selectClauseComposer = new SelectClauseComposer();
+            int      countIdx = -1;
+            int      sumIdx   = -1;
+            int      minIdx   = -1;
+            int      maxIdx   = -1;
 
             for (int i = 0; i < ctx.selectExpression().size(); i++) {
                 SelectExpressionContext selectExpression = ctx.selectExpression(i);
@@ -80,24 +104,23 @@
 
                 if (Objects.nonNull(countClause)) {
                     items[i] = "count";
-                    selectClauseComposer.setCountIdx(i);
+                    countIdx = i;
                 } else if (Objects.nonNull(sumClause)) {
                     items[i] = sumClause.expr().getText();
-                    selectClauseComposer.setSumIdx(i);
+                    sumIdx   = i;
                 } else if (Objects.nonNull(minClause)) {
                     items[i] = minClause.expr().getText();
-                    selectClauseComposer.setMinIdx(i);
+                    minIdx   = i;
                 } else if (Objects.nonNull(maxClause)) {
                     items[i] = maxClause.expr().getText();
-                    selectClauseComposer.setMaxIdx(i);
+                    maxIdx   = i;
                 } else {
                     items[i] = selectExpression.expr().getText();
                 }
             }
 
-            selectClauseComposer.setItems(items);
-            selectClauseComposer.setAttributes(items);
-            selectClauseComposer.setLabels(labels);
+            SelectClauseComposer selectClauseComposer = new SelectClauseComposer(labels, items, items, countIdx, sumIdx, minIdx, maxIdx);
+
             gremlinQueryComposer.addSelect(selectClauseComposer);
         }
         return super.visitSelectExpr(ctx);
@@ -183,24 +206,15 @@
     }
 
     private void visitHasClause(GremlinQueryComposer gqc, HasClauseContext ctx) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("=> DSLVisitor.visitHasClause({})", ctx);
-        }
-
         gqc.addFromProperty(ctx.arithE().getText(), ctx.identifier().getText());
         super.visitHasClause(ctx);
     }
 
     private void visitHasTermClause(GremlinQueryComposer gqc, HasTermClauseContext ctx) {
-        if (ctx.expr() != null) {
-            processExpr(ctx.expr(), gqc);
-        } else if (ctx.identifier() != null) {
-            gqc.addHasTerm(ctx.arithE().getText(), ctx.identifier().getText());
-        }
+        gqc.addHasTerm(ctx.arithE().getText(), ctx.identifier().getText());
         super.visitHasTermClause(ctx);
     }
 
-
     private void inferFromClause(SingleQrySrcContext ctx) {
         if (ctx.fromExpression() != null) {
             return;
@@ -229,32 +243,20 @@
         if (CollectionUtils.isNotEmpty(expr.exprRight())) {
             processExprRight(expr, gremlinQueryComposer);
         } else {
-            GremlinQueryComposer nestedProcessor = gremlinQueryComposer.createNestedProcessor();
-            processExpr(expr.compE(), nestedProcessor);
-
-            GremlinClauseList gcl = nestedProcessor.getQueryClauses();
-            if (gcl.size() > 1) {
-                if (nestedProcessor.isPrimitive()) {
-                    nestedProcessor.remove(GremlinClause.NESTED_START);
-                    GremlinQueryComposer.GremlinClauseValue gv = gcl.get(0);
-                    gremlinQueryComposer.add(gv);
-                } else {
-                    gremlinQueryComposer.addAndClauses(Collections.singletonList(nestedProcessor.get()));
-                }
-            }
+            processExpr(expr.compE(), gremlinQueryComposer);
         }
     }
 
     private void processExprRight(final ExprContext expr, GremlinQueryComposer gremlinQueryComposer) {
         GremlinQueryComposer nestedProcessor = gremlinQueryComposer.createNestedProcessor();
 
-        List<String> nestedQueries = new ArrayList<>();
+        List<GremlinQueryComposer> nestedQueries = new ArrayList<>();
         String       prev          = null;
 
         // Process first expression then proceed with the others
         // expr -> compE exprRight*
         processExpr(expr.compE(), nestedProcessor);
-        nestedQueries.add(nestedProcessor.get());
+        nestedQueries.add(nestedProcessor);
 
         // Record all processed attributes
         gremlinQueryComposer.addProcessedAttributes(nestedProcessor.getAttributesProcessed());
@@ -270,7 +272,7 @@
                     GremlinQueryComposer orClause = nestedProcessor.createNestedProcessor();
                     orClause.addOrClauses(nestedQueries);
                     nestedQueries.clear();
-                    nestedQueries.add(orClause.get());
+                    nestedQueries.add(orClause);
 
                     // Record all processed attributes
                     gremlinQueryComposer.addProcessedAttributes(orClause.getAttributesProcessed());
@@ -285,7 +287,7 @@
                     GremlinQueryComposer andClause = nestedProcessor.createNestedProcessor();
                     andClause.addAndClauses(nestedQueries);
                     nestedQueries.clear();
-                    nestedQueries.add(andClause.get());
+                    nestedQueries.add(andClause);
 
                     // Record all processed attributes
                     gremlinQueryComposer.addProcessedAttributes(andClause.getAttributesProcessed());
@@ -293,7 +295,7 @@
                 prev = OR;
             }
             processExpr(exprRight.compE(), nestedProcessor);
-            nestedQueries.add(nestedProcessor.get());
+            nestedQueries.add(nestedProcessor);
 
             // Record all processed attributes
             gremlinQueryComposer.addProcessedAttributes(nestedProcessor.getAttributesProcessed());
diff --git a/repository/src/main/java/org/apache/atlas/query/GremlinClause.java b/repository/src/main/java/org/apache/atlas/query/GremlinClause.java
index 9704b0f..7a98ddd 100644
--- a/repository/src/main/java/org/apache/atlas/query/GremlinClause.java
+++ b/repository/src/main/java/org/apache/atlas/query/GremlinClause.java
@@ -18,7 +18,7 @@
 
 package org.apache.atlas.query;
 
-enum GremlinClause {
+public enum GremlinClause {
     AS("as('%s')"),
     DEDUP("dedup()"),
     G("g"),
@@ -45,8 +45,6 @@
     TO_LIST("toList()"),
     STRING_CONTAINS("has('%s', org.janusgraph.core.attribute.Text.textRegex(%s))"),
     TEXT_CONTAINS("has('%s', org.janusgraph.core.attribute.Text.textContainsRegex(%s))"),
-    TEXT_PREFIX("has('%s', org.janusgraph.core.attribute.Text.textContainsPrefix(%s))"),
-    TEXT_SUFFIX("has('%s', org.janusgraph.core.attribute.Text.textContainsRegex(\".*\" + %s))"),
     TRAIT("outE('classifiedAs').has('__name', within('%s')).outV()"),
     ANY_TRAIT("or(has('__traitNames'), has('__propagatedTraitNames'))"),
     NO_TRAIT("and(hasNot('__traitNames'), hasNot('__propagatedTraitNames'))"),
@@ -70,6 +68,7 @@
     // idx of the tuple field to be sorted on
     INLINE_TUPLE_SORT_ASC(".sort{a,b -> a[%s] <=> b[%s]}"),
     INLINE_TUPLE_SORT_DESC(".sort{a,b -> b[%s] <=> a[%s]}"),
+    TERM("and(__.in('r:AtlasGlossarySemanticAssignment').has('AtlasGlossaryTerm.%s', eq('%s')).dedup())"),
     V("V()"),
     VALUE_MAP("valueMap(%s)");
 
diff --git a/repository/src/main/java/org/apache/atlas/query/GremlinClauseList.java b/repository/src/main/java/org/apache/atlas/query/GremlinClauseList.java
index 9f30e4d..8d491ac 100644
--- a/repository/src/main/java/org/apache/atlas/query/GremlinClauseList.java
+++ b/repository/src/main/java/org/apache/atlas/query/GremlinClauseList.java
@@ -17,20 +17,23 @@
  */
 package org.apache.atlas.query;
 
-import org.apache.atlas.type.AtlasEntityType;
-
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
-class GremlinClauseList {
+public class GremlinClauseList {
     private final List<GremlinQueryComposer.GremlinClauseValue> list;
+    private final Map<Integer, List<GremlinClauseList>>         subClauses;
 
     GremlinClauseList() {
-        this.list = new LinkedList<>();
+        this.list       = new LinkedList<>();
+        this.subClauses = new LinkedHashMap<>();
     }
 
-    public void add(GremlinQueryComposer.GremlinClauseValue g) {
-        list.add(g);
+    public void add(GremlinQueryComposer.GremlinClauseValue clauseValue) {
+        list.add(clauseValue);
     }
 
     public void add(int idx, GremlinQueryComposer.GremlinClauseValue g) {
@@ -38,11 +41,11 @@
     }
 
     public void add(GremlinClause clause, String... args) {
-        list.add(new GremlinQueryComposer.GremlinClauseValue(clause, clause.get(args)));
+        list.add(new GremlinQueryComposer.GremlinClauseValue(clause, args));
     }
 
     public void add(int i, GremlinClause clause, String... args) {
-        list.add(i, new GremlinQueryComposer.GremlinClauseValue(clause, clause.get(args)));
+        list.add(i, new GremlinQueryComposer.GremlinClauseValue(clause, args));
     }
 
     public GremlinQueryComposer.GremlinClauseValue getAt(int i) {
@@ -50,7 +53,7 @@
     }
 
     public String getValue(int i) {
-        return list.get(i).getValue();
+        return list.get(i).getClauseWithValue();
     }
 
     public GremlinQueryComposer.GremlinClauseValue get(int i) {
@@ -90,4 +93,29 @@
         list.remove(index);
         return gcv;
     }
+
+    public List<GremlinQueryComposer.GremlinClauseValue> getList() {
+        return list;
+    }
+
+    public void addSubClauses(int index, GremlinClauseList queryClauses) {
+        if (!this.subClauses.containsKey(index)) {
+            this.subClauses.put(index, new ArrayList<>());
+        }
+
+        this.subClauses.get(index).add(queryClauses);
+    }
+
+    public boolean hasSubClause(int i) {
+        return subClauses.containsKey(i);
+    }
+
+    public List<GremlinClauseList> getSubClauses(int i) {
+        return subClauses.get(i);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("list.size: %d, subClauses.size: %d", this.size(), this.subClauses.size());
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/query/GremlinQuery.java b/repository/src/main/java/org/apache/atlas/query/GremlinQuery.java
index 531f7ae..e4c217c 100644
--- a/repository/src/main/java/org/apache/atlas/query/GremlinQuery.java
+++ b/repository/src/main/java/org/apache/atlas/query/GremlinQuery.java
@@ -17,20 +17,54 @@
  */
 package org.apache.atlas.query;
 
-public class GremlinQuery {
-    private final String queryStr;
-    private final boolean hasSelect;
+import org.apache.atlas.repository.graphdb.AtlasGraphTraversal;
 
-    public GremlinQuery(String text, boolean hasSelect) {
-        this.queryStr = text;
-        this.hasSelect = hasSelect;
+import java.util.Objects;
+
+public class GremlinQuery {
+    private final String                 queryStr;
+    private final AtlasDSL.QueryMetadata queryMetadata;
+    private final GremlinClauseList      clauses;
+    private final SelectClauseComposer   selectComposer;
+
+    private AtlasGraphTraversal traversal;
+
+    public GremlinQuery(String gremlinQuery, AtlasDSL.QueryMetadata queryMetadata, GremlinClauseList clauses, SelectClauseComposer selectComposer) {
+        this.queryStr = gremlinQuery;
+        this.queryMetadata = queryMetadata;
+        this.clauses = clauses;
+        this.selectComposer = selectComposer;
     }
 
     public String queryStr() {
         return queryStr;
     }
 
-    public boolean hasSelectList() {
-        return hasSelect;
+    public GremlinClauseList getClauses() {
+        return clauses;
     }
-}
\ No newline at end of file
+
+    public SelectClauseComposer getSelectComposer() {
+        return selectComposer;
+    }
+
+    public boolean hasValidSelectClause() {
+        return Objects.nonNull(selectComposer) && !selectComposer.getIsSelectNoop();
+    }
+
+    public AtlasDSL.QueryMetadata getQueryMetadata() {
+        return queryMetadata;
+    }
+
+    public void setResult(AtlasGraphTraversal traversal) {
+        this.traversal = traversal;
+    }
+
+    public AtlasGraphTraversal getTraversal() {
+        return traversal;
+    }
+
+    public boolean hasSelectList() {
+        return queryMetadata.hasSelect();
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/query/GremlinQueryComposer.java b/repository/src/main/java/org/apache/atlas/query/GremlinQueryComposer.java
index 87c8bd2..c36f5b0 100644
--- a/repository/src/main/java/org/apache/atlas/query/GremlinQueryComposer.java
+++ b/repository/src/main/java/org/apache/atlas/query/GremlinQueryComposer.java
@@ -19,25 +19,24 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.discovery.TermSearchProcessor;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.glossary.GlossaryUtils;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.type.AtlasBuiltInTypes;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -51,68 +50,66 @@
 
 import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATIONS;
 import static org.apache.atlas.model.discovery.SearchParameters.NO_CLASSIFICATIONS;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
 
 public class GremlinQueryComposer {
-    private static final Logger LOG                 = LoggerFactory.getLogger(GremlinQueryComposer.class);
-    private static final String ISO8601_FORMAT      = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    private static final String ISO8601_DATE_FORMAT = "yyyy-MM-dd";
+    private static final String ISO8601_FORMAT              = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    private static final String ISO8601_DATE_FORMAT         = "yyyy-MM-dd";
     private static final String REGEX_ALPHA_NUMERIC_PATTERN = "[a-zA-Z0-9]+";
+    private static final String EMPTY_STRING                = "";
+    private static final int    DEFAULT_QUERY_RESULT_LIMIT  = 25;
+    private static final int    DEFAULT_QUERY_RESULT_OFFSET = 0;
 
     private static final ThreadLocal<DateFormat[]> DSL_DATE_FORMAT = ThreadLocal.withInitial(() -> {
-        final String formats[] = {ISO8601_FORMAT, ISO8601_DATE_FORMAT};
+        final String formats[] = { ISO8601_FORMAT, ISO8601_DATE_FORMAT };
+
         DateFormat[] dfs       = new DateFormat[formats.length];
+
         for (int i = 0; i < formats.length; i++) {
             dfs[i] = new SimpleDateFormat(formats[i]);
+
             dfs[i].setTimeZone(TimeZone.getTimeZone("UTC"));
         }
+
         return dfs;
     });
 
-    private final String            EMPTY_STRING                = "";
-    private final int               DEFAULT_QUERY_RESULT_LIMIT  = 25;
-    private final int               DEFAULT_QUERY_RESULT_OFFSET = 0;
-    private final GremlinClauseList queryClauses                = new GremlinClauseList();
-    private final Set<String>       attributesProcessed         = new HashSet<>();
+    private final GremlinClauseList      queryClauses                = new GremlinClauseList();
+    private final Set<String>            attributesProcessed         = new HashSet<>();
     private final Lookup                 lookup;
-    private final boolean                isNestedQuery;
     private final AtlasDSL.QueryMetadata queryMetadata;
-    private int providedLimit  = DEFAULT_QUERY_RESULT_LIMIT;
-    private int providedOffset = DEFAULT_QUERY_RESULT_OFFSET;
-    private Context context;
-    private boolean isPrimitive = true;
 
-    public GremlinQueryComposer(Lookup registryLookup, final AtlasDSL.QueryMetadata qmd, boolean isNestedQuery) {
-        this.isNestedQuery = isNestedQuery;
-        this.lookup = registryLookup;
-        this.queryMetadata = qmd;
+    private final int                    providedLimit;
+    private final int                    providedOffset;
+    private final Context                context;
+    private final GremlinQueryComposer   parent;
+
+    public GremlinQueryComposer(Lookup registryLookup, Context context, AtlasDSL.QueryMetadata qmd, int limit, int offset, GremlinQueryComposer parent) {
+        this.lookup         = registryLookup;
+        this.context        = context;
+        this.queryMetadata  = qmd;
+        this.providedLimit  = limit;
+        this.providedOffset = offset;
+        this.parent         = parent;
 
         init();
     }
 
-    public GremlinQueryComposer(AtlasTypeRegistry typeRegistry, final AtlasDSL.QueryMetadata qmd, int limit, int offset) {
-        this(new RegistryBasedLookup(typeRegistry), qmd, false);
-        this.context = new Context(lookup);
+    public GremlinQueryComposer(Lookup registryLookup, AtlasDSL.QueryMetadata qmd, int limit, int offset) {
+        this(registryLookup, new Context(registryLookup), qmd, limit, offset, null);
+    }
 
-        providedLimit = limit;
-        providedOffset = offset < 0 ? DEFAULT_QUERY_RESULT_OFFSET : offset;
+    public GremlinQueryComposer(AtlasTypeRegistry typeRegistry, AtlasDSL.QueryMetadata qmd, int limit, int offset) {
+        this(new RegistryBasedLookup(typeRegistry), qmd, limit, offset);
     }
 
     @VisibleForTesting
     GremlinQueryComposer(Lookup lookup, Context context, final AtlasDSL.QueryMetadata qmd) {
-        this.isNestedQuery = false;
-        this.lookup = lookup;
-        this.context = context;
-        this.queryMetadata = qmd;
-
-        init();
+        this(lookup, context, qmd, DEFAULT_QUERY_RESULT_LIMIT, DEFAULT_QUERY_RESULT_OFFSET, null);
     }
 
     public void addFrom(String typeName) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addFrom(typeName={})", typeName);
-        }
-
         IdentifierHelper.Info typeInfo = createInfo(typeName);
 
         if (context.shouldRegister(typeInfo.get())) {
@@ -144,11 +141,7 @@
     }
 
     public void addFromProperty(String typeName, String attribute) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addFromProperty(typeName={}, attribute={})", typeName, attribute);
-        }
-
-        if (!isNestedQuery) {
+        if (!isNestedQuery()) {
             addFrom(typeName);
         }
 
@@ -156,7 +149,7 @@
     }
 
     public void addIsA(String typeName, String traitName) {
-        if (!isNestedQuery) {
+        if (!isNestedQuery()) {
             addFrom(typeName);
         }
 
@@ -172,36 +165,21 @@
     }
 
     public void addHasTerm(String typeName, String termName) {
-        if (!isNestedQuery) {
-            addFrom(typeName);
-        }
-
-        String qualifiedAttributeName      = GlossaryUtils.QUALIFIED_NAME_ATTR;
-        String qualifiedAttributeSeperator = String.valueOf(GlossaryUtils.invalidNameChars[0]);
-        String name                        = GlossaryUtils.NAME;
-        String operator                    = SearchParameters.Operator.EQ.toString();
         String attributeToSearch;
-        String[] terms                     = termName.split(qualifiedAttributeSeperator);
+        String qualifiedAttributeSeperator = String.valueOf(GlossaryUtils.invalidNameChars[0]);
+        String[] terms = termName.split(qualifiedAttributeSeperator);
 
         if (terms.length > 1) {
-            attributeToSearch = TermSearchProcessor.ATLAS_GLOSSARY_TERM_ATTR_MEANINGS + GlossaryUtils.invalidNameChars[1] + qualifiedAttributeName;
+            attributeToSearch = GlossaryUtils.QUALIFIED_NAME_ATTR;
         } else {
             termName = terms[0];
-            attributeToSearch = TermSearchProcessor.ATLAS_GLOSSARY_TERM_ATTR_MEANINGS + GlossaryUtils.invalidNameChars[1] + name;
+            attributeToSearch = GlossaryUtils.NAME;
         }
 
-        addWhere(attributeToSearch, operator , termName);
-    }
-
-    public boolean isPrimitive(){
-        return isPrimitive;
+        add(GremlinClause.TERM, attributeToSearch, IdentifierHelper.removeQuotes(termName));
     }
 
     public void addWhere(String lhs, String operator, String rhs) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addWhere(lhs={}, operator={}, rhs={})", lhs, operator, rhs);
-        }
-
         String                currentType = context.getActiveTypeName();
 
         IdentifierHelper.Info org         = null;
@@ -211,7 +189,6 @@
             org = lhsI;
             lhsI = createInfo(lhs);
             lhsI.setTypeName(org.getTypeName());
-            isPrimitive = false;
         }
 
         if (!context.validator.isValidQualifiedName(lhsI.getQualifiedName(), lhsI.getRaw())) {
@@ -242,7 +219,8 @@
             } else if (op == SearchParameters.Operator.IN) {
                 add(GremlinClause.HAS_OPERATOR, getPropertyForClause(lhsI), "within", rhs);
             } else {
-                add(GremlinClause.HAS_OPERATOR, getPropertyForClause(lhsI), op.getSymbols()[1], rhs);
+                Object normalizedRhs = getNormalizedAttrVal(lhsI, IdentifierHelper.removeQuotes(rhs));
+                addWithNormalizedValue(GremlinClause.HAS_OPERATOR, getPropertyForClause(lhsI), op.getSymbols()[1], normalizedRhs, rhs);
             }
         }
         // record that the attribute has been processed so that the select clause doesn't add a attr presence check
@@ -250,6 +228,12 @@
 
         if (org != null && org.isReferredType()) {
             add(GremlinClause.DEDUP);
+            if (org.getEdgeDirection() != null) {
+                GremlinClause gremlinClauseForEdgeLabel = org.getEdgeDirection().equals(IN) ? GremlinClause.OUT : GremlinClause.IN;
+                add(gremlinClauseForEdgeLabel, org.getEdgeLabel());
+            } else {
+                add(GremlinClause.OUT, org.getEdgeLabel());
+            }
             context.registerActive(currentType);
         }
     }
@@ -275,7 +259,29 @@
                 }
                 break;
         }
-        add(clause, getPropertyForClause(lhsI), op.getSymbols()[1], rhs);
+        Object normalizedRhs = getNormalizedAttrVal(lhsI, IdentifierHelper.removeQuotes(rhs));
+        addWithNormalizedValue(clause, getPropertyForClause(lhsI), op.getSymbols()[1], normalizedRhs, rhs);
+    }
+
+    private Object getNormalizedAttrVal(IdentifierHelper.Info attrInfo, String attrVal) {
+        AtlasEntityType entityType = context.getActiveEntityType();
+        String          attrName   = attrInfo.getAttributeName();
+
+        if (entityType == null || StringUtils.isEmpty(attrVal)) {
+            return attrVal;
+        }
+
+        AtlasType attributeType = entityType.getAttributeType(attrName);
+        if (attributeType == null) {
+            return attrVal;
+        }
+
+        Object normalizedValue = attributeType.getNormalizedValue(attrVal);
+        if (normalizedValue != null && attributeType instanceof AtlasBuiltInTypes.AtlasDateType) {
+            return ((Date) normalizedValue).getTime();
+        }
+
+        return normalizedValue;
     }
 
     private boolean containsNumberAndLettersOnly(String rhs) {
@@ -286,11 +292,13 @@
         return rhs.replace("'", "").replace("\"", "") + context.getNumericTypeFormatter();
     }
 
-    public void addAndClauses(List<String> clauses) {
+    public void addAndClauses(List<GremlinQueryComposer> queryComposers) {
+        List<String> clauses = addToSubClause(queryComposers);
         add(GremlinClause.AND, String.join(",", clauses));
     }
 
-    public void addOrClauses(List<String> clauses) {
+    public void addOrClauses(List<GremlinQueryComposer> queryComposers) {
+        List<String> clauses = addToSubClause(queryComposers);
         add(GremlinClause.OR, String.join(",", clauses));
     }
 
@@ -313,42 +321,24 @@
     }
 
     public GremlinQueryComposer createNestedProcessor() {
-        GremlinQueryComposer qp = new GremlinQueryComposer(lookup, queryMetadata, true);
-        qp.context = this.context;
-        return qp;
+        return new GremlinQueryComposer(lookup, this.context, queryMetadata, this.providedLimit, this.providedOffset, this);
     }
 
     public void addFromAlias(String typeName, String alias) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addFromAlias(typeName={}, alias={})", typeName, alias);
-        }
-
         addFrom(typeName);
         addAsClause(alias);
         context.registerAlias(alias);
     }
 
     public void addAsClause(String alias) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addAsClause(stepName={})", alias);
-        }
-
         add(GremlinClause.AS, alias);
     }
 
     public void addGroupBy(String item) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addGroupBy(item={})", item);
-        }
-
         addGroupByClause(item);
     }
 
     public void addLimit(String limit, String offset) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addLimit(limit={}, offset={})", limit, offset);
-        }
-
         SelectClauseComposer scc = context.getSelectClauseComposer();
         if (scc == null) {
             addLimitHelper(limit, offset);
@@ -366,16 +356,11 @@
     public String get() {
         close();
 
-        boolean mustTransform = !isNestedQuery && queryMetadata.needTransformation();
+        boolean mustTransform = !isNestedQuery() && queryMetadata.needTransformation();
         String  items[]       = getFormattedClauses(mustTransform);
         String s = mustTransform ?
                            getTransformedClauses(items) :
                            String.join(".", items);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Gremlin: {}", s);
-        }
-
         return s;
     }
 
@@ -384,10 +369,6 @@
     }
 
     public void addOrderBy(String name, boolean isDesc) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addOrderBy(name={}, isDesc={})", name, isDesc);
-        }
-
         IdentifierHelper.Info ia = createInfo(name);
         if (queryMetadata.hasSelect() && queryMetadata.hasGroupBy()) {
             addSelectTransformation(this.context.selectClauseComposer, getPropertyForClause(ia), isDesc);
@@ -399,7 +380,16 @@
         }
     }
 
-    public long getDateFormat(String s) {
+    public boolean hasFromClause() {
+        return queryClauses.contains(GremlinClause.HAS_TYPE) != -1 ||
+                       queryClauses.contains(GremlinClause.HAS_TYPE_WITHIN) != -1;
+    }
+
+    private void addWithNormalizedValue(GremlinClause clause, String propertyForClause, String symbol, Object normalizedRhs, String strValue) {
+        queryClauses.add(new GremlinClauseValue(clause, propertyForClause, symbol, normalizedRhs, strValue));
+    }
+
+    private long getDateFormat(String s) {
 
         for (DateFormat dateFormat : DSL_DATE_FORMAT.get()) {
             try {
@@ -412,9 +402,12 @@
         return -1;
     }
 
-    public boolean hasFromClause() {
-        return queryClauses.contains(GremlinClause.HAS_TYPE) != -1 ||
-                       queryClauses.contains(GremlinClause.HAS_TYPE_WITHIN) != -1;
+    private List<String> addToSubClause(List<GremlinQueryComposer> clauses) {
+        for (GremlinQueryComposer entry : clauses) {
+            this.addSubClauses(this.queryClauses.size(), entry.getQueryClauses());
+        }
+
+        return clauses.stream().map(x -> x.get()).collect(Collectors.toList());
     }
 
     private String getPropertyForClause(IdentifierHelper.Info ia) {
@@ -431,10 +424,6 @@
     }
 
     private void process(SelectClauseComposer scc) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addSelect(items.length={})", scc.getItems() != null ? scc.getItems().length : 0);
-        }
-
         if (scc.getItems() == null) {
             return;
         }
@@ -460,20 +449,20 @@
                 continue;
             }
 
-            scc.isSelectNoop = hasNoopCondition(ia);
-            if (scc.isSelectNoop) {
+            scc.setIsSelectNoop(hasNoopCondition(ia));
+            if (scc.getIsSelectNoop()) {
                 return;
             }
 
             if (introduceType(ia)) {
                 scc.incrementTypesIntroduced();
-                scc.isSelectNoop = !ia.hasParts();
+                scc.setIsSelectNoop(!ia.hasParts());
                 if (ia.hasParts()) {
                     scc.assign(i, getPropertyForClause(createInfo(ia.get())), GremlinClause.INLINE_GET_PROPERTY);
                 }
             } else {
                 scc.assign(i, getPropertyForClause(ia), GremlinClause.INLINE_GET_PROPERTY);
-                scc.incrementPrimitiveType();
+                scc.setIsPrimitiveAttr(i);
             }
         }
 
@@ -499,7 +488,8 @@
         String body     = String.join(".", Stream.of(items).filter(Objects::nonNull).collect(Collectors.toList()));
         String inlineFn = queryClauses.getValue(queryClauses.size() - 1);
         String funCall  = String.format(inlineFn, body);
-        if (isNestedQuery) {
+
+        if (isNestedQuery()) {
             ret = String.join(".", queryClauses.getValue(0), funCall);
         } else {
             ret = queryClauses.getValue(0) + funCall;
@@ -522,7 +512,7 @@
                                          final String orderByQualifiedAttrName,
                                          final boolean isDesc) {
         GremlinClause gremlinClause;
-        if (selectClauseComposer.isSelectNoop) {
+        if (selectClauseComposer.getIsSelectNoop()) {
             gremlinClause = GremlinClause.SELECT_NOOP_FN;
         } else if (queryMetadata.hasGroupBy()) {
             gremlinClause = selectClauseComposer.onlyAggregators() ? GremlinClause.SELECT_ONLY_AGG_GRP_FN : GremlinClause.SELECT_MULTI_ATTR_GRP_FN;
@@ -532,7 +522,7 @@
         if (StringUtils.isEmpty(orderByQualifiedAttrName)) {
             add(0, gremlinClause,
                 selectClauseComposer.getLabelHeader(),
-                selectClauseComposer.hasAssignmentExpr() ? selectClauseComposer.getAssignmentExprString() : EMPTY_STRING,
+                selectClauseComposer.getAssignmentExprString(),
                 selectClauseComposer.getItemsString(),
                 EMPTY_STRING);
         } else {
@@ -544,7 +534,7 @@
             String idxStr = String.valueOf(itemIdx);
             add(0, gremlinClause,
                 selectClauseComposer.getLabelHeader(),
-                selectClauseComposer.hasAssignmentExpr() ? selectClauseComposer.getAssignmentExprString() : EMPTY_STRING,
+                selectClauseComposer.getAssignmentExprString(),
                 selectClauseComposer.getItemsString(),
                 sortClause.get(idxStr, idxStr)
             );
@@ -570,8 +560,9 @@
     }
 
     private void close() {
-        if (isNestedQuery)
+        if (isNestedQuery()) {
             return;
+        }
 
         // Need de-duping at the end so that correct results are fetched
         if (queryClauses.size() > 2) {
@@ -596,6 +587,14 @@
         moveToLast(GremlinClause.INLINE_TRANSFORM_CALL);
     }
 
+    private boolean isNestedQuery() {
+        return this.parent != null;
+    }
+
+    private void addSubClauses(int index, GremlinClauseList queryClauses) {
+        this.queryClauses.addSubClauses(index, queryClauses);
+    }
+
     private void moveToLast(GremlinClause clause) {
         int index = queryClauses.contains(clause);
         if (-1 == index) {
@@ -620,7 +619,7 @@
     }
 
     private void init() {
-        if (!isNestedQuery) {
+        if (!isNestedQuery()) {
             add(GremlinClause.G);
             add(GremlinClause.V);
         } else {
@@ -647,10 +646,6 @@
     }
 
     private void addRangeClause(String startIndex, String endIndex) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addRangeClause(startIndex={}, endIndex={})", startIndex, endIndex);
-        }
-
         if (queryMetadata.hasSelect()) {
             add(queryClauses.size() - 1, GremlinClause.RANGE, startIndex, startIndex, endIndex, startIndex, startIndex, endIndex);
         } else {
@@ -659,18 +654,10 @@
     }
 
     private void addOrderByClause(IdentifierHelper.Info ia, boolean descr) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addOrderByClause(name={})", ia.getRaw(), descr);
-        }
-
         add((!descr) ? GremlinClause.ORDER_BY : GremlinClause.ORDER_BY_DESC, ia);
     }
 
     private void addGroupByClause(String name) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("addGroupByClause(name={})", name);
-        }
-
         IdentifierHelper.Info ia = createInfo(name);
         add(GremlinClause.GROUP_BY, ia);
     }
@@ -684,7 +671,7 @@
     }
 
     private void add(GremlinClause clause, String... args) {
-        queryClauses.add(new GremlinClauseValue(clause, clause.get(args)));
+        queryClauses.add(new GremlinClauseValue(clause, args));
     }
 
     public void add(GremlinClauseValue gv) {
@@ -692,7 +679,7 @@
     }
 
     private void add(int idx, GremlinClause clause, String... args) {
-        queryClauses.add(idx, new GremlinClauseValue(clause, clause.get(args)));
+        queryClauses.add(idx, new GremlinClauseValue(clause, args));
     }
 
     private void addTrait(GremlinClause clause, IdentifierHelper.Info idInfo) {
@@ -703,22 +690,54 @@
         add(clause, idInfo.get(), idInfo.get());
     }
 
-    static class GremlinClauseValue {
+    public GremlinClauseList clauses() {
+        return queryClauses;
+    }
+
+    public SelectClauseComposer getSelectComposer() {
+        return this.context.selectClauseComposer;
+    }
+
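+    // Carries a composed clause together with its argument values: the string
+    // form feeds the script-engine path, while the raw (typed) value lets the
+    // traversal translator build predicates without re-parsing.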
+    public static class GremlinClauseValue {
         private final GremlinClause clause;
         private final String        value;
+        private final String[]      values;
+        private final Object        rawValue;
 
-        public GremlinClauseValue(GremlinClause clause, String value) {
+        public GremlinClauseValue(GremlinClause clause, String property, String operator, Object rawValue, String str) {
             this.clause = clause;
-            this.value = value;
+            this.value = clause.get(property, operator, str);
+            this.values = new String[] {property, operator, str};
+            this.rawValue = rawValue;
+        }
+
+        public GremlinClauseValue(GremlinClause clause, String... values) {
+            this.clause = clause;
+            this.value = clause.get(values);
+            this.values = values;
+            this.rawValue = null;
         }
 
         public GremlinClause getClause() {
             return clause;
         }
 
-        public String getValue() {
+        public String getClauseWithValue() {
             return value;
         }
+
+        public String[] getValues() {
+            return values;
+        }
+
+        public Object getRawValue() {
+            return this.rawValue;
+        }
+
+        @Override
+        public String toString() {
+            return String.valueOf(clause);
+        }
     }
 
     @VisibleForTesting
@@ -726,15 +745,15 @@
         private static final AtlasStructType UNKNOWN_TYPE = new AtlasStructType(new AtlasStructDef());
 
         private final Lookup lookup;
+        private final ClauseValidator       validator;
         private final Map<String, String>   aliasMap = new HashMap<>();
         private AtlasType                   activeType;
         private SelectClauseComposer        selectClauseComposer;
-        private ClauseValidator             validator;
         private String                      numericTypeFormatter = "";
 
         public Context(Lookup lookup) {
             this.lookup = lookup;
-            validator = new ClauseValidator(lookup);
+            this.validator = new ClauseValidator();
         }
 
         public void registerActive(String typeName) {
@@ -833,11 +852,9 @@
     }
 
     private static class ClauseValidator {
-        private final Lookup lookup;
         List<String> errorList = new ArrayList<>();
 
-        public ClauseValidator(Lookup lookup) {
-            this.lookup = lookup;
+        public ClauseValidator() {
         }
 
         public boolean isValid(Context ctx, GremlinClause clause, IdentifierHelper.Info ia) {
diff --git a/repository/src/main/java/org/apache/atlas/query/IdentifierHelper.java b/repository/src/main/java/org/apache/atlas/query/IdentifierHelper.java
index a1278f4..c53a324 100644
--- a/repository/src/main/java/org/apache/atlas/query/IdentifierHelper.java
+++ b/repository/src/main/java/org/apache/atlas/query/IdentifierHelper.java
@@ -127,6 +127,7 @@
     public static boolean isInCompleteValue(String s) {
         return StringUtils.isNotEmpty(s) && (StringUtils.equals(s, "1") || StringUtils.equalsIgnoreCase(s, "true"));
     }
+
     public static class Info {
         private String   raw;
         private String   actual;
diff --git a/repository/src/main/java/org/apache/atlas/query/SelectClauseComposer.java b/repository/src/main/java/org/apache/atlas/query/SelectClauseComposer.java
index 969fcd2..70685d8 100644
--- a/repository/src/main/java/org/apache/atlas/query/SelectClauseComposer.java
+++ b/repository/src/main/java/org/apache/atlas/query/SelectClauseComposer.java
@@ -18,33 +18,68 @@
 
 package org.apache.atlas.query;
 
+import org.apache.commons.lang.StringUtils;
+
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.StringJoiner;
 
-class SelectClauseComposer {
+public class SelectClauseComposer {
     private static final String COUNT_STR = "count";
     private static final String MIN_STR = "min";
     private static final String MAX_STR = "max";
     private static final String SUM_STR = "sum";
 
-    public boolean  isSelectNoop;
-
-    private String[]            labels;
-    private String[]            attributes; // Qualified names
-    private String[]            items;
-    private Map<String, String> itemAssignmentExprs;
+    private final String[]                     labels;
+    private final String[]                     attributes;
+    private final String[]                     items;
 
-    private int     countIdx = -1;
-    private int     sumIdx   = -1;
-    private int     maxIdx   = -1;
-    private int     minIdx   = -1;
-    private int     aggCount = 0;
-    private int     introducedTypesCount = 0;
-    private int     primitiveTypeCount = 0;
+    private final int                          countIdx;
+    private final int                          sumIdx;
+    private final int                          minIdx;
+    private final int                          maxIdx;
+    private final int                          aggCount;
+    private final Map<Integer, AggregatorFlag> aggregatorFlags      = new HashMap<>();
+    private final Set<Integer>                 isNumericAggregator  = new HashSet<>();
+    private final Set<Integer>                 isPrimitiveAttr      = new HashSet<>();
+    private final Map<String, String>          itemAssignmentExprs  = new LinkedHashMap<>();
+    private       boolean                      isSelectNoop         = false;
+    private       int                          introducedTypesCount = 0;
 
-    public SelectClauseComposer() {}
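+    // Aggregate positions are fixed at construction time; aggregatorFlags
+    // records which aggregate, if any, each select item represents.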
+    public SelectClauseComposer(String[] labels, String[] attributes, String[] items, int countIdx, int sumIdx, int minIdx, int maxIdx) {
+        this.labels     = labels;
+        this.attributes = Arrays.copyOf(attributes, attributes.length);
+        this.items      = Arrays.copyOf(items, items.length);
+        this.countIdx   = countIdx;
+        this.sumIdx     = sumIdx;
+        this.minIdx     = minIdx;
+        this.maxIdx     = maxIdx;
+        int aggCount = 0;
+        if (countIdx != -1) {
+            this.aggregatorFlags.put(countIdx, AggregatorFlag.COUNT);
+            aggCount++;
+        }
+        if (sumIdx != -1) {
+            this.aggregatorFlags.put(sumIdx, AggregatorFlag.SUM);
+            aggCount++;
+        }
+        if (maxIdx != -1) {
+            this.aggregatorFlags.put(maxIdx, AggregatorFlag.MAX);
+            aggCount++;
+        }
+        if (minIdx != -1) {
+            this.aggregatorFlags.put(minIdx, AggregatorFlag.MIN);
+            aggCount++;
+        }
+
+        this.aggCount = aggCount;
+    }
 
     public static boolean isKeyword(String s) {
         return COUNT_STR.equals(s) ||
@@ -55,26 +90,26 @@
 
     public String[] getItems() {
         return items;
     }
-
-    public void setItems(final String[] items) {
-        this.items = Arrays.copyOf(items, items.length);
-    }
 
     public boolean updateAsApplicable(int currentIndex, String propertyForClause, String qualifiedName) {
         boolean ret = false;
         if (currentIndex == getCountIdx()) {
             ret = assign(currentIndex, COUNT_STR, GremlinClause.INLINE_COUNT.get(), GremlinClause.INLINE_ASSIGNMENT);
+            this.isNumericAggregator.add(currentIndex);
         } else if (currentIndex == getMinIdx()) {
             ret = assign(currentIndex, MIN_STR, propertyForClause,  GremlinClause.INLINE_ASSIGNMENT, GremlinClause.INLINE_MIN);
+            this.isNumericAggregator.add(currentIndex);
         } else if (currentIndex == getMaxIdx()) {
             ret = assign(currentIndex, MAX_STR, propertyForClause, GremlinClause.INLINE_ASSIGNMENT, GremlinClause.INLINE_MAX);
+            this.isNumericAggregator.add(currentIndex);
         } else if (currentIndex == getSumIdx()) {
             ret = assign(currentIndex, SUM_STR, propertyForClause, GremlinClause.INLINE_ASSIGNMENT, GremlinClause.INLINE_SUM);
-        } else {
-            attributes[currentIndex] = qualifiedName;
+            this.isNumericAggregator.add(currentIndex);
         }
 
+        attributes[currentIndex] = qualifiedName;
         return ret;
     }
 
@@ -82,9 +117,6 @@
         return attributes;
     }
 
-    public void setAttributes(final String[] attributes) {
-        this.attributes = Arrays.copyOf(attributes, attributes.length);
-    }
-
     public boolean assign(int i, String qualifiedName, GremlinClause clause) {
         items[i] = clause.get(qualifiedName, qualifiedName);
@@ -95,14 +127,6 @@
         return labels;
     }
 
-    public void setLabels(final String[] labels) {
-        this.labels = labels;
-    }
-
-    public boolean hasAssignmentExpr() {
-        return itemAssignmentExprs != null && !itemAssignmentExprs.isEmpty();
-    }
-
     public boolean onlyAggregators() {
         return hasAggregators() && aggCount == items.length;
     }
@@ -120,7 +144,7 @@
     }
 
     public String getAssignmentExprString(){
-        return String.join(" ", itemAssignmentExprs.values());
+        return (!itemAssignmentExprs.isEmpty()) ? String.join(" ", itemAssignmentExprs.values()) : StringUtils.EMPTY;
     }
 
     public String getItem(int i) {
@@ -147,10 +171,6 @@
     }
 
     private boolean assign(String item, String assignExpr) {
-        if (itemAssignmentExprs == null) {
-            itemAssignmentExprs = new LinkedHashMap<>();
-        }
-
         itemAssignmentExprs.put(item, assignExpr);
         return true;
     }
@@ -170,40 +190,20 @@
         return countIdx;
     }
 
-    public void setCountIdx(final int countIdx) {
-        this.countIdx = countIdx;
-        aggCount++;
-    }
-
     public int getSumIdx() {
         return sumIdx;
     }
 
-    public void setSumIdx(final int sumIdx) {
-        this.sumIdx = sumIdx;
-        aggCount++;
-    }
-
     public int getMaxIdx() {
         return maxIdx;
     }
 
-    public void setMaxIdx(final int maxIdx) {
-        this.maxIdx = maxIdx;
-        aggCount++;
-    }
-
     public int getMinIdx() {
         return minIdx;
     }
-
-    public void setMinIdx(final int minIdx) {
-        this.minIdx = minIdx;
-        aggCount++;
-    }
-
-    public boolean isAggregatorIdx(int idx) {
-        return getMinIdx() == idx || getMaxIdx() == idx || getCountIdx() == idx || getSumIdx() == idx;
-    }
 
     private String getJoinedQuotedStr(String[] elements) {
@@ -226,10 +226,6 @@
         return introducedTypesCount;
     }
 
-    public void incrementPrimitiveType() {
-        primitiveTypeCount++;
-    }
-
     public boolean hasMultipleReferredTypes() {
         return getIntroducedTypesCount() > 1;
     }
@@ -239,6 +235,45 @@
     }
 
     private int getPrimitiveTypeCount() {
-        return primitiveTypeCount;
+        return isPrimitiveAttr.size();
+    }
+
+    public boolean getIsSelectNoop() {
+        return this.isSelectNoop;
+    }
+
+    public void setIsSelectNoop(boolean isSelectNoop) {
+        this.isSelectNoop = isSelectNoop;
+    }
+
+    public boolean isSumIdx(int idx) {
+        return aggregatorFlags.get(idx) == AggregatorFlag.SUM;
+    }
+
+    public boolean isMinIdx(int idx) {
+        return aggregatorFlags.get(idx) == AggregatorFlag.MIN;
+    }
+
+    public boolean isMaxIdx(int idx) {
+        return aggregatorFlags.get(idx) == AggregatorFlag.MAX;
+    }
+
+    public boolean isCountIdx(int idx) {
+        return aggregatorFlags.get(idx) == AggregatorFlag.COUNT;
+    }
+
+    public boolean isNumericAggregator(int idx) {
+        return isNumericAggregator.contains(idx);
+    }
+
+    public boolean isPrimitiveAttribute(int idx) {
+        return isPrimitiveAttr.contains(idx);
+    }
+
+    public void setIsPrimitiveAttr(int i) {
+        this.isPrimitiveAttr.add(i);
+    }
+
+    public enum AggregatorFlag {
+        NONE, COUNT, MIN, MAX, SUM
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/query/executors/DSLQueryExecutor.java b/repository/src/main/java/org/apache/atlas/query/executors/DSLQueryExecutor.java
new file mode 100644
index 0000000..2053634
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/query/executors/DSLQueryExecutor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.query.executors;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+
+public interface DSLQueryExecutor {
+    /**
+     * Executes the given DSL query.
+     *
+     * @param dslQuery DSL query to be executed
+     * @param limit    maximum number of records to fetch
+     * @param offset   starting offset; execute the same query with a different offset to fetch additional results
+     * @return results of the DSL query
+     * @throws AtlasBaseException if the query cannot be parsed or executed
+     */
+    AtlasSearchResult execute(String dslQuery, int limit, int offset) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/query/executors/GremlinClauseToTraversalTranslator.java b/repository/src/main/java/org/apache/atlas/query/executors/GremlinClauseToTraversalTranslator.java
new file mode 100644
index 0000000..c479af7
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/query/executors/GremlinClauseToTraversalTranslator.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.query.executors;
+
+import org.apache.atlas.query.GremlinClause;
+import org.apache.atlas.query.GremlinClauseList;
+import org.apache.atlas.query.GremlinQueryComposer;
+import org.apache.atlas.query.IdentifierHelper;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphTraversal;
+import org.apache.atlas.type.Constants;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+public class GremlinClauseToTraversalTranslator {
+    private static final Logger LOG = LoggerFactory.getLogger(GremlinClauseToTraversalTranslator.class);
+
+    public static AtlasGraphTraversal run(AtlasGraph graph, GremlinClauseList clauseList) {
+        return new ClauseTranslator(graph).process(clauseList);
+    }
+
+    private static class ClauseTranslator {
+        private static final String ATTR_PROPERTY_NAME               = "__name";
+        private static final String EDGE_NAME_CLASSIFIED_AS          = "classifiedAs";
+        private static final String EDGE_NAME_TRAIT_NAMES            = "__traitNames";
+        private static final String EDGE_NAME_PROPAGATED_TRAIT_NAMES = "__propagatedTraitNames";
+
+        private final AtlasGraph graph;
+
+        public ClauseTranslator(AtlasGraph graph) {
+            this.graph = graph;
+        }
+
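+        // Entry point for translation. Nested queries (e.g. the operands of
+        // AND/OR) are collected on a stack of sub-traversals while processing;
+        // a non-empty stack afterwards indicates an unconsumed nested query.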
+        public AtlasGraphTraversal process(GremlinClauseList clauseList) {
+            Stack<List<AtlasGraphTraversal>> subTraversals = new Stack<>();
+            AtlasGraphTraversal ret = process(null, subTraversals, clauseList);
+            if (!subTraversals.isEmpty()) {
+                String errorMessage = "Expected all sub-traversals to be consumed, but found: " + subTraversals;
+                LOG.warn(errorMessage);
+                throw new RuntimeException(errorMessage);
+            }
+
+            return ret;
+        }
+
+        private AtlasGraphTraversal process(AtlasGraphTraversal traversal,
+                                            Stack<List<AtlasGraphTraversal>> collected,
+                                            GremlinClauseList clauseList) {
+            int size = clauseList.getList().size();
+            for (int index = 0; index < size; index++) {
+
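+                // Sub-clauses attached at this index are translated first; the
+                // enclosing AND/OR clause pops the resulting sub-traversals off
+                // the stack when it is processed below.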
+                if (clauseList.hasSubClause(index)) {
+                    List<GremlinClauseList> subClauses = clauseList.getSubClauses(index);
+
+                    collected.push(new ArrayList<>());
+                    for (GremlinClauseList sc : subClauses) {
+                        process(traversal, collected, sc);
+                    }
+                }
+
+                traversal = traverse(traversal, collected, clauseList.get(index));
+            }
+
+            return traversal;
+        }
+
+        private AtlasGraphTraversal traverse(AtlasGraphTraversal traversal,
+                                             Stack<List<AtlasGraphTraversal>> trLists,
+                                             GremlinQueryComposer.GremlinClauseValue clauseValue) {
+
+            GremlinClause clause = clauseValue.getClause();
+            String[] values = clauseValue.getValues();
+            switch (clause) {
+                case G:
+                    break;
+
+                case V:
+                    traversal = graph.V();
+                    break;
+
+                case AS:
+                    traversal.as(values[0]);
+                    break;
+
+                case AND: {
+                    if (trLists != null && !trLists.peek().isEmpty()) {
+                        List<AtlasGraphTraversal> subTraversals = trLists.pop();
+                        traversal.and(subTraversals.toArray(new Traversal[0]));
+                    } else {
+                        throw new RuntimeException("Expected non-empty sub-traversals for clause: " + clause);
+                    }
+                }
+                break;
+
+                case OR: {
+                    if (trLists != null && !trLists.peek().isEmpty()) {
+                        List<AtlasGraphTraversal> subTraversals = trLists.pop();
+                        traversal.or(subTraversals.toArray(new Traversal[0]));
+                    } else {
+                        throw new RuntimeException("Expected non-empty sub-traversals for clause: " + clause);
+                    }
+                }
+                break;
+
+                case HAS_PROPERTY:
+                    traversal.has(values[0]);
+                    break;
+
+                case HAS_NOT_PROPERTY:
+                    traversal.hasNot(values[0]);
+                    break;
+
+                case HAS_OPERATOR:
+                    P predicate = getPredicate(values[1], values[2], clauseValue.getRawValue());
+                    traversal.has(values[0], predicate);
+                    break;
+
+                case HAS_TYPE:
+                    traversal.has(Constants.TYPE_NAME_PROPERTY_KEY, values[0]);
+                    break;
+
+                case HAS_WITHIN:
+                    traversal.has(values[0], values[1]);
+                    break;
+
+                case IN:
+                    traversal.in(removeRedundantQuotes(values[0]));
+                    break;
+
+                case OUT:
+                    traversal.out(removeRedundantQuotes(values[0]));
+                    break;
+
+                case ANY_TRAIT:
+                    traversal.or(
+                            traversal.startAnonymousTraversal().has(EDGE_NAME_TRAIT_NAMES),
+                            traversal.startAnonymousTraversal().has(EDGE_NAME_PROPAGATED_TRAIT_NAMES));
+                    break;
+
+                case TRAIT:
+                    traversal.outE(EDGE_NAME_CLASSIFIED_AS).has(ATTR_PROPERTY_NAME, P.within(values[0])).outV();
+                    break;
+
+                case NO_TRAIT:
+                    traversal.and(
+                            traversal.startAnonymousTraversal().hasNot(EDGE_NAME_TRAIT_NAMES),
+                            traversal.startAnonymousTraversal().hasNot(EDGE_NAME_PROPAGATED_TRAIT_NAMES));
+                    break;
+
+                case DEDUP:
+                    traversal.dedup();
+                    break;
+
+                case LIMIT:
+                    traversal.limit(Scope.global, Long.parseLong(values[0]));
+                    break;
+
+                case TO_LIST:
+                    traversal.getAtlasVertexList();
+                    break;
+
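+                // Each nested query starts an anonymous traversal, remembered on
+                // the stack so the enclosing AND/OR clause can combine it later.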
+                case NESTED_START:
+                    traversal = traversal.startAnonymousTraversal();
+                    trLists.peek().add(traversal);
+                    break;
+
+                case HAS_TYPE_WITHIN:
+                    String[] subTypes = StringUtils.split(removeRedundantQuotes(values[0]), ',');
+                    traversal.has("__typeName", P.within(subTypes));
+                    break;
+
+                case GROUP_BY:
+                    traversal.has(values[0]).group().by(values[0]);
+                    break;
+
+                case ORDER_BY:
+                    traversal.has(values[0]).order().by(values[0]);
+                    break;
+
+                case ORDER_BY_DESC:
+                    traversal.has(values[0]).order().by(values[0], Order.desc);
+                    break;
+
+                case STRING_CONTAINS:
+                    traversal.textRegEx(values[0], removeRedundantQuotes(values[1]));
+                    break;
+
+                case TEXT_CONTAINS:
+                    traversal.textContainsRegEx(values[0], removeRedundantQuotes(values[1]));
+                    break;
+
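+                // The composed RANGE carries (offset, offset, limit, ...); Gremlin's
+                // range() expects absolute positions, hence high = low + limit.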
+                case RANGE:
+                    traversal.dedup();
+                    long low  = Long.parseLong(values[1]);
+                    long high = low + Long.parseLong(values[2]);
+                    traversal.range(Scope.global, low, high);
+                    break;
+
+                case SELECT_FN:
+                case SELECT_NOOP_FN:
+                case SELECT_ONLY_AGG_GRP_FN:
+                case INLINE_TRANSFORM_CALL:
+                case SELECT_MULTI_ATTR_GRP_FN:
+                    break;
+
+                case TERM:
+                    String term = String.format("AtlasGlossaryTerm.%s", values[0]);
+                    traversal.and(
+                            traversal.startAnonymousTraversal()
+                                     .in(org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL)
+                                     .has(term, P.eq(values[1]))
+                                     .dedup());
+                    break;
+
+                default:
+                    LOG.warn("Clause not translated: {}. Can potentially lead to incorrect results.", clause);
+                    break;
+            }
+            return traversal;
+        }
+
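+        // Maps a DSL comparison operator to the equivalent Gremlin predicate.
+        // The typed RHS is used for comparisons; the string form is only needed
+        // for WITHIN, whose operand is a quoted CSV list.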
+        private P getPredicate(String operator, String strRhs, Object rhs) {
+            switch (operator.toUpperCase()) {
+                case "LT":
+                    return P.lt(rhs);
+
+                case "GT":
+                    return P.gt(rhs);
+
+                case "LTE":
+                    return P.lte(rhs);
+
+                case "GTE":
+                    return P.gte(rhs);
+
+                case "EQ":
+                    return P.eq(rhs);
+
+                case "NEQ":
+                    return P.neq(rhs);
+
+                case "WITHIN":
+                    String[] strs = csvToArray(strRhs);
+                    return P.within(strs);
+
+                default:
+                    LOG.warn("Operator: {} not translated.", operator);
+                    return null;
+            }
+        }
+
+        private String[] csvToArray(String strRhs) {
+            String csvRow = StringUtils.replaceEach(strRhs, new String[]{"[", "]", "'"}, new String[]{"", "", ""});
+
+            // strip whitespace left around each element once delimiters and quotes are removed
+            return StringUtils.stripAll(csvRow.split(","));
+        }
+
+        private String removeRedundantQuotes(String value) {
+            return IdentifierHelper.removeQuotes(value);
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/query/executors/ScriptEngineBasedExecutor.java b/repository/src/main/java/org/apache/atlas/query/executors/ScriptEngineBasedExecutor.java
new file mode 100644
index 0000000..fc84499
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/query/executors/ScriptEngineBasedExecutor.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.query.executors;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.query.AtlasDSL;
+import org.apache.atlas.query.GremlinQuery;
+import org.apache.atlas.query.QueryParams;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ScriptEngineBasedExecutor implements DSLQueryExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(ScriptEngineBasedExecutor.class);
+    private final AtlasTypeRegistry     typeRegistry;
+    private final AtlasGraph            graph;
+    private final EntityGraphRetriever  entityRetriever;
+
+    public ScriptEngineBasedExecutor(AtlasTypeRegistry typeRegistry, AtlasGraph graph, EntityGraphRetriever entityRetriever) {
+        this.typeRegistry    = typeRegistry;
+        this.graph           = graph;
+        this.entityRetriever = entityRetriever;
+    }
+
+    @Override
+    public AtlasSearchResult execute(String dslQuery, int limit, int offset) throws AtlasBaseException {
+        AtlasSearchResult ret          = new AtlasSearchResult(dslQuery, AtlasSearchResult.AtlasQueryType.DSL);
+        GremlinQuery      gremlinQuery = toGremlinQuery(dslQuery, limit, offset);
+        String            queryStr     = gremlinQuery.queryStr();
+
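+        // The script engine may return a list of vertices, select-clause
+        // projections, or group-by maps of vertex lists; each shape is mapped
+        // to the corresponding AtlasSearchResult form below.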
+        Object result = graph.executeGremlinScript(queryStr, false);
+        if (result instanceof List && CollectionUtils.isNotEmpty((List)result)) {
+            List   queryResult  = (List) result;
+            Object firstElement = queryResult.get(0);
+
+            if (firstElement instanceof AtlasVertex) {
+                for (Object element : queryResult) {
+                    if (element instanceof AtlasVertex) {
+                        ret.addEntity(entityRetriever.toAtlasEntityHeaderWithClassifications((AtlasVertex)element));
+                    } else {
+                        LOG.warn("searchUsingDslQuery({}): expected an AtlasVertex; found unexpected entry in result {}", dslQuery, element);
+                    }
+                }
+            } else if (gremlinQuery.hasSelectList()) {
+                ret.setAttributes(toAttributesResult(queryResult, gremlinQuery));
+            } else if (firstElement instanceof Map) {
+                for (Object element : queryResult) {
+                    if (element instanceof Map) {
+                        Map map = (Map)element;
+
+                        for (Object key : map.keySet()) {
+                            Object value = map.get(key);
+
+                            if (value instanceof List && CollectionUtils.isNotEmpty((List)value)) {
+                                for (Object o : (List) value) {
+                                    if (o instanceof AtlasVertex) {
+                                        ret.addEntity(entityRetriever.toAtlasEntityHeader((AtlasVertex) o));
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            } else {
+                LOG.warn("searchUsingDslQuery({}/{}): found unexpected entry in result {}", dslQuery, dslQuery, gremlinQuery.queryStr());
+            }
+        }
+
+        return ret;
+    }
+
+    private GremlinQuery toGremlinQuery(String query, int limit, int offset) throws AtlasBaseException {
+        QueryParams  params       = QueryParams.getNormalizedParams(limit, offset);
+        GremlinQuery gremlinQuery = new AtlasDSL.Translator(query, typeRegistry, params.offset(), params.limit()).translate();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Translated Gremlin Query: {}", gremlinQuery.queryStr());
+        }
+
+        return gremlinQuery;
+    }
+
+    private AtlasSearchResult.AttributeSearchResult toAttributesResult(List results, GremlinQuery query) {
+        AtlasSearchResult.AttributeSearchResult ret = new AtlasSearchResult.AttributeSearchResult();
+        List<String> names = (List<String>) results.get(0);
+        List<List<Object>> values = extractValues(results.subList(1, results.size()));
+
+        ret.setName(names);
+        ret.setValues(values);
+        return ret;
+    }
+
+    private List<List<Object>> extractValues(List results) {
+        List<List<Object>> values = new ArrayList<>();
+
+        for (Object obj : results) {
+            if (obj instanceof Map) {
+                Map map = (Map) obj;
+                List<Object> list = new ArrayList<>();
+                if (MapUtils.isNotEmpty(map)) {
+                    for (Object key : map.keySet()) {
+                        Object vals = map.get(key);
+                        if (vals instanceof List) {
+                            list.addAll((List) vals);
+                        }
+                    }
+
+                    values.add(list);
+                }
+            } else if (obj instanceof List) {
+                List list = (List) obj;
+                if (CollectionUtils.isNotEmpty(list)) {
+                    values.add(list);
+                }
+            }
+        }
+
+        return values;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/query/executors/SelectClauseProjections.java b/repository/src/main/java/org/apache/atlas/query/executors/SelectClauseProjections.java
new file mode 100644
index 0000000..a104f19
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/query/executors/SelectClauseProjections.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.query.executors;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.query.GremlinQuery;
+import org.apache.atlas.query.SelectClauseComposer;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class SelectClauseProjections {
+    private static final Logger LOG = LoggerFactory.getLogger(SelectClauseProjections.class);
+
+    public static AtlasSearchResult usingList(final GremlinQuery queryInfo,
+                                              final EntityGraphRetriever entityRetriever,
+                                              final Collection<AtlasVertex> resultList) throws AtlasBaseException {
+        AtlasSearchResult                       ret                   = new AtlasSearchResult();
+        SelectClauseComposer                    selectClauseInfo      = queryInfo.getSelectComposer();
+        AtlasSearchResult.AttributeSearchResult attributeSearchResult = new AtlasSearchResult.AttributeSearchResult();
+
+        attributeSearchResult.setName(Arrays.stream(selectClauseInfo.getLabels()).collect(Collectors.toList()));
+
+        Collection<List<Object>> values = getProjectionRows(resultList, selectClauseInfo, entityRetriever);
+
+        if (values instanceof List) {
+            attributeSearchResult.setValues((List) values);
+        } else if (values instanceof Set) {
+            attributeSearchResult.setValues(new ArrayList<>(values));
+        }
+
+        ret.setAttributes(attributeSearchResult);
+
+        return ret;
+    }
+
+    public static AtlasSearchResult usingMap(final GremlinQuery gremlinQuery,
+                                             final EntityGraphRetriever entityRetriever,
+                                             final Map<String, Collection<AtlasVertex>> resultMap) throws AtlasBaseException {
+        AtlasSearchResult                       ret                   = new AtlasSearchResult();
+        SelectClauseComposer                    selectClauseInfo      = gremlinQuery.getSelectComposer();
+        AtlasSearchResult.AttributeSearchResult attributeSearchResult = new AtlasSearchResult.AttributeSearchResult();
+
+        attributeSearchResult.setName(Arrays.stream(selectClauseInfo.getLabels()).collect(Collectors.toList()));
+
+        List<List<Object>> values = new ArrayList<>();
+        for (Collection<AtlasVertex> value : resultMap.values()) {
+            Collection<List<Object>> projectionRows = getProjectionRows(value, selectClauseInfo, entityRetriever);
+            values.addAll(projectionRows);
+        }
+
+        attributeSearchResult.setValues(getSublistForGroupBy(gremlinQuery, values));
+        ret.setAttributes(attributeSearchResult);
+        return ret;
+    }
+
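+    // Group-by results are paginated here rather than in the traversal: the
+    // resolved offset (treated as 1-based) and limit are applied to the
+    // flattened list of projection rows.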
+    private static List<List<Object>> getSublistForGroupBy(GremlinQuery gremlinQuery, List<List<Object>> values) {
+        int startIndex = gremlinQuery.getQueryMetadata().getResolvedOffset() - 1;
+        if (startIndex < 0) {
+            startIndex = 0;
+        }
+
+        int endIndex = startIndex + gremlinQuery.getQueryMetadata().getResolvedLimit();
+        if (startIndex >= values.size()) {
+            endIndex = 0;
+            startIndex = 0;
+        }
+
+        if (endIndex >= values.size()) {
+            endIndex = values.size();
+        }
+
+        return values.subList(startIndex, endIndex);
+    }
+
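+    // Builds one projection row per vertex; aggregate columns (count/min/max/sum)
+    // are computed over the entire vertex collection, so every row of a group
+    // carries that group's aggregate values.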
+    private static Collection<List<Object>> getProjectionRows(final Collection<AtlasVertex> vertices,
+                                                              final SelectClauseComposer selectClauseComposer,
+                                                              final EntityGraphRetriever entityRetriever) throws AtlasBaseException {
+        Collection<List<Object>> values = new HashSet<>();
+
+        for (AtlasVertex vertex : vertices) {
+            List<Object> row = new ArrayList<>();
+
+            for (int idx = 0; idx < selectClauseComposer.getLabels().length; idx++) {
+                if (selectClauseComposer.isMinIdx(idx)) {
+                    row.add(computeMin(vertices, selectClauseComposer, idx));
+                } else if (selectClauseComposer.isMaxIdx(idx)) {
+                    row.add(computeMax(vertices, selectClauseComposer, idx));
+                } else if (selectClauseComposer.isCountIdx(idx)) {
+                    row.add(vertices.size());
+                } else if (selectClauseComposer.isSumIdx(idx)) {
+                    row.add(computeSum(vertices, selectClauseComposer, idx));
+                } else {
+                    if (selectClauseComposer.isPrimitiveAttribute(idx)) {
+                        String propertyName = selectClauseComposer.getAttribute(idx);
+                        row.add(vertex.getProperty(propertyName, Object.class));
+                    } else {
+                        row.add(entityRetriever.toAtlasEntityHeaderWithClassifications(vertex));
+                    }
+                }
+            }
+
+            values.add(row);
+        }
+
+        return values;
+    }
+
+    private static Number computeSum(final Collection<AtlasVertex> vertices, final SelectClauseComposer selectClauseComposer, final int idx) {
+        if (!selectClauseComposer.isNumericAggregator(idx)) {
+            return 0;
+        }
+
+        final String propertyName = selectClauseComposer.getAttribute(idx);
+        double sum = 0;
+        for (AtlasVertex vertex : vertices) {
+            Number value = vertex.getProperty(propertyName, Number.class);
+            if (value != null) {
+                sum += value.doubleValue();
+            } else {
+                LOG.warn("Property: {} for vertex: {} not found!", propertyName, vertex.getId());
+            }
+        }
+        return sum;
+    }
+
+    private static Object computeMax(final Collection<AtlasVertex> vertices, final SelectClauseComposer selectClauseComposer, final int idx) {
+        final String propertyName = selectClauseComposer.getAttribute(idx);
+
+        if (selectClauseComposer.isNumericAggregator(idx)) {
+            AtlasVertex maxV = Collections.max(vertices, new VertexPropertyComparator(propertyName));
+            return maxV.getProperty(propertyName, Object.class);
+        } else {
+            return Collections.max(vertices.stream().map(v -> v.getProperty(propertyName, String.class))
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList()), String.CASE_INSENSITIVE_ORDER);
+        }
+    }
+
+    private static Object computeMin(final Collection<AtlasVertex> vertices, final SelectClauseComposer selectClauseComposer, final int idx) {
+        final String propertyName = selectClauseComposer.getAttribute(idx);
+
+        if (selectClauseComposer.isNumericAggregator(idx)) {
+            AtlasVertex minV = Collections.min(vertices, new VertexPropertyComparator(propertyName));
+            return minV.getProperty(propertyName, Object.class);
+        } else {
+            return Collections.min(vertices.stream()
+                    .map(v -> v.getProperty(propertyName, String.class))
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList()), String.CASE_INSENSITIVE_ORDER);
+        }
+    }
+
+    static class VertexPropertyComparator implements Comparator<AtlasVertex> {
+        private final String propertyName;
+
+        public VertexPropertyComparator(final String propertyName) {
+            this.propertyName = propertyName;
+        }
+
+        @Override
+        public int compare(final AtlasVertex o1, final AtlasVertex o2) {
+            Object p1 = o1 == null ? null : o1.getProperty(propertyName, Object.class);
+            Object p2 = o2 == null ? null : o2.getProperty(propertyName, Object.class);
+
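+            // A vertex missing the property sorts before any vertex that has it.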
+            if (p1 == null && p2 == null) {
+                return 0;
+            } else if (p1 == null) {
+                return -1;
+            } else if (p2 == null) {
+                return 1;
+            }
+
+            if (p1 instanceof String && p2 instanceof String) {
+                return ((String) p1).compareTo((String) p2);
+            }
+            if (p1 instanceof Byte && p2 instanceof Byte) {
+                return ((Byte) p1).compareTo((Byte) p2);
+            }
+            if (p1 instanceof Short && p2 instanceof Short) {
+                return ((Short) p1).compareTo((Short) p2);
+            }
+            if (p1 instanceof Integer && p2 instanceof Integer) {
+                return ((Integer) p1).compareTo((Integer) p2);
+            }
+            if (p1 instanceof Float && p2 instanceof Float) {
+                return ((Float) p1).compareTo((Float) p2);
+            }
+            if (p1 instanceof Double && p2 instanceof Double) {
+                return ((Double) p1).compareTo((Double) p2);
+            }
+            if (p1 instanceof Long && p2 instanceof Long) {
+                return ((Long) p1).compareTo((Long) p2);
+            }
+            if (p1 instanceof BigInteger && p2 instanceof BigInteger) {
+                return ((BigInteger) p1).compareTo((BigInteger) p2);
+            }
+            if (p1 instanceof BigDecimal && p2 instanceof BigDecimal) {
+                return ((BigDecimal) p1).compareTo((BigDecimal) p2);
+            }
+
+            return 0;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/query/executors/TraversalBasedExecutor.java b/repository/src/main/java/org/apache/atlas/query/executors/TraversalBasedExecutor.java
new file mode 100644
index 0000000..a61d1d0
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/query/executors/TraversalBasedExecutor.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.query.executors;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.query.AtlasDSL;
+import org.apache.atlas.query.GremlinQuery;
+import org.apache.atlas.query.QueryParams;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphTraversal;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class TraversalBasedExecutor implements DSLQueryExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(TraversalBasedExecutor.class);
+    private static final String DSL_KEYWORD_LIMIT             = "limit";
+    private static final String DSL_KEYWORD_OFFSET            = "offset";
+    private static final String DEFAULT_LIMIT_OFFSET_TEMPLATE = " limit %d offset %d";
+    private static final String CLAUSE_OFFSET_ZERO            = " offset 0";
+
+    private final AtlasTypeRegistry     typeRegistry;
+    private final AtlasGraph            graph;
+    private final EntityGraphRetriever  entityRetriever;
+
+    public TraversalBasedExecutor(AtlasTypeRegistry typeRegistry, AtlasGraph graph, EntityGraphRetriever entityRetriever) {
+        this.typeRegistry    = typeRegistry;
+        this.graph           = graph;
+        this.entityRetriever = entityRetriever;
+    }
+
+    @Override
+    public AtlasSearchResult execute(String dslQuery, int limit, int offset) throws AtlasBaseException {
+        AtlasSearchResult ret = new AtlasSearchResult(dslQuery, AtlasSearchResult.AtlasQueryType.DSL);
+        GremlinQuery gremlinQuery = toTraversal(dslQuery, limit, offset);
+        AtlasGraphTraversal<AtlasVertex, AtlasEdge> graphTraversal = gremlinQuery.getTraversal();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Executing DSL: query={}, gremlinQuery={}", dslQuery, graphTraversal.toString());
+        }
+
+        List<AtlasVertex> resultList = graphTraversal.getAtlasVertexList();
+        return (CollectionUtils.isNotEmpty(resultList))
+                ? getSearchResult(ret, gremlinQuery, resultList)
+                : getSearchResult(ret, gremlinQuery, graphTraversal.getAtlasVertexMap());
+    }
+
+    private AtlasSearchResult getSearchResult(AtlasSearchResult ret, GremlinQuery gremlinQuery, List<AtlasVertex> resultList) throws AtlasBaseException {
+        return gremlinQuery.hasValidSelectClause()
+                ? SelectClauseProjections.usingList(gremlinQuery, entityRetriever, resultList)
+                : processVertices(ret, resultList);
+    }
+
+    private AtlasSearchResult getSearchResult(AtlasSearchResult ret, GremlinQuery gremlinQuery, Map<String, Collection<AtlasVertex>> resultMap) throws AtlasBaseException {
+        if (MapUtils.isEmpty(resultMap)) {
+            return ret;
+        }
+
+        if (gremlinQuery.hasValidSelectClause()) {
+            return SelectClauseProjections.usingMap(gremlinQuery, entityRetriever, resultMap);
+        }
+
+        for (Collection<AtlasVertex> vertices : resultMap.values()) {
+            processVertices(ret, vertices);
+        }
+
+        return ret;
+    }
+
+    private AtlasSearchResult processVertices(final AtlasSearchResult ret, final Collection<AtlasVertex> vertices) throws AtlasBaseException {
+        for (AtlasVertex vertex : vertices) {
+            if (vertex == null) {
+                continue;
+            }
+
+            ret.addEntity(entityRetriever.toAtlasEntityHeaderWithClassifications(vertex));
+        }
+
+        return ret;
+    }
+
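+    // Translates the DSL first into composed gremlin clauses and then directly
+    // into a graph traversal, avoiding the script-engine evaluation used by
+    // ScriptEngineBasedExecutor.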
+    private GremlinQuery toTraversal(String query, int limit, int offset) throws AtlasBaseException {
+        QueryParams params = QueryParams.getNormalizedParams(limit, offset);
+        query = getStringWithLimitOffset(query, params);
+
+        AtlasDSL.Translator dslTranslator = new AtlasDSL.Translator(query, typeRegistry, params.offset(), params.limit());
+        GremlinQuery gremlinQuery = dslTranslator.translate();
+
+        AtlasGraphTraversal result = GremlinClauseToTraversalTranslator.run(this.graph, gremlinQuery.getClauses());
+        gremlinQuery.setResult(result);
+
+        return gremlinQuery;
+    }
+
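+    // Ensures the DSL carries explicit pagination so that a RANGE clause is
+    // always composed: a query without limit/offset gets defaults appended, and
+    // a query with only a limit gets 'offset 0'. This is a plain substring
+    // check, so a query containing the words 'limit'/'offset' elsewhere is
+    // assumed to already carry explicit pagination.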
+    private String getStringWithLimitOffset(String query, QueryParams params) {
+        if (!query.contains(DSL_KEYWORD_LIMIT) && !query.contains(DSL_KEYWORD_OFFSET)) {
+            query += String.format(DEFAULT_LIMIT_OFFSET_TEMPLATE, params.limit(), params.offset());
+        }
+
+        if (query.contains(DSL_KEYWORD_LIMIT) && !query.contains(DSL_KEYWORD_OFFSET)) {
+            query += CLAUSE_OFFSET_ZERO;
+        }
+
+        return query;
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/query/BaseDSLComposer.java b/repository/src/test/java/org/apache/atlas/query/BaseDSLComposer.java
new file mode 100644
index 0000000..0044eac
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/query/BaseDSLComposer.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.query;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.query.antlr4.AtlasDSLParser;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.lang.StringUtils;
+
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.FileAssert.fail;
+
+public class BaseDSLComposer {
+    protected AtlasTypeRegistry registry = mock(AtlasTypeRegistry.class);
+
+    protected AtlasDSLParser.QueryContext getParsedQuery(String query) {
+        AtlasDSLParser.QueryContext queryContext = null;
+        try {
+            queryContext = AtlasDSL.Parser.parse(query);
+        } catch (AtlasBaseException e) {
+            fail(e.getMessage());
+        }
+        return queryContext;
+    }
+
+    public static class TestLookup implements org.apache.atlas.query.Lookup {
+        AtlasTypeRegistry registry;
+
+        TestLookup(AtlasTypeRegistry typeRegistry) {
+            this.registry = typeRegistry;
+        }
+
+        @Override
+        public AtlasType getType(String typeName) throws AtlasBaseException {
+            AtlasType type;
+            if(typeName.equals("PII") || typeName.equals("Dimension")) {
+                type = mock(AtlasType.class);
+                when(type.getTypeCategory()).thenReturn(TypeCategory.CLASSIFICATION);
+            } else {
+                type = mock(AtlasEntityType.class);
+                when(type.getTypeCategory()).thenReturn(TypeCategory.ENTITY);
+
+                AtlasStructType.AtlasAttribute attr = mock(AtlasStructType.AtlasAttribute.class);
+                AtlasStructDef.AtlasAttributeDef def = mock(AtlasStructDef.AtlasAttributeDef.class);
+                when(def.getIndexType()).thenReturn(AtlasStructDef.AtlasAttributeDef.IndexType.DEFAULT);
+                when(attr.getAttributeDef()).thenReturn(def);
+
+                AtlasStructType.AtlasAttribute attr_s = mock(AtlasStructType.AtlasAttribute.class);
+                AtlasStructDef.AtlasAttributeDef def_s = mock(AtlasStructDef.AtlasAttributeDef.class);
+                when(def_s.getIndexType()).thenReturn(AtlasStructDef.AtlasAttributeDef.IndexType.STRING);
+
+                when(attr_s.getAttributeDef()).thenReturn(def_s);
+
+                when(((AtlasEntityType) type).getAttribute(anyString())).thenReturn(attr);
+                when(((AtlasEntityType) type).getAttribute(eq("name"))).thenReturn(attr_s);
+            }
+
+            if(typeName.equals("PIII")) {
+                throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND);
+            }
+            when(type.getTypeName()).thenReturn(typeName);
+            return type;
+        }
+
+        @Override
+        public String getQualifiedName(GremlinQueryComposer.Context context, String name) throws AtlasBaseException {
+            if (name.startsWith("__")) {
+                return name.equals("__state") || name.equals("__guid") ? name : "";
+            }
+
+            if (!hasAttribute(context, name)) {
+                throw new AtlasBaseException("Invalid attribute");
+            }
+
+            if (name.contains(".")) {
+                return name;
+            }
+
+            return context.getActiveTypeName().equals(name)
+                    ? name
+                    : String.format("%s.%s", context.getActiveTypeName(), name);
+        }
+
+        @Override
+        public boolean isPrimitive(GremlinQueryComposer.Context context, String attributeName) {
+            return attributeName.equals("name") ||
+                    attributeName.equals("owner") ||
+                    attributeName.equals("createTime") ||
+                    attributeName.equals("clusterName") ||
+                    attributeName.equals("__guid") ||
+                    attributeName.equals("__state") ||
+                    attributeName.equals("partitionSize");
+        }
+
+        @Override
+        public String getRelationshipEdgeLabel(GremlinQueryComposer.Context context, String attributeName) {
+            if (attributeName.equalsIgnoreCase("columns")) {
+                return "__Table.columns";
+            }
+            if (attributeName.equalsIgnoreCase("db")) {
+                return "__Table.db";
+            }
+            if (attributeName.equalsIgnoreCase("meanings")) {
+                return "r:AtlasGlossarySemanticAssignment";
+            }
+            return "__DB.Table";
+        }
+
+        @Override
+        public AtlasRelationshipEdgeDirection getRelationshipEdgeDirection(GremlinQueryComposer.Context context, String attributeName) {
+            if (attributeName.equalsIgnoreCase("meanings")) {
+                return IN;
+            }
+            return OUT;
+        }
+
+        @Override
+        public boolean hasAttribute(GremlinQueryComposer.Context context, String attributeName) {
+            switch (context.getActiveTypeName()) {
+                case "Table":
+                    return attributeName.equals("db") || attributeName.equals("columns") ||
+                           attributeName.equals("createTime") || attributeName.equals("name") ||
+                           attributeName.equals("owner") || attributeName.equals("clusterName") ||
+                           attributeName.equals("isFile") || attributeName.equals("__guid") ||
+                           attributeName.equals("__state") || attributeName.equals("partitionSize") ||
+                           attributeName.equals("meanings");
+                case "hive_db":
+                    return attributeName.equals("name") || attributeName.equals("owner") || attributeName.equals("createTime");
+                case "DB":
+                    return attributeName.equals("name") || attributeName.equals("owner") || attributeName.equals("clusterName");
+                case "Asset":
+                    return attributeName.equals("name") || attributeName.equals("owner");
+                case "AtlasGlossaryTerm":
+                    return attributeName.equals("name") || attributeName.equals("qualifiedName");
+                default:
+                    return false;
+            }
+        }
+
+        @Override
+        public boolean doesTypeHaveSubTypes(GremlinQueryComposer.Context context) {
+            return context.getActiveTypeName().equalsIgnoreCase("Asset");
+        }
+
+        @Override
+        public String getTypeAndSubTypes(GremlinQueryComposer.Context context) {
+            String[] str = new String[]{"'Asset'", "'Table'"};
+            return StringUtils.join(str, ",");
+        }
+
+        @Override
+        public boolean isTraitType(String typeName) {
+            return typeName.equals("PII") || typeName.equals("Dimension");
+        }
+
+        @Override
+        public String getTypeFromEdge(GremlinQueryComposer.Context context, String item) {
+            if (context.getActiveTypeName().equals("DB") && item.equals("Table")) {
+                return "Table";
+            }
+
+            if (context.getActiveTypeName().equals("Table") && (item.equals("Column") || item.equals("columns"))) {
+                return "Column";
+            }
+
+            if (context.getActiveTypeName().equals("Table") && item.equals("db")) {
+                return "DB";
+            }
+
+            if (context.getActiveTypeName().equals("Table") && item.equals("meanings")) {
+                return "AtlasGlossaryTerm";
+            }
+
+            if (context.getActiveTypeName().equals(item)) {
+                return null;
+            }
+
+            return context.getActiveTypeName();
+        }
+
+        @Override
+        public boolean isDate(GremlinQueryComposer.Context context, String attributeName) {
+            return attributeName.equals("createTime");
+        }
+
+        @Override
+        public boolean isNumeric(GremlinQueryComposer.Context context, String attrName) {
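+            // sets a float-style suffix for numeric literal formatting; only partitionSize is treated as numeric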
+            context.setNumericTypeFormatter("f");
+            return attrName.equals("partitionSize");
+        }
+
+        @Override
+        public String getVertexPropertyName(String typeName, String attrName) {
+            if (typeName.equals("Asset")) {
+                if (attrName.equals("name") || attrName.equals("owner")) {
+                    return String.format("%s.__s_%s", typeName, attrName);
+                }
+            }
+
+            return null;
+        }
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/query/DSLQueriesTest.java b/repository/src/test/java/org/apache/atlas/query/DSLQueriesTest.java
index 958c428..0b01894 100644
--- a/repository/src/test/java/org/apache/atlas/query/DSLQueriesTest.java
+++ b/repository/src/test/java/org/apache/atlas/query/DSLQueriesTest.java
@@ -22,6 +22,7 @@
 import org.apache.atlas.discovery.EntityDiscoveryService;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
@@ -35,9 +36,13 @@
 
 import javax.inject.Inject;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -191,7 +196,6 @@
 
     @Test(dataProvider = "comparisonQueriesProvider")
     public void comparison(String query, int expected) throws AtlasBaseException {
-        LOG.debug(query);
         AtlasSearchResult searchResult = discoveryService.searchUsingDslQuery(query, DEFAULT_LIMIT, 0);
         assertSearchResult(searchResult, expected, query);
 
@@ -202,23 +206,27 @@
     @DataProvider(name = "glossaryTermQueries")
     private Object[][] glossaryTermQueries() {
         return new Object[][]{
-                {"hive_table hasTerm modernTrade",2},
-                {"hive_table hasTerm \"modernTrade@salesGlossary\"",2},
-                {"hive_table hasTerm \"modernTrade@salesGlossary\" where hive_table.name = \"time_dim\"",1},
-                {"hive_table hasTerm \"modernTrade@salesGlossary\" select name",2},
-                {"hive_table hasTerm \"modernTrade@salesGlossary\" limit 1",1},
-                {"hive_table hasTerm \"modernTrade@salesGlossary\" or hive_table hasTerm \"ecommerce@salesGlossary\"",3},
-                {"hive_table hasTerm \"modernTrade@salesGlossary\" and hive_table isA Dimension",1},
-                {"hive_table hasTerm \"modernTrade@salesGlossary\" and db.name = \"Sales\" or (hive_table.name = \"sales_fact_monthly_mv\")",2},
-                {"hive_table where hive_table hasTerm \"modernTrade@salesGlossary\"",2},
-                {"hive_table where (name = \"product_dim\" and hive_table hasTerm \"ecommerce@salesGlossary\")",1}
+                {"hive_table hasTerm modernTrade", 2, new ListValidator("logging_fact_monthly_mv", "time_dim")},
+                {"hive_table hasTerm \"modernTrade@salesGlossary\"", 2, new ListValidator("logging_fact_monthly_mv", "time_dim")},
+                {"hive_table hasTerm \"modernTrade@salesGlossary\" where hive_table.name = \"time_dim\"", 1, new ListValidator("time_dim")},
+                {"hive_table hasTerm \"modernTrade@salesGlossary\" select name", 2, null},
+                {"hive_table hasTerm \"modernTrade@salesGlossary\" limit 1", 1, null},
+                {"hive_table hasTerm \"modernTrade@salesGlossary\" or hive_table hasTerm \"ecommerce@salesGlossary\"", 3, new ListValidator("logging_fact_monthly_mv", "time_dim", "product_dim")},
+                {"hive_table hasTerm \"modernTrade@salesGlossary\" and hive_table isA Dimension",1, new ListValidator( "time_dim")},
+                {"hive_table hasTerm \"modernTrade@salesGlossary\" and db.name = \"Sales\" or (hive_table.name = \"sales_fact_monthly_mv\")", 2, new ListValidator("sales_fact_monthly_mv", "time_dim")},
+                {"hive_table where hive_table hasTerm \"modernTrade@salesGlossary\"", 2, new ListValidator("logging_fact_monthly_mv", "time_dim")},
+                {"hive_table where (name = \"product_dim\" and hive_table hasTerm \"ecommerce@salesGlossary\")", 1, new ListValidator("product_dim")}
         };
     }
 
     @Test(dataProvider = "glossaryTermQueries")
-    public void glossaryTerm(String query, int expected) throws AtlasBaseException {
-        queryAssert(query, expected, DEFAULT_LIMIT, 0);
-        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
+    public void glossaryTerm(String query, int expected, ListValidator lvExpected) throws AtlasBaseException {
+        AtlasSearchResult result = queryAssert(query, expected, DEFAULT_LIMIT, 0);
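+        // a null validator means only the result count is asserted for this query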
+        if (lvExpected == null) {
+            return;
+        }
+
+        ListValidator.assertLv(ListValidator.from(result), lvExpected);
     }
 
     @DataProvider(name = "basicProvider")
@@ -231,6 +239,7 @@
                 {"hive_db as d select d", 3},
                 {"hive_db where hive_db.name=\"Reporting\"", 1},
                 {"hive_db where hive_db.name=\"Reporting\" select name, owner", 1},
+                {"hive_column where name='time_id' select name", 1},
                 {"hive_db has name", 3},
                 {"from hive_table", 10},
                 {"hive_table", 10},
@@ -252,6 +261,7 @@
                 {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1", 1},
                 {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1", 1},
                 {"hive_table where (name = \"sales_fact\" and db.name = \"Sales\") select name, createTime", 1},
+                {"hive_table where (name = \"time_dim\" and db.name = \"Sales\") or (name = \"sales_fact\" and db.name = \"Sales\") select name, createTime", 2},
                 {"Dimension", 9},
                 {"JdbcAccess", 2},
                 {"ETL", 10},
@@ -259,14 +269,16 @@
                 {"PII", 4},
                 {"`Log Data`", 4},
                 {"DataSet where name='sales_fact'", 1},
-                {"Asset where name='sales_fact'", 1}
+                {"Asset where name='sales_fact'", 1},
+                {"hive_db _NOT_CLASSIFIED", 3},
+                {"_CLASSIFIED", 23}
         };
     }
 
     @Test(dataProvider = "basicProvider")
     public void basic(String query, int expected) throws AtlasBaseException {
         queryAssert(query, expected, DEFAULT_LIMIT, 0);
-        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
+//        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
     }
 
     @DataProvider(name = "systemAttributesProvider")
@@ -277,6 +289,8 @@
                 {"hive_db as d where d.__state = 'ACTIVE'", 3},
                 {"hive_db select __guid", 3},
                 {"hive_db where __state = 'ACTIVE' select name, __guid, __state", 3},
+                {"hive_db where __isIncomplete=true", 0},
+                {"hive_db where __isIncomplete=false", 3},
         };
     }
 
@@ -285,9 +299,10 @@
         queryAssert(query, expected, DEFAULT_LIMIT, 0);
     }
 
-    private void queryAssert(String query, final int expected, final int limit, final int offset) throws AtlasBaseException {
+    private AtlasSearchResult queryAssert(String query, final int expected, final int limit, final int offset) throws AtlasBaseException {
         AtlasSearchResult searchResult = discoveryService.searchUsingDslQuery(query, limit, offset);
         assertSearchResult(searchResult, expected, query);
+        return searchResult;
     }
 
     @DataProvider(name = "limitProvider")
@@ -394,33 +409,33 @@
                 {"hive_db as d where owner = ['John ETL', 'Jane BI']", 2},
                 {"hive_db as d where owner = ['John ETL', 'Jane BI'] limit 10", 2},
                 {"hive_db as d where owner = ['John ETL', 'Jane BI'] limit 10 offset 1", 1},
+                {"hive_db where (name='Reporting' or ((name='Logging' and owner = 'Jane BI') and (name='Logging' and owner = 'John ETL')))", 1}
         };
     }
 
     @Test(dataProvider = "syntaxProvider")
     public void syntax(String query, int expected) throws AtlasBaseException {
-        LOG.debug(query);
         queryAssert(query, expected, DEFAULT_LIMIT, 0);
-        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
+//        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
     }
 
     @DataProvider(name = "orderByProvider")
     private Object[][] orderByQueries() {
         return new Object[][]{
                 {"from hive_db as h orderby h.owner limit 3", 3, "owner", true},
-                {"hive_column as c select c.qualifiedName orderby hive_column.qualifiedName ", 17, "c.qualifiedName", true},
-                {"hive_column as c select c.qualifiedName orderby hive_column.qualifiedName limit 5", 5, "c.qualifiedName", true},
-                {"hive_column as c select c.qualifiedName orderby hive_column.qualifiedName desc limit 5", 5, "c.qualifiedName", false},
+                {"hive_column as c select c.qualifiedName orderby hive_column.qualifiedName ", 17, "qualifiedName", true},
+                {"hive_column as c select c.qualifiedName orderby hive_column.qualifiedName limit 5", 5, "qualifiedName", true},
+                {"hive_column as c select c.qualifiedName orderby hive_column.qualifiedName desc limit 5", 5, "qualifiedName", false},
 
                 {"from hive_db orderby hive_db.owner limit 3", 3, "owner", true},
-                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName ", 17, "hive_column.qualifiedName", true},
-                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName limit 5", 5, "hive_column.qualifiedName", true},
-                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName desc limit 5", 5, "hive_column.qualifiedName", false},
+                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName ", 17, "qualifiedName", true},
+                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName limit 5", 5, "qualifiedName", true},
+                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName desc limit 5", 5, "qualifiedName", false},
 
                 {"from hive_db orderby owner limit 3", 3, "owner", true},
-                {"hive_column select hive_column.qualifiedName orderby qualifiedName ", 17, "hive_column.qualifiedName", true},
-                {"hive_column select hive_column.qualifiedName orderby qualifiedName limit 5", 5, "hive_column.qualifiedName", true},
-                {"hive_column select hive_column.qualifiedName orderby qualifiedName desc limit 5", 5, "hive_column.qualifiedName", false},
+                {"hive_column select hive_column.qualifiedName orderby qualifiedName ", 17, "qualifiedName", true},
+                {"hive_column select hive_column.qualifiedName orderby qualifiedName limit 5", 5, "qualifiedName", true},
+                {"hive_column select hive_column.qualifiedName orderby qualifiedName desc limit 5", 5, "qualifiedName", false},
 
                 {"from hive_db orderby hive_db.owner limit 3", 3, "owner", true},
                 {"hive_db where hive_db.name=\"Reporting\" orderby owner", 1, "owner", true},
@@ -429,9 +444,8 @@
                 {"hive_db where hive_db.name=\"Reporting\" select name, owner orderby hive_db.name ", 1, "name", true},
                 {"hive_db has name orderby hive_db.owner limit 10 offset 0", 3, "owner", true},
 
-                {"from hive_table select hive_table.owner orderby hive_table.owner", 10, "hive_table.owner", true},
-                {"from hive_table select hive_table.owner orderby hive_table.owner limit 8", 8, "hive_table.owner", true},
-
+                {"from hive_table select hive_table.owner orderby hive_table.owner", 10, "owner", true},
+                {"from hive_table select hive_table.owner orderby hive_table.owner limit 8", 8, "owner", true},
                 {"hive_table orderby hive_table.name", 10, "name", true},
 
                 {"hive_table orderby hive_table.owner", 10, "owner", true},
@@ -439,11 +453,10 @@
                 {"hive_table orderby hive_table.owner limit 8 offset 0", 8, "owner", true},
                 {"hive_table orderby hive_table.owner desc limit 8 offset 0", 8, "owner", false},
 
-                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName ", 17, "hive_column.qualifiedName", true},
-                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName limit 5", 5, "hive_column.qualifiedName", true},
-                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName desc limit 5", 5, "hive_column.qualifiedName", false},
-
-                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName limit 5 offset 2", 5, "hive_column.qualifiedName", true},
+                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName ", 17, "qualifiedName", true},
+                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName limit 5", 5, "qualifiedName", true},
+                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName desc limit 5", 5, "qualifiedName", false},
+                {"hive_column select hive_column.qualifiedName orderby hive_column.qualifiedName limit 5 offset 2", 5, "qualifiedName", true},
 
                 {"hive_column select qualifiedName orderby hive_column.qualifiedName", 17, "qualifiedName", true},
                 {"hive_column select qualifiedName orderby hive_column.qualifiedName limit 5", 5, "qualifiedName", true},
@@ -465,168 +478,180 @@
                 {"hive_db where hive_db has name orderby hive_db.owner limit 2 offset 0", 2, "owner", true},
                 {"hive_db where hive_db has name orderby hive_db.owner limit 2 offset 1", 2, "owner", true},
 
-                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime ", 1, "_col_1", true},
-                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 ", 1, "_col_1", true},
-                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 offset 0", 1, "_col_1", true},
-                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 offset 5", 0, "_col_1", true},
+                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime ", 1, "createTime", true},
+                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 ", 1, "createTime", true},
+                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 offset 0", 1, "createTime", true},
+                {"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 orderby createTime limit 10 offset 5", 0, "createTime", true},
 
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name ", 1, "_col_0", true},
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10 offset 0", 1, "_col_0", true},
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10 offset 1", 0, "_col_0", true},
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10", 1, "_col_0", true},
-                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 0 offset 1", 0, "_col_0", true},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name ", 1, "name", true},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10 offset 0", 1, "name", true},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10 offset 1", 0, "name", true},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 10", 1, "name", true},
+                {"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 orderby name limit 0 offset 1", 0, "name", true},
         };
     }
 
     @Test(dataProvider = "orderByProvider")
-    public void orderBy(String query, int expected, String orderBy, boolean ascending) throws AtlasBaseException {
-        LOG.debug(query);
-        queryAssert(query, expected, DEFAULT_LIMIT, 0);
-        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
+    public void orderBy(String query, int expected, String attributeName, boolean ascending) throws AtlasBaseException {
+        AtlasSearchResult searchResult = queryAssert(query, expected, DEFAULT_LIMIT, 0);
+        assertSortOrder(query, attributeName, ascending, searchResult.getEntities());
+
+        searchResult = queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
+        assertSortOrder(query, attributeName, ascending, searchResult.getEntities());
+    }
+
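+    // walks adjacent pairs of results and verifies the given attribute is sorted in the expected direction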
+    private void assertSortOrder(String query, String attributeName, boolean ascending, List<AtlasEntityHeader> entities) {
+        if (entities == null) {
+            return;
+        }
+
+        AtlasEntityHeader prev = null;
+        for (AtlasEntityHeader current : entities) {
+            if (prev != null && prev.hasAttribute(attributeName) && current.hasAttribute(attributeName)) {
+                String lhs = (String) prev.getAttribute(attributeName);
+                String rhs = (String) current.getAttribute(attributeName);
+                int compResult = lhs.compareTo(rhs);
+
+                if (ascending) {
+                    assertTrue(compResult <= 0, query);
+                } else {
+                    assertTrue(compResult >= 0, query);
+                }
+            }
+
+            prev = current;
+        }
     }
 
     @DataProvider(name = "likeQueriesProvider")
     private Object[][] likeQueries() {
         return new Object[][]{
-                {"hive_table qualifiedName like \"*time_dim*\"", 1},
-                {"hive_db where qualifiedName like \"qualified:R*\"", 1},
-                {"hive_table db.name=\"Sales\"", 4},
-                {"hive_table qualifiedName =\"Sales.time_dim\" AND db.name=\"Sales\"", 1},
-                {"hive_table qualifiedName like \"*time_dim*\" AND db.name=\"Sales\"", 1},
-                {"hive_table where name like \"sa?es*\"", 3},
-                {"hive_db where name like \"R*\"", 1},
-                {"hive_db where hive_db.name like \"R???rt?*\" or hive_db.name like \"S?l?s\" or hive_db.name like\"Log*\"", 3},
-                {"hive_db where hive_db.name like \"R???rt?*\" and hive_db.name like \"S?l?s\" and hive_db.name like\"Log*\"", 0},
-                {"hive_table where name like 'sales*' and db.name like 'Sa?es'", 1},
-                {"hive_table where db.name like \"Sa*\"", 4},
-                {"hive_table where db.name like \"Sa*\" and name like \"*dim\"", 3},
+                {"hive_table qualifiedName like \"*time_dim*\"", 1, new ListValidator("time_dim")},
+                {"hive_db where qualifiedName like \"qualified:R*\"", 1, new ListValidator("Reporting")},
+                {"hive_table db.name=\"Sales\"", 4, new ListValidator("customer_dim", "sales_fact", "time_dim", "product_dim")},
+                {"hive_table qualifiedName =\"Sales.time_dim\" AND db.name=\"Sales\"", 1, new ListValidator("time_dim")},
+                {"hive_table qualifiedName like \"*time_dim*\" AND db.name=\"Sales\"", 1, new ListValidator("time_dim")},
+                {"hive_table where name like \"sa?es*\"", 3, new ListValidator("sales_fact", "sales_fact_daily_mv", "sales_fact_monthly_mv")},
+                {"hive_db where name like \"R*\"", 1, new ListValidator("Reporting")},
+                {"hive_db where hive_db.name like \"R???rt?*\" or hive_db.name like \"S?l?s\" or hive_db.name like\"Log*\"", 3, new ListValidator("Reporting", "Sales", "Logging") },
+                {"hive_db where hive_db.name like \"R???rt?*\" and hive_db.name like \"S?l?s\" and hive_db.name like\"Log*\"", 0, new ListValidator()},
+                {"hive_table where name like 'sales*' and db.name like 'Sa?es'", 1, new ListValidator("sales_fact")},
+                {"hive_table where db.name like \"Sa*\"", 4, new ListValidator("customer_dim", "sales_fact", "time_dim", "product_dim")},
+                {"hive_table where db.name like \"Sa*\" and name like \"*dim\"", 3, new ListValidator("customer_dim", "product_dim", "time_dim")},
         };
     }
 
     @Test(dataProvider = "likeQueriesProvider")
-    public void likeQueries(String query, int expected) throws AtlasBaseException {
-        LOG.debug(query);
-        queryAssert(query, expected, DEFAULT_LIMIT, 0);
-        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0);
+    public void likeQueries(String query, int expected, ListValidator lv) throws AtlasBaseException {
+        queryAssert(query, expected, DEFAULT_LIMIT, 0, lv);
+        queryAssert(query.replace("where", " "), expected, DEFAULT_LIMIT, 0, lv);
+    }
+
+    private void queryAssert(String query, int expectedCount, int limit, int offset, ListValidator expected) throws AtlasBaseException {
+        AtlasSearchResult result = queryAssert(query, expectedCount, limit, offset);
+
+        ListValidator.assertLv(ListValidator.from(result), expected);
     }
 
     @DataProvider(name = "minMaxCountProvider")
     private Object[][] minMaxCountQueries() {
         return new Object[][]{
+                {"from hive_db select max(name), min(name)",
+                        new TableValidator("max(name)", "min(name)")
+                                .row("Sales", "Logging")},
                 {"from hive_db groupby (owner) select count() ",
-                        new FieldValueValidator()
-                                .withFieldNames("count()")
-                                .withExpectedValues(1)
-                                .withExpectedValues(1)
-                                .withExpectedValues(1) },
-                { "from hive_db groupby (owner) select owner, name orderby owner",
-                        new FieldValueValidator()
-                                .withFieldNames("owner", "name")
-                                .withExpectedValues("Jane BI", "Reporting")
-                                .withExpectedValues("John ETL", "Sales")
-                                .withExpectedValues("Tim ETL", "Logging") },
-                { "from hive_db groupby (owner) select Asset.owner, Asset.name, count()",
-                        new FieldValueValidator()
-                                .withFieldNames("Asset.owner", "Asset.name", "count()")
-                                .withExpectedValues("Jane BI", "Reporting", 1)
-                                .withExpectedValues("Tim ETL", "Logging", 1)
-                                .withExpectedValues("John ETL", "Sales", 1) },
-                { "from hive_db groupby (owner) select count() ",
-                        new FieldValueValidator()
-                                .withFieldNames("count()").
-                                withExpectedValues(1).
-                                withExpectedValues(1).
-                                withExpectedValues(1) },
-                { "from hive_db groupby (owner) select Asset.owner, count() ",
-                        new FieldValueValidator()
-                                .withFieldNames("Asset.owner", "count()")
-                                .withExpectedValues("Jane BI", 1)
-                                .withExpectedValues("Tim ETL", 1)
-                                .withExpectedValues("John ETL", 1) },
-                { "from hive_db groupby (owner) select count() ",
-                        new FieldValueValidator()
-                                .withFieldNames("count()")
-                                .withExpectedValues(1)
-                                .withExpectedValues(1)
-                                .withExpectedValues(1) },
+                        new TableValidator("count()")
+                                .row(1)
+                                .row(1)
+                                .row(1)},
+                {"from hive_db groupby (owner) select owner, name orderby owner",
+                        new TableValidator("owner", "name")
+                                .row("Jane BI", "Reporting")
+                                .row("John ETL", "Sales")
+                                .row("Tim ETL", "Logging")},
+                {"from hive_db groupby (owner) select Asset.owner, Asset.name, count()",
+                        new TableValidator("Asset.owner", "Asset.name", "count()")
+                                .row("Jane BI", "Reporting", 1)
+                                .row("Tim ETL", "Logging", 1)
+                                .row("John ETL", "Sales", 1)},
+                {"from hive_db groupby (owner) select count() ",
+                        new TableValidator("count()").
+                                row(1).
+                                row(1).
+                                row(1)},
+                {"from hive_db groupby (owner) select Asset.owner, count() ",
+                        new TableValidator("Asset.owner", "count()")
+                                .row("Jane BI", 1)
+                                .row("Tim ETL", 1)
+                                .row("John ETL", 1)},
+                {"from hive_db groupby (owner) select count() ",
+                        new TableValidator("count()")
+                                .row(1)
+                                .row(1)
+                                .row(1)},
+                {"from hive_db groupby (owner) select Asset.owner, count() ",
+                        new TableValidator("Asset.owner", "count()")
+                                .row("Jane BI", 1)
+                                .row("Tim ETL", 1)
+                                .row("John ETL", 1)},
 
-                { "from hive_db groupby (owner) select Asset.owner, count() ",
-                        new FieldValueValidator()
-                                .withFieldNames("Asset.owner", "count()")
-                                .withExpectedValues("Jane BI", 1)
-                                .withExpectedValues("Tim ETL", 1)
-                                .withExpectedValues("John ETL", 1) },
+                {"from hive_db groupby (owner) select Asset.owner, max(Asset.name) ",
+                        new TableValidator("Asset.owner", "max(Asset.name)")
+                                .row("Tim ETL", "Logging")
+                                .row("Jane BI", "Reporting")
+                                .row("John ETL", "Sales")},
 
-                { "from hive_db groupby (owner) select Asset.owner, max(Asset.name) ",
-                        new FieldValueValidator()
-                                .withFieldNames("Asset.owner", "max(Asset.name)")
-                                .withExpectedValues("Tim ETL", "Logging")
-                                .withExpectedValues("Jane BI", "Reporting")
-                                .withExpectedValues("John ETL", "Sales") },
+                {"from hive_db groupby (owner) select max(Asset.name) ",
+                        new TableValidator("max(Asset.name)")
+                                .row("Logging")
+                                .row("Reporting")
+                                .row("Sales")},
 
-                { "from hive_db groupby (owner) select max(Asset.name) ",
-                        new FieldValueValidator()
-                                .withFieldNames("max(Asset.name)")
-                                .withExpectedValues("Logging")
-                                .withExpectedValues("Reporting")
-                                .withExpectedValues("Sales") },
+                {"from hive_db groupby (owner) select owner, Asset.name, min(Asset.name)  ",
+                        new TableValidator("owner", "Asset.name", "min(Asset.name)")
+                                .row("Tim ETL", "Logging", "Logging")
+                                .row("Jane BI", "Reporting", "Reporting")
+                                .row("John ETL", "Sales", "Sales")},
 
-                { "from hive_db groupby (owner) select owner, Asset.name, min(Asset.name)  ",
-                        new FieldValueValidator()
-                                .withFieldNames("owner", "Asset.name", "min(Asset.name)")
-                                .withExpectedValues("Tim ETL", "Logging", "Logging")
-                                .withExpectedValues("Jane BI", "Reporting", "Reporting")
-                                .withExpectedValues("John ETL", "Sales", "Sales") },
+                {"from hive_db groupby (owner) select owner, min(Asset.name)  ",
+                        new TableValidator("owner", "min(Asset.name)")
+                                .row("Tim ETL", "Logging")
+                                .row("Jane BI", "Reporting")
+                                .row("John ETL", "Sales")},
 
-                { "from hive_db groupby (owner) select owner, min(Asset.name)  ",
-                        new FieldValueValidator()
-                                .withFieldNames("owner", "min(Asset.name)")
-                                .withExpectedValues("Tim ETL", "Logging")
-                                .withExpectedValues("Jane BI", "Reporting")
-                                .withExpectedValues("John ETL", "Sales") },
-
-                { "from hive_db groupby (owner) select min(name)  ",
-                        new FieldValueValidator()
-                                .withFieldNames("min(name)")
-                                .withExpectedValues("Reporting")
-                                .withExpectedValues("Logging")
-                                .withExpectedValues("Sales") },
-                { "from hive_db groupby (owner) select min('name') ",
-                        new FieldValueValidator()
-                                .withFieldNames("min('name')")
-                                .withExpectedValues("name")
-                                .withExpectedValues("name")
-                                .withExpectedValues("name") },
-                { "from hive_db select count() ",
-                        new FieldValueValidator()
-                                .withFieldNames("count()")
-                                .withExpectedValues(3) },
-                { "from Person select count() as 'count', max(Person.age) as 'max', min(Person.age) as 'min'",
-                        new FieldValueValidator()
-                                .withFieldNames("'count'", "'max'", "'min'")
-                                .withExpectedValues(50, 0, 4) },
-                { "from Person select count() as 'count', sum(Person.age) as 'sum'",
-                        new FieldValueValidator()
-                                .withFieldNames("'count'", "'sum'")
-                                .withExpectedValues(4, 86) },
-//                { "from hive_db groupby (owner) select min(name) orderby name limit 2 ",
-//                        new FieldValueValidator()
-//                                .withFieldNames("min(name)")
-//                                .withExpectedValues("Logging")
-//                                .withExpectedValues("Reporting") },
-//                { "from hive_db groupby (owner) select min(name) orderby name desc limit 2 ",
-//                        new FieldValueValidator()
-//                                .withFieldNames("min(name)")
-//                                .withExpectedValues("Reporting")
-//                                .withExpectedValues("Sales") }
+                {"from hive_db groupby (owner) select min(name)  ",
+                        new TableValidator("min(name)")
+                                .row("Reporting")
+                                .row("Logging")
+                                .row("Sales")},
+                {"from hive_db groupby (owner) select min('name') ",
+                        new TableValidator("min('name')")
+                                .row("Reporting")
+                                .row("Logging")
+                                .row("Sales")},
+                {"from hive_db select count() ",
+                        new TableValidator("count()")
+                                .row(3)},
+                {"from Person select count() as 'count', max(Person.age) as 'max', min(Person.age) as 'min'",
+                        new TableValidator("'count'", "'max'", "'min'")
+                                .row(4, 50.0f, 0.0f)},
+                {"from Person select count() as 'count', sum(Person.age) as 'sum'",
+                        new TableValidator("'count'", "'sum'")
+                                .row(4, 86.0)},
+                {"from Asset where __isIncomplete = false groupby (__typeName)  select __typeName, count()",
+                        new TableValidator("__typeName", "count()")
+                                .row("Asset", 1)
+                                .row("hive_table", 10)
+                                .row("hive_column", 17)
+                                .row("hive_db", 3)
+                                .row("hive_process", 7)
+                }
         };
     }
 
     @Test(dataProvider = "minMaxCountProvider")
-    public void minMaxCount(String query, FieldValueValidator fv) throws AtlasBaseException {
-        LOG.debug(query);
+    public void minMaxCount(String query, TableValidator fv) throws AtlasBaseException {
         queryAssert(query, fv);
-        queryAssert(query.replace("where", " "), fv);
     }
 
     @DataProvider(name = "errorQueriesProvider")
@@ -668,22 +693,12 @@
         discoveryService.searchUsingDslQuery(query, DEFAULT_LIMIT, 0);
     }
 
-    private void queryAssert(String query, FieldValueValidator fv) throws AtlasBaseException {
+    private void queryAssert(String query, TableValidator fv) throws AtlasBaseException {
         AtlasSearchResult searchResult = discoveryService.searchUsingDslQuery(query, DEFAULT_LIMIT, 0);
-        assertSearchResult(searchResult, fv);
-    }
-
-    private void assertSearchResult(AtlasSearchResult searchResult, FieldValueValidator expected) {
         assertNotNull(searchResult);
         assertNull(searchResult.getEntities());
 
-        assertEquals(searchResult.getAttributes().getName().size(), expected.getFieldNamesCount());
-        for (int i = 0; i < searchResult.getAttributes().getName().size(); i++) {
-            String s = searchResult.getAttributes().getName().get(i);
-            assertEquals(s, expected.fieldNames[i]);
-        }
-
-        assertEquals(searchResult.getAttributes().getValues().size(), expected.values.size());
+        TableValidator.assertFv(TableValidator.from(searchResult.getAttributes()), fv);
     }
 
     private void assertSearchResult(AtlasSearchResult searchResult, int expected, String query) {
@@ -700,25 +715,32 @@
         }
     }
 
-    private class FieldValueValidator {
-        class ResultObject {
-            Map<String, Object> fieldValues = new HashMap<>();
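+    /** Header-plus-rows validator, compared against the attribute table returned by a DSL select. */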
+    private static class TableValidator {
+        static class NameValueEntry {
+            Map<String, Object> items = new LinkedHashMap<>();
 
             public void setFieldValue(String string, Object object) {
-                fieldValues.put(string, object);
+                items.put(string, object);
             }
         }
 
-        private String[] fieldNames;
-        private List<ResultObject> values = new ArrayList<>();
+        public String[] fieldNames;
+        public List<NameValueEntry> values = new ArrayList<>();
 
-        public FieldValueValidator withFieldNames(String... fieldNames) {
+        public TableValidator() {
+        }
+
+        public TableValidator(String... fieldNames) {
+            header(fieldNames);
+        }
+
+        public TableValidator header(String... fieldNames) {
             this.fieldNames = fieldNames;
             return this;
         }
 
-        public FieldValueValidator withExpectedValues(Object... values) {
-            ResultObject obj = new ResultObject();
+        public TableValidator row(Object... values) {
+            NameValueEntry obj = new NameValueEntry();
             for (int i = 0; i < fieldNames.length; i++) {
                 obj.setFieldValue(fieldNames[i], values[i]);
             }
@@ -727,8 +749,75 @@
             return this;
         }
 
-        public int getFieldNamesCount() {
-            return (fieldNames != null) ? fieldNames.length : 0;
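+        /** Asserts that headers match exactly and that rows match irrespective of their order. */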
+        public static void assertFv(TableValidator actual, TableValidator expected) {
+            assertEquals(actual.fieldNames.length, expected.fieldNames.length);
+            assertEquals(actual.fieldNames, expected.fieldNames);
+            assertEquals(actual.values.size(), expected.values.size());
+
+            Map<String, Object> actualKeyItemsForCompare = new HashMap<>();
+            Map<String, Object> expectedItemsForCompare = new HashMap<>();
+            for (int i = 0; i < actual.values.size(); i++) {
+                getMapFrom(expectedItemsForCompare, expected.values.get(i).items);
+                getMapFrom(actualKeyItemsForCompare, actual.values.get(i).items);
+            }
+
+            for (String key : actualKeyItemsForCompare.keySet()) {
+                Object actualValue = actualKeyItemsForCompare.get(key);
+                Object expectedValue = expectedItemsForCompare.get(key);
+
+                assertNotNull(expectedValue, "Key: " + key + ": not found in expected results!");
+                assertEquals(actualValue, expectedValue, "Key: " + key + ": Value compare failed!");
+            }
+        }
+
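+        // keys each row by the value of its first column so rows can be compared without relying on order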
+        private static Map<String, Object> getMapFrom(Map<String, Object> valuesMap, Map<String, Object> row) {
+            if (!row.isEmpty()) {
+                String key = row.values().iterator().next().toString();
+                valuesMap.put(key, row);
+            }
+
+            return valuesMap;
+        }
+
+        public static TableValidator from(AtlasSearchResult.AttributeSearchResult searchResult) {
+            TableValidator fv = new TableValidator();
+            fv.header(searchResult.getName().toArray(new String[]{}));
+
+            for (int i = 0; i < searchResult.getValues().size(); i++) {
+                List list = searchResult.getValues().get(i);
+                fv.row(list.toArray());
+            }
+
+            return fv;
+        }
+    }
+
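+    /** Order-insensitive validator for the display texts of the returned entities. */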
+    private static class ListValidator {
+        private final Set<String> values;
+
+        public ListValidator(String... vals) {
+            values = Arrays.stream(vals).collect(Collectors.toSet());
+        }
+
+        public static void assertLv(ListValidator actual, ListValidator expected) {
+            String errorMessage = String.format("Expected: %s%nActual: %s", expected.values, actual.values);
+
+            assertEquals(actual.values.size(), expected.values.size(), errorMessage);
+            for (String expectedVal : expected.values) {
+                assertTrue(actual.values.contains(expectedVal), errorMessage);
+            }
+        }
+
+        public static ListValidator from(AtlasSearchResult result) {
+            ListValidator lv = new ListValidator();
+
+            if (result.getEntities() != null) {
+                lv.values.addAll(result.getEntities().stream().map(AtlasEntityHeader::getDisplayText).collect(Collectors.toSet()));
+            }
+
+            return lv;
         }
     }
 }
diff --git a/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java b/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java
index 6c69855..96e2840 100644
--- a/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java
+++ b/repository/src/test/java/org/apache/atlas/query/GremlinQueryComposerTest.java
@@ -188,7 +188,7 @@
         verify("Table where owner like \"*Tab_*\"",
                 "g.V().has('__typeName', 'Table').has('Table.owner', org.janusgraph.core.attribute.Text.textRegex(\".*Tab_.*\")).dedup().limit(25).toList()");
         verify("from Table where (db.name = \"Reporting\")",
-                "g.V().has('__typeName', 'Table').and(__.out('__Table.db').has('DB.name', eq(\"Reporting\")).dedup()).dedup().limit(25).toList()");
+                "g.V().has('__typeName', 'Table').out('__Table.db').has('DB.name', eq(\"Reporting\")).dedup().in('__Table.db').dedup().limit(25).toList()");
     }
 
     @Test
@@ -312,7 +312,9 @@
                 {"hive_db where hive_db.name='Reporting' and hive_db.createTime < '2017-12-12T02:35:58.440Z'",
                         "g.V().has('__typeName', 'hive_db').and(__.has('hive_db.name', eq('Reporting')),__.has('hive_db.createTime', lt('1513046158440'))).dedup().limit(25).toList()"},
                 {"Table where db.name='Sales' and db.clusterName='cl1'",
-                        "g.V().has('__typeName', 'Table').and(__.out('__Table.db').has('DB.name', eq('Sales')).dedup(),__.out('__Table.db').has('DB.clusterName', eq('cl1')).dedup()).dedup().limit(25).toList()"},
+                        "g.V().has('__typeName', 'Table').and(__.out('__Table.db').has('DB.name', eq('Sales')).dedup().in('__Table.db'),__.out('__Table.db').has('DB.clusterName', eq('cl1')).dedup().in('__Table.db')).dedup().limit(25).toList()"},
+                {"hive_db where (hive_db.name='Reporting' or ((hive_db.name='Reporting' and hive_db.createTime > '2017-12-12T02:35:58.440Z') and (hive_db.name='Reporting' and hive_db.createTime < '2017-12-12T02:35:58.440Z')))",
+                        "g.V().has('__typeName', 'hive_db').or(__.has('hive_db.name', eq('Reporting')),__.and(__.and(__.has('hive_db.name', eq('Reporting')),__.has('hive_db.createTime', gt('1513046158440'))),__.and(__.has('hive_db.name', eq('Reporting')),__.has('hive_db.createTime', lt('1513046158440'))))).dedup().limit(25).toList()"}
         };
     }
 
@@ -330,12 +332,16 @@
                 "dedup().limit(25).toList()");
         verify("Table hasTerm \"sales@glossary\"", "g.V().has('__typeName', 'Table')." +
                 "and(__.in('r:AtlasGlossarySemanticAssignment')." +
-                "has('AtlasGlossaryTerm.qualifiedName', eq(\"sales@glossary\")).dedup())." +
+                "has('AtlasGlossaryTerm.qualifiedName', eq('sales@glossary')).dedup())." +
                 "dedup().limit(25).toList()");
-        verify("Table hasTerm \"sales@glossary\" and owner = \"fetl\"","g.V().has('__typeName', 'Table')." +
-                "and(__.in('r:AtlasGlossarySemanticAssignment').has('AtlasGlossaryTerm.qualifiedName', eq(\"sales@glossary\")).dedup()" +
-                ",__.has('Table.owner', eq(\"fetl\")))." +
-                "dedup().limit(25).toList()");
+        verify("Table hasTerm \"sales@glossary\" and owner = \"fetl\"",
+                "g.V().has('__typeName', 'Table')" +
+                        ".and(" +
+                            "__.and(" +
+                                "__.in('r:AtlasGlossarySemanticAssignment').has('AtlasGlossaryTerm.qualifiedName', eq('sales@glossary'))" +
+                                ".dedup())," +
+                                "__.has('Table.owner', eq(\"fetl\"))" +
+                        ").dedup().limit(25).toList()");
     }
 
 
@@ -373,7 +379,7 @@
     public void whereComplexAndSelect() {
         String exSel = "def f(r){ t=[['name']];  r.each({t.add([" +
                 "it.property('Table.name').isPresent() ? it.value('Table.name') : \"\"])}); t.unique(); }";
-        String exMain = "g.V().has('__typeName', 'Table').and(__.out('__Table.db').has('DB.name', eq(\"Reporting\")).dedup(),__.has('Table.name', eq(\"sales_fact\"))).dedup().limit(25).toList()";
+        String exMain = "g.V().has('__typeName', 'Table').and(__.out('__Table.db').has('DB.name', eq(\"Reporting\")).dedup().in('__Table.db'),__.has('Table.name', eq(\"sales_fact\"))).dedup().limit(25).toList()";
         verify("Table where db.name = \"Reporting\" and name =\"sales_fact\" select name", getExpected(exSel, exMain));
         verify("Table where db.name = \"Reporting\" and name =\"sales_fact\"", exMain);
     }
diff --git a/repository/src/test/java/org/apache/atlas/query/TraversalComposerTest.java b/repository/src/test/java/org/apache/atlas/query/TraversalComposerTest.java
new file mode 100644
index 0000000..6839620
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/query/TraversalComposerTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.query;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.query.antlr4.AtlasDSLParser;
+import org.apache.atlas.query.executors.GremlinClauseToTraversalTranslator;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphTraversal;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+
+import static org.testng.Assert.assertEquals;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TraversalComposerTest extends BaseDSLComposer {
+    @Inject
+    public AtlasGraph graph;
+
+    @Test
+    public void queries() {
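+        // expected values are the string form of the compiled JanusGraph traversal steps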
+        verify("hive_db",
+                "[JanusGraphStep([],[__typeName.eq(hive_db)]), DedupGlobalStep, RangeGlobalStep(0,25)]");
+
+        verify("hive_db where owner = 'hdfs'",
+                "[JanusGraphStep([],[__typeName.eq(hive_db), hive_db.owner.eq(hdfs)]), DedupGlobalStep, RangeGlobalStep(0,25)]");
+
+        verify("DB where owner = ['hdfs', 'anon']",
+                "[JanusGraphStep([],[__typeName.eq(DB), DB.owner.within([hdfs, anon])]), DedupGlobalStep, RangeGlobalStep(0,25)]");
+
+        verify("hive_db where hive_db.name='Reporting' and hive_db.createTime < '2017-12-12T02:35:58.440Z'",
+                "[JanusGraphStep([],[__typeName.eq(hive_db), hive_db.name.eq(Reporting), hive_db.createTime.lt(1513046158440)]), DedupGlobalStep, RangeGlobalStep(0,25)]");
+
+        // note that projections are not handled in the conversion
+        verify("DB as d select d.name, d.owner",
+                "[JanusGraphStep([],[__typeName.eq(DB)]), DedupGlobalStep@[d], RangeGlobalStep(0,25)]");
+
+        verify("Table groupby(owner) select name, owner, clusterName orderby name",
+                "[JanusGraphStep([],[__typeName.eq(Table)]), TraversalFilterStep([JanusGraphPropertiesStep([Table.owner],property)]), GroupStep(value(Table.owner),[FoldStep]), DedupGlobalStep, RangeGlobalStep(0,25)]");
+    }
+
+    private void verify(String dsl, String expected) {
+        AtlasDSLParser.QueryContext queryContext = getParsedQuery(dsl);
+        String actual = getTraversalAsStr(queryContext);
+        assertEquals(actual, expected, dsl);
+    }
+
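+    // composes gremlin clauses from the parsed query, then translates them into a native graph traversal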
+    private String getTraversalAsStr(AtlasDSLParser.QueryContext queryContext) {
+        Lookup lookup = new TestLookup(registry);
+        GremlinQueryComposer.Context context = new GremlinQueryComposer.Context(lookup);
+        AtlasDSL.QueryMetadata queryMetadata = new AtlasDSL.QueryMetadata(queryContext);
+
+        GremlinQueryComposer gremlinQueryComposer = new GremlinQueryComposer(lookup, context, queryMetadata);
+        DSLVisitor qv = new DSLVisitor(gremlinQueryComposer);
+        qv.visit(queryContext);
+
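+        // called for its side effect of completing the clause list (closing dedup/limit clauses);
+        // the returned gremlin string itself is not used here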
+        gremlinQueryComposer.get();
+
+        AtlasGraphTraversal traversal = GremlinClauseToTraversalTranslator.run(graph, gremlinQueryComposer.clauses());
+        return traversal.toString();
+    }
+}