ATLAS-1880: search API with support for entity/tag attribute filters
diff --git a/client/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/src/main/java/org/apache/atlas/AtlasClientV2.java
index 6141342..7e287e7 100644
--- a/client/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -22,6 +22,7 @@
 import com.sun.jersey.core.util.MultivaluedMapImpl;
 import org.apache.atlas.model.SearchFilter;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
@@ -100,9 +101,13 @@
     private static final String DISCOVERY_URI = BASE_URI + "v2/search";
     private static final String DSL_URI       = DISCOVERY_URI + "/dsl";
     private static final String FULL_TEXT_URI = DISCOVERY_URI + "/fulltext";
+    private static final String BASIC_SEARCH_URI = DISCOVERY_URI + "/basic";
+    private static final String FACETED_SEARCH_URI = BASIC_SEARCH_URI;
 
     private static final APIInfo DSL_SEARCH       = new APIInfo(DSL_URI, HttpMethod.GET, Response.Status.OK);
     private static final APIInfo FULL_TEXT_SEARCH = new APIInfo(FULL_TEXT_URI, HttpMethod.GET, Response.Status.OK);
+    private static final APIInfo BASIC_SEARCH = new APIInfo(BASIC_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
+    private static final APIInfo FACETED_SEARCH = new APIInfo(FACETED_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
 
 
     public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
@@ -398,6 +403,23 @@
         return callAPI(FULL_TEXT_SEARCH, AtlasSearchResult.class, queryParams);
     }
 
+    public AtlasSearchResult basicSearch(final String typeName, final String classification, final String query,
+                                         final boolean excludeDeletedEntities, final int limit, final int offset) throws AtlasServiceException {
+        MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+        queryParams.add("typeName", typeName);
+        queryParams.add("classification", classification);
+        queryParams.add(QUERY, query);
+        queryParams.add("excludeDeletedEntities", String.valueOf(excludeDeletedEntities));
+        queryParams.add(LIMIT, String.valueOf(limit));
+        queryParams.add(OFFSET, String.valueOf(offset));
+
+        return callAPI(BASIC_SEARCH, AtlasSearchResult.class, queryParams);
+    }
+
+    public AtlasSearchResult facetedSearch(SearchParameters searchParameters) throws AtlasServiceException {
+        return callAPI(FACETED_SEARCH, AtlasSearchResult.class, searchParameters);
+    }
+
     private <T> T getTypeDefByName(final String name, Class<T> typeDefClass) throws AtlasServiceException {
         String atlasPath = getAtlasPath(typeDefClass);
         APIInfo apiInfo = new APIInfo(String.format(GET_BY_NAME_TEMPLATE, atlasPath, name), HttpMethod.GET, Response.Status.OK);
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index ac02252..e8621cf 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -96,7 +96,9 @@
 
     public static final String QUALIFIED_NAME = "Referenceable.qualifiedName";
     public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName";
-
+    public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size";
+    public static final String INDEX_SEARCH_MAX_TYPES_COUNT = "atlas.graph.index.search.max-types-count";
+    public static final String INDEX_SEARCH_MAX_TAGS_COUNT = "atlas.graph.index.search.max-tags-count";
 
     private Constants() {
     }
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index e6c0d9f..a0ef6a9 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -59,6 +59,23 @@
         </layout>
     </appender>
 
+    <!-- Uncomment the following for perf logs -->
+    <!--
+    <appender name="perf_appender" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="file" value="${atlas.log.dir}/atlas_perf.log" />
+        <param name="datePattern" value="'.'yyyy-MM-dd" />
+        <param name="append" value="true" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d|%t|%m%n" />
+        </layout>
+    </appender>
+
+    <logger name="org.apache.atlas.perf" additivity="false">
+        <level value="debug" />
+        <appender-ref ref="perf_appender" />
+    </logger>
+    -->
+
     <logger name="org.apache.atlas" additivity="false">
         <level value="info"/>
         <appender-ref ref="FILE"/>
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
index 841edf7..73db22e 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
@@ -47,7 +47,7 @@
      * the specified list of values.
      *
      * @param propertyKey
-     * @param value
+     * @param values
      * @return
      */
     AtlasGraphQuery<V, E> in(String propertyKey, Collection<?> values);
@@ -56,7 +56,6 @@
     /**
      * Executes the query and returns the matching vertices.
      * @return
-     * @throws AtlasException
      */
     Iterable<AtlasVertex<V, E>> vertices();
 
@@ -66,16 +65,32 @@
      */
     Iterable<AtlasEdge<V, E>> edges();
 
+    /**
+     * Executes the query and returns the matching vertices from given offset till the max limit
+     * @param limit max number of vertices
+     * @return
+     */
+    Iterable<AtlasVertex<V, E>> vertices(int limit);
+
+    /**
+     * Executes the query and returns the matching vertices from given offset till the max limit
+     * @param offset starting offset
+     * @param limit max number of vertices
+     * @return
+     */
+    Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
+
 
     /**
      * Adds a predicate that the returned vertices must have the specified
      * property and that its value matches the criterion specified.
      *
      * @param propertyKey
-     * @param value
+     * @param op
+     * @param values
      * @return
      */
-    AtlasGraphQuery<V, E> has(String propertyKey, ComparisionOperator compMethod, Object values);
+    AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator op, Object values);
 
     /**
      * Adds a predicate that the vertices returned must satisfy the
@@ -94,17 +109,31 @@
     AtlasGraphQuery<V, E> createChildQuery();
 
 
+    interface QueryOperator {}
+
     /**
      * Comparison operators that can be used in an AtlasGraphQuery.
      */
-    enum ComparisionOperator {
+    enum ComparisionOperator implements QueryOperator {
+        GREATER_THAN,
         GREATER_THAN_EQUAL,
         EQUAL,
+        LESS_THAN,
         LESS_THAN_EQUAL,
         NOT_EQUAL
     }
 
     /**
+     * String/text matching that can be used in AtlasGraphQuery
+     */
+    enum MatchingOperator implements QueryOperator {
+        CONTAINS,
+        PREFIX,
+        SUFFIX,
+        REGEX
+    }
+
+    /**
      * Adds all of the predicates that have been added to this query to the
      * specified query.
      * @param otherQuery
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasIndexQuery.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasIndexQuery.java
index 1ff9d5e..6bad173 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasIndexQuery.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasIndexQuery.java
@@ -36,6 +36,14 @@
     Iterator<Result<V, E>> vertices();
 
     /**
+     * Gets the query results
+     * @param offset starting offset
+     * @param limit max number of results
+     * @return
+     */
+    Iterator<Result<V, E>> vertices(int offset, int limit);
+
+    /**
      * Query result from an index query.
      *
      * @param <V>
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java
index 53f490f..9beb6a3 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertexQuery.java
@@ -42,15 +42,29 @@
     Iterable<AtlasVertex<V, E>> vertices();
 
     /**
+     * Returns the vertices that satisfy the query condition.
+     *
+     * @param limit Max number of vertices
+     * @return
+     */
+    Iterable<AtlasVertex<V, E>> vertices(int limit);
+
+    /**
      * Returns the incident edges that satisfy the query condition.
      * @return
      */
     Iterable<AtlasEdge<V, E>> edges();
 
     /**
+     * Returns the incident edges that satisfy the query condition.
+     * @param limit Max number of edges
+     * @return
+     */
+    Iterable<AtlasEdge<V, E>> edges(int limit);
+
+    /**
      * Returns the number of elements that match the query.
      * @return
      */
     long count();
-
 }
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/NativeTitanGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/NativeTitanGraphQuery.java
index 0211ff0..288b325 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/NativeTitanGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/NativeTitanGraphQuery.java
@@ -17,12 +17,12 @@
  */
 package org.apache.atlas.repository.graphdb.titan.query;
 
-import java.util.Collection;
-
 import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 
+import java.util.Collection;
+
 /**
  * Interfaces that provides a thin wrapper around GraphQuery (used by Titan0) and
  * TitanGraphQuery (used by Titan 1).
@@ -47,6 +47,22 @@
     Iterable<AtlasEdge<V, E>> edges();
 
     /**
+     * Executes graph query
+     * @param limit Max vertices to return
+     * @return
+     */
+    Iterable<AtlasVertex<V, E>> vertices(int limit);
+
+    /**
+     * Executes graph query
+     * @param offset Starting offset
+     * @param limit Max vertices to return
+     * @return
+     */
+    Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
+
+
+    /**
      * Adds an in condition to the query.
      *
      * @param propertyName
@@ -61,6 +77,5 @@
      * @param op
      * @param value
      */
-    void has(String propertyName, ComparisionOperator op, Object value);
-
+    void has(String propertyName, QueryOperator op, Object value);
 }
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/TitanGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/TitanGraphQuery.java
index 0077a21..c38f6cc 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/TitanGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/TitanGraphQuery.java
@@ -17,11 +17,8 @@
  */
 package org.apache.atlas.repository.graphdb.titan.query;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
@@ -33,6 +30,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
 /**
  * Abstract implementation of AtlasGraphQuery that is used by both Titan 0.5.4
  * and Titan 1.0.0.
@@ -123,11 +127,10 @@
     @Override
     public Iterable<AtlasVertex<V, E>> vertices() {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Executing: " + queryCondition.toString());
+            LOG.debug("Executing: " + queryCondition);
         }
 
-        //compute the overall result by unioning the results from all of the
-        //AndConditions together.
+        // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
         Set<AtlasVertex<V, E>> result = new HashSet<>();
         for(AndCondition andExpr : queryCondition.getAndTerms()) {
             NativeTitanGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
@@ -141,11 +144,10 @@
     @Override
     public Iterable<AtlasEdge<V, E>> edges() {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Executing: " + queryCondition.toString());
+            LOG.debug("Executing: " + queryCondition);
         }
 
-        //compute the overall result by unioning the results from all of the
-        //AndConditions together.
+        // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
         Set<AtlasEdge<V, E>> result = new HashSet<>();
         for(AndCondition andExpr : queryCondition.getAndTerms()) {
             NativeTitanGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
@@ -157,7 +159,46 @@
     }
 
     @Override
-    public AtlasGraphQuery<V, E> has(String propertyKey, ComparisionOperator operator,
+    public Iterable<AtlasVertex<V, E>> vertices(int limit) {
+        return vertices(0, limit);
+    }
+
+    @Override
+    public Iterable<AtlasVertex<V, E>> vertices(int offset, int limit) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Executing: " + queryCondition);
+        }
+
+        Preconditions.checkArgument(offset >= 0, "Offset must be non-negative");
+        Preconditions.checkArgument(limit >= 0, "Limit must be non-negative");
+
+        // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
+        Set<AtlasVertex<V, E>> result = new HashSet<>();
+        long resultIdx = 0;
+        for(AndCondition andExpr : queryCondition.getAndTerms()) {
+            if (result.size() == limit) {
+                break;
+            }
+
+            NativeTitanGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
+            for(AtlasVertex<V, E> vertex : andQuery.vertices(offset + limit)) {
+                if (resultIdx >= offset) {
+                    result.add(vertex);
+
+                    if (result.size() == limit) {
+                        break;
+                    }
+                }
+
+                resultIdx++;
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator operator,
             Object value) {
         queryCondition.andWith(new HasPredicate(propertyKey, operator, value));
         return this;
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/AndCondition.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/AndCondition.java
index 68f0eb2..db5093f 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/AndCondition.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/AndCondition.java
@@ -17,12 +17,12 @@
  */
 package org.apache.atlas.repository.graphdb.titan.query.expr;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
 import org.apache.atlas.repository.graphdb.titan.query.NativeTitanQueryFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Represents an AndCondition in a graph query.  Only vertices that
  * satisfy the conditions in all of the query predicates will be returned
@@ -78,7 +78,7 @@
     /**
      * Creates a NativeTitanGraphQuery that can be used to evaluate this condition.
      *
-     * @param graph
+     * @param factory
      * @return
      */
     public <V, E> NativeTitanGraphQuery<V, E> create(NativeTitanQueryFactory<V, E> factory) {
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/HasPredicate.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/HasPredicate.java
index 24e4f5b..0652c41 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/HasPredicate.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/titan/query/expr/HasPredicate.java
@@ -17,7 +17,7 @@
  */
 package org.apache.atlas.repository.graphdb.titan.query.expr;
 
-import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
 import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
 
 /**
@@ -27,11 +27,10 @@
 public class HasPredicate implements QueryPredicate {
 
     private String propertyName;
-    private ComparisionOperator op;
+    private QueryOperator op;
     private Object value;
 
-    public HasPredicate(String propertyName, ComparisionOperator op, Object value) {
-        super();
+    public HasPredicate(String propertyName, QueryOperator op, Object value) {
         this.propertyName = propertyName;
         this.op = op;
         this.value = value;
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java
index 54ff7cb..8e0928c 100644
--- a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java
@@ -285,7 +285,33 @@
                 }
 
                 if (index.isCompositeIndex()) {
-                    subcondition = indexCover((CompositeIndexType) index, conditions, subcover);
+                    CompositeIndexType compositeIndex = (CompositeIndexType)index;
+
+                    subcondition = indexCover(compositeIndex, conditions, subcover);
+
+                    // if this is unique index, use it!!
+                    if (compositeIndex.getCardinality() == Cardinality.SINGLE && subcondition != null) {
+                        bestCandidate         = null; // will cause the outer while() to bail out
+                        candidateSubcover     = subcover;
+                        candidateSubcondition = subcondition;
+                        candidateSupportsSort = supportsSort;
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("selected unique index {}", compositeIndex.getName());
+                        }
+
+                        if (coveredClauses.isEmpty()) {
+                            isSorted = candidateSupportsSort;
+                        }
+
+                        coveredClauses.clear();;
+                        coveredClauses.addAll(candidateSubcover);
+
+                        jointQuery = new JointIndexQuery();
+                        jointQuery.add(compositeIndex, serializer.getQuery(compositeIndex, (List<Object[]>)candidateSubcondition));
+
+                        break;
+                    }
                 } else {
                     subcondition = indexCover((MixedIndexType) index, conditions, serializer, subcover);
                     if (coveredClauses.isEmpty() && !supportsSort
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0IndexQuery.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0IndexQuery.java
index 1ed1734..c4a312d 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0IndexQuery.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0IndexQuery.java
@@ -19,6 +19,7 @@
 
 import java.util.Iterator;
 
+import com.google.common.base.Preconditions;
 import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 
@@ -56,6 +57,26 @@
         return Iterators.transform(results, function);
     }
 
+    @Override
+    public Iterator<Result<Titan0Vertex, Titan0Edge>> vertices(int offset, int limit) {
+        Preconditions.checkArgument(offset >=0, "Index offset should be greater than or equals to 0");
+        Preconditions.checkArgument(limit >=0, "Index limit should be greater than or equals to 0");
+        Iterator<TitanIndexQuery.Result<Vertex>> results = wrappedIndexQuery
+                .offset(offset)
+                .limit(limit)
+                .vertices().iterator();
+
+        Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>> function =
+                new Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>>() {
+
+                    @Override
+                    public AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge> apply(TitanIndexQuery.Result<Vertex> source) {
+                        return new ResultImpl(source);
+                    }
+                };
+        return Iterators.transform(results, function);
+    }
+
     private final class ResultImpl implements AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge> {
         private TitanIndexQuery.Result<Vertex> wrappedResult;
 
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java
index bd8b897..091e7d4 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0VertexQuery.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.graphdb.titan0;
 
+import com.google.common.base.Preconditions;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -53,12 +54,26 @@
     }
 
     @Override
+    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
+        Preconditions.checkArgument(limit >=0, "Limit should be greater than or equals to 0");
+        Iterable<Vertex> vertices = vertexQuery.limit(limit).vertices();
+        return graph.wrapVertices(vertices);
+    }
+
+    @Override
     public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges() {
         Iterable<Edge> edges = vertexQuery.edges();
         return graph.wrapEdges(edges);
     }
 
     @Override
+    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int limit) {
+        Preconditions.checkArgument(limit >=0, "Limit should be greater than or equals to 0");
+        Iterable<Edge> edges = vertexQuery.limit(limit).edges();
+        return graph.wrapEdges(edges);
+    }
+
+    @Override
     public long count() {
         return vertexQuery.count();
     }
diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java
index 7ec6ffe..f1f1adb 100644
--- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java
+++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/query/NativeTitan0GraphQuery.java
@@ -17,21 +17,25 @@
  */
 package org.apache.atlas.repository.graphdb.titan0.query;
 
-import java.util.Collection;
-
-import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
-import org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase;
-import org.apache.atlas.repository.graphdb.titan0.Titan0Edge;
-import org.apache.atlas.repository.graphdb.titan0.Titan0Graph;
-import org.apache.atlas.repository.graphdb.titan0.Titan0Vertex;
-
+import com.google.common.collect.Lists;
 import com.thinkaurelius.titan.core.TitanGraphQuery;
 import com.thinkaurelius.titan.core.attribute.Contain;
+import com.thinkaurelius.titan.core.attribute.Text;
 import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
 import com.tinkerpop.blueprints.Compare;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
+import org.apache.atlas.repository.graphdb.titan0.Titan0Edge;
+import org.apache.atlas.repository.graphdb.titan0.Titan0Graph;
+import org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase;
+import org.apache.atlas.repository.graphdb.titan0.Titan0Vertex;
+
+import java.util.*;
 
 /**
  * Titan 0.5.4 implementation of NativeTitanGraphQuery.
@@ -60,6 +64,28 @@
         Iterable it = query.edges();
         return graph.wrapEdges(it);
     }
+    @Override
+    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
+        Iterable it = query.limit(limit).vertices();
+        return graph.wrapVertices(it);
+    }
+
+    @Override
+    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int offset, int limit) {
+        List<Vertex>     result = new ArrayList<>(limit);
+        Iterator<Vertex> iter   = query.limit(offset + limit).vertices().iterator();
+
+        for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
+            if (resultIdx < offset) {
+                continue;
+            }
+
+            result.add(iter.next());
+        }
+
+        return graph.wrapVertices(result);
+    }
+
 
     @Override
     public void in(String propertyName, Collection<?> values) {
@@ -68,26 +94,48 @@
     }
 
     @Override
-    public void has(String propertyName, ComparisionOperator op, Object value) {
-
-        Compare c = getGremlinPredicate(op);
-        TitanPredicate pred = TitanPredicate.Converter.convert(c);
+    public void has(String propertyName, QueryOperator op, Object value) {
+        TitanPredicate pred;
+        if (op instanceof ComparisionOperator) {
+            Compare c = getGremlinPredicate((ComparisionOperator) op);
+            pred = TitanPredicate.Converter.convert(c);
+        } else {
+            pred = getGremlinPredicate((MatchingOperator) op);
+        }
         query.has(propertyName, pred, value);
     }
 
+    private Text getGremlinPredicate(MatchingOperator op) {
+        switch (op) {
+            case CONTAINS:
+                return Text.CONTAINS;
+            case PREFIX:
+                return Text.PREFIX;
+            case SUFFIX:
+                return Text.CONTAINS_REGEX;
+            case REGEX:
+                return Text.REGEX;
+            default:
+                throw new RuntimeException("Unsupported matching operator:" + op);
+        }
+    }
+
     private Compare getGremlinPredicate(ComparisionOperator op) {
         switch (op) {
-        case EQUAL:
-            return Compare.EQUAL;
-        case GREATER_THAN_EQUAL:
-            return Compare.GREATER_THAN_EQUAL;
-        case LESS_THAN_EQUAL:
-            return Compare.LESS_THAN_EQUAL;
-        case NOT_EQUAL:
-            return Compare.NOT_EQUAL;
-
-        default:
-            throw new RuntimeException("Unsupported comparison operator:" + op);
+            case EQUAL:
+                return Compare.EQUAL;
+            case GREATER_THAN:
+                return Compare.GREATER_THAN;
+            case GREATER_THAN_EQUAL:
+                return Compare.GREATER_THAN_EQUAL;
+            case LESS_THAN:
+                return Compare.LESS_THAN;
+            case LESS_THAN_EQUAL:
+                return Compare.LESS_THAN_EQUAL;
+            case NOT_EQUAL:
+                return Compare.NOT_EQUAL;
+            default:
+                throw new RuntimeException("Unsupported comparison operator:" + op);
         }
     }
 
diff --git a/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1IndexQuery.java b/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1IndexQuery.java
index 4073dd2..c7e4150 100644
--- a/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1IndexQuery.java
+++ b/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1IndexQuery.java
@@ -19,6 +19,7 @@
 
 import java.util.Iterator;
 
+import com.google.common.base.Preconditions;
 import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 
@@ -56,6 +57,27 @@
         return Iterators.transform(results, function);
     }
 
+    @Override
+    public Iterator<Result<Titan1Vertex, Titan1Edge>> vertices(int offset, int limit) {
+        Preconditions.checkArgument(offset >=0, "Index offset should be greater than or equals to 0");
+        Preconditions.checkArgument(limit >=0, "Index limit should be greater than or equals to 0");
+        Iterator<TitanIndexQuery.Result<TitanVertex>> results = query
+                .offset(offset)
+                .limit(limit)
+                .vertices().iterator();
+
+        Function<TitanIndexQuery.Result<TitanVertex>, Result<Titan1Vertex, Titan1Edge>> function =
+                new Function<TitanIndexQuery.Result<TitanVertex>, Result<Titan1Vertex, Titan1Edge>>() {
+
+                    @Override
+                    public Result<Titan1Vertex, Titan1Edge> apply(TitanIndexQuery.Result<TitanVertex> source) {
+                        return new ResultImpl(source);
+                    }
+                };
+
+        return Iterators.transform(results, function);
+    }
+
     /**
      * Titan 1.0.0 implementation of AtlasIndexQuery.Result.
      */
diff --git a/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1VertexQuery.java b/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1VertexQuery.java
index 4452bcd..a761a74 100644
--- a/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1VertexQuery.java
+++ b/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/Titan1VertexQuery.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.graphdb.titan1;
 
+import com.google.common.base.Preconditions;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -51,10 +52,23 @@
     }
 
     @Override
+    public Iterable<AtlasVertex<Titan1Vertex, Titan1Edge>> vertices(int limit) {
+        Preconditions.checkArgument(limit >=0, "Limit should be greater than or equals to 0");
+        Iterable vertices = query.limit(limit).vertices();
+        return graph.wrapVertices(vertices);
+    }
+
+    @Override
     public Iterable<AtlasEdge<Titan1Vertex, Titan1Edge>> edges() {
         Iterable edges = query.edges();
         return graph.wrapEdges(edges);
+    }
 
+    @Override
+    public Iterable<AtlasEdge<Titan1Vertex, Titan1Edge>> edges(int limit) {
+        Preconditions.checkArgument(limit >=0, "Limit should be greater than or equals to 0");
+        Iterable edges = query.limit(limit).edges();
+        return graph.wrapEdges(edges);
     }
 
     @Override
diff --git a/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/query/NativeTitan1GraphQuery.java b/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/query/NativeTitan1GraphQuery.java
index 1ca900d..9293dbd 100644
--- a/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/query/NativeTitan1GraphQuery.java
+++ b/graphdb/titan1/src/main/java/org/apache/atlas/repository/graphdb/titan1/query/NativeTitan1GraphQuery.java
@@ -17,11 +17,16 @@
  */
 package org.apache.atlas.repository.graphdb.titan1.query;
 
-import java.util.Collection;
-
 import com.thinkaurelius.titan.core.TitanEdge;
+import com.thinkaurelius.titan.core.TitanGraphQuery;
+import com.thinkaurelius.titan.core.TitanVertex;
+import com.thinkaurelius.titan.core.attribute.Contain;
+import com.thinkaurelius.titan.core.attribute.Text;
+import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
 import org.apache.atlas.repository.graphdb.titan1.Titan1Edge;
@@ -29,11 +34,9 @@
 import org.apache.atlas.repository.graphdb.titan1.Titan1GraphDatabase;
 import org.apache.atlas.repository.graphdb.titan1.Titan1Vertex;
 import org.apache.tinkerpop.gremlin.process.traversal.Compare;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 
-import com.thinkaurelius.titan.core.TitanGraphQuery;
-import com.thinkaurelius.titan.core.TitanVertex;
-import com.thinkaurelius.titan.core.attribute.Contain;
-import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
+import java.util.*;
 
 /**
  * Titan 1.0.0 implementation of NativeTitanGraphQuery.
@@ -61,32 +64,77 @@
     }
 
     @Override
+    public Iterable<AtlasVertex<Titan1Vertex, Titan1Edge>> vertices(int limit) {
+        Iterable<TitanVertex> it = query.limit(limit).vertices();
+        return graph.wrapVertices(it);
+    }
+
+    @Override
+    public Iterable<AtlasVertex<Titan1Vertex, Titan1Edge>> vertices(int offset, int limit) {
+        List<Vertex>               result = new ArrayList<>(limit);
+        Iterator<? extends Vertex> iter   = query.limit(offset + limit).vertices().iterator();
+
+        for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
+            if (resultIdx < offset) {
+                continue;
+            }
+
+            result.add(iter.next());
+        }
+
+        return graph.wrapVertices(result);
+    }
+
+    @Override
     public void in(String propertyName, Collection<? extends Object> values) {
         query.has(propertyName, Contain.IN, values);
 
     }
 
     @Override
-    public void has(String propertyName, ComparisionOperator op, Object value) {
-
-        Compare c = getGremlinPredicate(op);
-        TitanPredicate pred = TitanPredicate.Converter.convert(c);
+    public void has(String propertyName, QueryOperator op, Object value) {
+        TitanPredicate pred;
+        if (op instanceof ComparisionOperator) {
+            Compare c = getGremlinPredicate((ComparisionOperator) op);
+            pred = TitanPredicate.Converter.convert(c);
+        } else {
+            pred = getGremlinPredicate((MatchingOperator)op);
+        }
         query.has(propertyName, pred, value);
     }
 
+    private Text getGremlinPredicate(MatchingOperator op) {
+        switch (op) {
+            case CONTAINS:
+                return Text.CONTAINS;
+            case PREFIX:
+                return Text.PREFIX;
+            case SUFFIX:
+                return Text.CONTAINS_REGEX;
+            case REGEX:
+                return Text.REGEX;
+            default:
+                throw new RuntimeException("Unsupported matching operator:" + op);
+        }
+    }
+
     private Compare getGremlinPredicate(ComparisionOperator op) {
         switch (op) {
-        case EQUAL:
-            return Compare.eq;
-        case GREATER_THAN_EQUAL:
-            return Compare.gte;
-        case LESS_THAN_EQUAL:
-            return Compare.lte;
-        case NOT_EQUAL:
-            return Compare.neq;
+            case EQUAL:
+                return Compare.eq;
+            case GREATER_THAN:
+                return Compare.gt;
+            case GREATER_THAN_EQUAL:
+                return Compare.gte;
+            case LESS_THAN:
+                return Compare.lt;
+            case LESS_THAN_EQUAL:
+                return Compare.lte;
+            case NOT_EQUAL:
+                return Compare.neq;
 
-        default:
-            throw new RuntimeException("Unsupported comparison operator:" + op);
+            default:
+                throw new RuntimeException("Unsupported comparison operator:" + op);
         }
     }
 
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
index a402c62..9513dcb 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
@@ -39,10 +39,9 @@
 @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.PROPERTY)
 public class AtlasSearchResult implements Serializable {
     private AtlasQueryType            queryType;
+    private SearchParameters          searchParameters;
     private String                    queryText;
     private String                    type;
     private String                    classification;
@@ -59,11 +58,24 @@
     public AtlasSearchResult(String queryText, AtlasQueryType queryType) {
         setQueryText(queryText);
         setQueryType(queryType);
+        setSearchParameters(null);
         setEntities(null);
         setAttributes(null);
         setFullTextResult(null);
     }
 
+    public AtlasSearchResult(SearchParameters searchParameters) {
+        setQueryType(AtlasQueryType.BASIC);
+
+        if (searchParameters != null) {
+            setQueryText(searchParameters.getQuery());
+            setSearchParameters(searchParameters);
+            setEntities(null);
+            setAttributes(null);
+            setFullTextResult(null);
+        }
+    }
+
     public AtlasQueryType getQueryType() { return queryType; }
 
     public void setQueryType(AtlasQueryType queryType) { this.queryType = queryType; }
@@ -98,6 +110,7 @@
         if (o == null || getClass() != o.getClass()) return false;
         AtlasSearchResult that = (AtlasSearchResult) o;
         return Objects.equals(queryType, that.queryType) &&
+               Objects.equals(searchParameters, that.searchParameters) &&
                Objects.equals(queryText, that.queryText) &&
                Objects.equals(type, that.type) &&
                Objects.equals(classification, that.classification) &&
@@ -107,12 +120,13 @@
     }
 
     @Override
-    public int hashCode() { return Objects.hash(queryText, queryType, entities, attributes, fullTextResult, type, classification); }
+    public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult); }
 
     @Override
     public String toString() {
         return "AtlasSearchResult{" +
                 "queryType=" + queryType +
+                ", searchParameters='" + searchParameters + '\'' +
                 ", queryText='" + queryText + '\'' +
                 ", type=" + type +
                 ", classification=" + classification +
@@ -149,6 +163,14 @@
         }
     }
 
+    public void setSearchParameters(SearchParameters searchParameters) {
+        this.searchParameters = searchParameters;
+    }
+
+    public SearchParameters getSearchParameters() {
+        return searchParameters;
+    }
+
     public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE }
 
     @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
new file mode 100644
index 0000000..30855dc
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.discovery;
+
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonValue;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SearchParameters {
+    private String  query;
+    private String  typeName;
+    private String  classification;
+    private boolean excludeDeletedEntities;
+    private int     limit;
+    private int     offset;
+
+    private FilterCriteria entityFilters;
+    private FilterCriteria tagFilters;
+    private Set<String>    attributes;
+
+    /**
+     * @return The type of query
+     */
+    public String getQuery() {
+        return query;
+    }
+
+    /**
+     * Set query type
+     * @param query type
+     */
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+    /**
+     * @return Type name to search on
+     */
+    public String getTypeName() {
+        return typeName;
+    }
+
+    /**
+     * Set the type name to search on
+     * @param typeName type name
+     */
+    public void setTypeName(String typeName) {
+        this.typeName = typeName;
+    }
+
+    /**
+     *
+     * @return Classification/tag to search on
+     */
+    public String getClassification() {
+        return classification;
+    }
+
+    /**
+     * Set the classification/tag to search on
+     * @param classification classification/tag name
+     */
+    public void setClassification(String classification) {
+        this.classification = classification;
+    }
+
+    /**
+     * @return True iff deleted entities are excluded
+     */
+    public boolean getExcludeDeletedEntities() {
+        return excludeDeletedEntities;
+    }
+
+    /**
+     * Exclude deleted entities from search
+     * @param excludeDeletedEntities boolean flag
+     */
+    public void setExcludeDeletedEntities(boolean excludeDeletedEntities) {
+        this.excludeDeletedEntities = excludeDeletedEntities;
+    }
+
+    /**
+     * @return Max number of results to be returned
+     */
+    public int getLimit() {
+        return limit;
+    }
+
+    /**
+     * Restrict the results to the specified limit
+     * @param limit max number of results
+     */
+    public void setLimit(int limit) {
+        this.limit = limit;
+    }
+
+    /**
+     * @return Offset(pagination) of the results
+     */
+    public int getOffset() {
+        return offset;
+    }
+
+    /**
+     * @param offset
+     */
+    public void setOffset(int offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * Entity attribute filters for the type (if type name is specified)
+     * @return
+     */
+    public FilterCriteria getEntityFilters() {
+        return entityFilters;
+    }
+
+    /**
+     * Filter the entities on this criteria
+     * @param entityFilters
+     */
+    public void setEntityFilters(FilterCriteria entityFilters) {
+        this.entityFilters = entityFilters;
+    }
+
+    /**
+     * Tag attribute filters for the classification/tag (if tag name is specified)
+     * @return
+     */
+    public FilterCriteria getTagFilters() {
+        return tagFilters;
+    }
+
+    /**
+     * Filter the tag/classification on this criteria
+     * @param tagFilters
+     */
+    public void setTagFilters(FilterCriteria tagFilters) {
+        this.tagFilters = tagFilters;
+    }
+
+    /**
+     * Attribute values included in the results
+     * @return
+     */
+    public Set<String> getAttributes() {
+        return attributes;
+    }
+
+    /**
+     * Return these attributes in the result response
+     * @param attributes
+     */
+    public void setAttributes(Set<String> attributes) {
+        this.attributes = attributes;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SearchParameters that = (SearchParameters) o;
+        return excludeDeletedEntities == that.excludeDeletedEntities &&
+                limit == that.limit &&
+                offset == that.offset &&
+                Objects.equals(query, that.query) &&
+                Objects.equals(typeName, that.typeName) &&
+                Objects.equals(classification, that.classification) &&
+                Objects.equals(entityFilters, that.entityFilters) &&
+                Objects.equals(tagFilters, that.tagFilters) &&
+                Objects.equals(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(query, typeName, classification, excludeDeletedEntities, limit, offset, entityFilters, tagFilters, attributes);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("SearchParameters{");
+        sb.append("query='").append(query).append('\'');
+        sb.append(", typeName='").append(typeName).append('\'');
+        sb.append(", classification='").append(classification).append('\'');
+        sb.append(", excludeDeletedEntities=").append(excludeDeletedEntities);
+        sb.append(", limit=").append(limit);
+        sb.append(", offset=").append(offset);
+        sb.append(", entityFilters=").append(entityFilters);
+        sb.append(", tagFilters=").append(tagFilters);
+        sb.append(", attributes=").append(attributes);
+        sb.append('}');
+        return sb.toString();
+    }
+
+
+    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class FilterCriteria {
+        // Can be presented as a group of conditions or a single condition
+        public enum Condition { AND, OR }
+
+        // Single condition
+        private String   attributeName;
+        private Operator operator;
+        private String   attributeValue;
+
+        // Complex conditions
+        private Condition            condition;
+        private List<FilterCriteria> criterion;
+
+        public String getAttributeName() {
+            return attributeName;
+        }
+
+        public void setAttributeName(String attributeName) {
+            this.attributeName = attributeName;
+        }
+
+        public Operator getOperator() {
+            return operator;
+        }
+
+        public void setOperator(Operator operator) {
+            this.operator = operator;
+        }
+
+        public String getAttributeValue() {
+            return attributeValue;
+        }
+
+        public void setAttributeValue(String attributeValue) {
+            this.attributeValue = attributeValue;
+        }
+
+        public Condition getCondition() {
+            return condition;
+        }
+
+        public void setCondition(Condition condition) {
+            this.condition = condition;
+        }
+
+        public List<FilterCriteria> getCriterion() {
+            return criterion;
+        }
+
+        public void setCriterion(List<FilterCriteria> criterion) {
+            this.criterion = criterion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            FilterCriteria that = (FilterCriteria) o;
+            return Objects.equals(attributeName, that.attributeName) &&
+                    Objects.equals(operator, that.operator) &&
+                    Objects.equals(attributeValue, that.attributeValue) &&
+                    condition == that.condition &&
+                    Objects.equals(criterion, that.criterion);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(attributeName, operator, attributeValue, condition, criterion);
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder("FilterCriteria{");
+            sb.append("attributeName='").append(attributeName).append('\'');
+            sb.append(", operator=").append(operator);
+            sb.append(", attributeValue='").append(attributeValue).append('\'');
+            sb.append(", condition=").append(condition);
+            sb.append(", criterion=").append(criterion);
+            sb.append('}');
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Supported search operations
+     * Logical comparision operators can only be used with numbers or dates
+     * IN, LIKE, startsWith, endsWith, CONTAINS can only be used with strings or text
+     */
+    public enum Operator {
+        LT(new String[]{"<", "lt"}),
+        GT(new String[]{">", "gt"}),
+        LTE(new String[]{"<=", "lte"}),
+        GTE(new String[]{">=", "gte"}),
+        EQ(new String[]{"eq", "="}),
+        NEQ(new String[]{"neq", "!="}),
+        IN(new String[]{"in", "IN"}),
+        LIKE(new String[]{"like", "LIKE"}),
+        STARTS_WITH(new String[]{"startsWith", "STARTSWITH", "begins_with", "BEGINS_WITH"}),
+        ENDS_WITH(new String[]{"endsWith", "ENDSWITH", "ends_with", "BEGINS_WITH"}),
+        CONTAINS(new String[]{"contains", "CONTAINS"})
+        ;
+        static final Map<String, Operator> operatorsMap = new HashMap<>();
+
+        private String[] symbols;
+
+        static  {
+            for (Operator operator : Operator.values()) {
+                for (String s : operator.symbols) {
+                    operatorsMap.put(s, operator);
+                }
+            }
+        }
+
+        Operator(String[] symbols) {
+            this.symbols = symbols;
+        }
+
+        @JsonCreator
+        public static Operator fromString(String symbol) {
+            return operatorsMap.get(symbol);
+        }
+
+        @JsonValue
+        public String getSymbol() {
+            return symbols[0];
+        }
+
+        public String[] getSymbols() {
+            return symbols;
+        }
+
+        @Override
+        public String toString() {
+            return getSymbol();
+        }
+    }
+}
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 21b6427..4d2ac62 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -18,14 +18,12 @@
 package org.apache.atlas.model.impexp;
 
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.codehaus.jackson.annotate.JsonAnySetter;
 import org.codehaus.jackson.annotate.JsonAutoDetect;
 import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
@@ -33,12 +31,9 @@
 import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
 import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
 
-
 @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
 @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown=true)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.PROPERTY)
 public class AtlasImportRequest implements Serializable {
     private static final long   serialVersionUID = 1L;
     public  static final String TRANSFORMS_KEY   = "transforms";
@@ -97,4 +92,10 @@
 
         return (String) this.options.get(key);
     }
-}
+ @JsonAnySetter
+    public void setOption(String key, String value) {
+        if (null == options) {
+            options = new HashMap<>();
+        }
+        options.put(key, value);
+    }}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
index 923a198..030a957 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
@@ -21,6 +21,7 @@
 
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
 
 public interface AtlasDiscoveryService {
     /**
@@ -56,4 +57,12 @@
      */
     AtlasSearchResult searchUsingBasicQuery(String query, String type, String classification, String attrName,
                                             String attrValuePrefix, boolean excludeDeletedEntities, int limit, int offset) throws AtlasBaseException;
+
+    /**
+     * Search for entities matching the search criteria
+     * @param searchParameters Search criteria
+     * @return Matching entities
+     * @throws AtlasBaseException
+     */
+    AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException;
 }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 128cdbf..5068fa5 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -20,12 +20,14 @@
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
 import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
 import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
 import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
@@ -86,23 +88,28 @@
     private final EntityGraphRetriever            entityRetriever;
     private final AtlasGremlinQueryProvider       gremlinQueryProvider;
     private final AtlasTypeRegistry               typeRegistry;
+    private final SearchPipeline                  searchPipeline;
     private final int                             maxResultSetSize;
     private final int                             maxTypesCountInIdxQuery;
     private final int                             maxTagsCountInIdxQuery;
 
     @Inject
-    EntityDiscoveryService(MetadataRepository metadataRepository, AtlasTypeRegistry typeRegistry, AtlasGraph graph) throws AtlasException {
+    EntityDiscoveryService(MetadataRepository metadataRepository, AtlasTypeRegistry typeRegistry,
+                           AtlasGraph graph, SearchPipeline searchPipeline) throws AtlasException {
         this.graph                    = graph;
         this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
         this.entityRetriever          = new EntityGraphRetriever(typeRegistry);
         this.gremlinQueryProvider     = AtlasGremlinQueryProvider.INSTANCE;
         this.typeRegistry             = typeRegistry;
-        this.maxResultSetSize         = ApplicationProperties.get().getInt("atlas.graph.index.search.max-result-set-size", 150);
-        this.maxTypesCountInIdxQuery  = ApplicationProperties.get().getInt("atlas.graph.index.search.max-types-count", 10);
-        this.maxTagsCountInIdxQuery   = ApplicationProperties.get().getInt("atlas.graph.index.search.max-tags-count", 10);
+        this.searchPipeline           = searchPipeline;
+
+        this.maxResultSetSize         = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_RESULT_SET_SIZE, 150);
+        this.maxTypesCountInIdxQuery  = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10);
+        this.maxTagsCountInIdxQuery   = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10);
     }
 
     @Override
+    @GraphTransaction
     public AtlasSearchResult searchUsingDslQuery(String dslQuery, int limit, int offset) throws AtlasBaseException {
         AtlasSearchResult ret = new AtlasSearchResult(dslQuery, AtlasQueryType.DSL);
         GremlinQuery gremlinQuery = toGremlinQuery(dslQuery, limit, offset);
@@ -155,6 +162,7 @@
     }
 
     @Override
+    @GraphTransaction
     public AtlasSearchResult searchUsingFullTextQuery(String fullTextQuery, boolean excludeDeletedEntities, int limit, int offset)
                                                       throws AtlasBaseException {
         AtlasSearchResult ret      = new AtlasSearchResult(fullTextQuery, AtlasQueryType.FULL_TEXT);
@@ -170,6 +178,7 @@
     }
 
     @Override
+    @GraphTransaction
     public AtlasSearchResult searchUsingBasicQuery(String query, String typeName, String classification, String attrName,
                                                    String attrValuePrefix, boolean excludeDeletedEntities, int limit,
                                                    int offset) throws AtlasBaseException {
@@ -393,6 +402,22 @@
         return ret;
     }
 
+    @Override
+    @GraphTransaction
+    public AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException {
+        AtlasSearchResult ret = new AtlasSearchResult(searchParameters);
+
+        List<AtlasVertex> resultList = searchPipeline.run(searchParameters);
+
+        for (AtlasVertex atlasVertex : resultList) {
+            AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, searchParameters.getAttributes());
+
+            ret.addEntity(entity);
+        }
+
+        return ret;
+    }
+
     private String getQueryForFullTextSearch(String userKeyedString, String typeName, String classification) {
         String typeFilter          = getTypeFilter(typeRegistry, typeName, maxTypesCountInIdxQuery);
         String classficationFilter = getClassificationFilter(typeRegistry, classification, maxTagsCountInIdxQuery);
@@ -548,4 +573,5 @@
     public int getMaxResultSetSize() {
         return maxResultSetSize;
     }
+
 }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index b07091a..3ae41c8 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -21,6 +21,7 @@
 
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
@@ -62,6 +63,7 @@
     }
 
     @Override
+    @GraphTransaction
     public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
         AtlasLineageInfo lineageInfo;
 
diff --git a/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java b/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
new file mode 100644
index 0000000..5565781
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
+import org.apache.atlas.model.discovery.SearchParameters.Operator;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.atlas.discovery.SearchPipeline.IndexResultType;
+import static org.apache.atlas.discovery.SearchPipeline.PipelineContext;
+import static org.apache.atlas.discovery.SearchPipeline.PipelineStep;
+import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
+import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
+
+@Component
+public class GremlinStep implements PipelineStep {
+    private static final Logger LOG      = LoggerFactory.getLogger(GremlinStep.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("GremlinSearchStep");
+
+    private final AtlasGraph        graph;
+    private final AtlasTypeRegistry typeRegistry;
+
+    enum GremlinFilterQueryType { TAG, ENTITY }
+
+    @Inject
+    public GremlinStep(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
+        this.graph        = graph;
+        this.typeRegistry = typeRegistry;
+    }
+
+    @Override
+    public void execute(PipelineContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> GremlinStep.execute({})", context);
+        }
+
+        if (context == null) {
+            throw new AtlasBaseException("Can't start search without any context");
+        }
+
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "GremlinSearchStep.execute(" + context +  ")");
+        }
+
+        final Iterator<AtlasVertex> result;
+
+        if (context.hasIndexResults()) {
+            // We have some results from the indexed step, let's proceed accordingly
+            if (context.getIndexResultType() == IndexResultType.TAG) {
+                // Index search was done on tag and filters
+                if (context.isTagProcessingComplete()) {
+                    LOG.debug("GremlinStep.execute(): index has completely processed tag, further TAG filtering not needed");
+
+                    Set<String> taggedVertexGUIDs = new HashSet<>();
+
+                    Iterator<AtlasIndexQuery.Result> tagVertexIterator = context.getIndexResultsIterator();
+
+                    while (tagVertexIterator.hasNext()) {
+                        // Find out which Vertex has this outgoing edge
+                        AtlasVertex         vertex = tagVertexIterator.next().getVertex();
+                        Iterable<AtlasEdge> edges  = vertex.getEdges(AtlasEdgeDirection.IN);
+
+                        for (AtlasEdge edge : edges) {
+                            String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
+
+                            taggedVertexGUIDs.add(guid);
+                        }
+                    }
+
+                    // No entities are tagged  (actually this check is already done)
+                    if (!taggedVertexGUIDs.isEmpty()) {
+                        result = processEntity(taggedVertexGUIDs, context);
+                    } else {
+                        result = null;
+                    }
+                } else {
+                    result = processTagAndEntity(Collections.<String>emptySet(), context);
+                }
+            } else if (context.getIndexResultType() == IndexResultType.TEXT) {
+                // Index step processed full-text;
+                Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());
+
+                result = processTagAndEntity(entityIDs, context);
+            } else if (context.getIndexResultType() == IndexResultType.ENTITY) {
+                // Index step processed entity and it's filters; tag filter wouldn't be set
+                Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());
+
+                result = processEntity(entityIDs, context);
+            } else {
+                result = null;
+            }
+        } else {
+            // No index results, need full processing in Gremlin
+            if (context.getClassificationType() != null) {
+                // Process tag and filters first, then entity filters
+                result = processTagAndEntity(Collections.<String>emptySet(), context);
+            } else {
+                result = processEntity(Collections.<String>emptySet(), context);
+            }
+        }
+
+        context.setGremlinResultIterator(result);
+
+        AtlasPerfTracer.log(perf);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== GremlinStep.execute({})", context);
+        }
+    }
+
+    private Iterator<AtlasVertex> processEntity(Set<String> entityGUIDs, PipelineContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
+        }
+
+        final Iterator<AtlasVertex> ret;
+
+        SearchParameters searchParameters = context.getSearchParameters();
+        AtlasEntityType  entityType       = context.getEntityType();
+
+        if (entityType != null) {
+            AtlasGraphQuery entityFilterQuery = context.getGraphQuery("ENTITY_FILTER");
+
+            if (entityFilterQuery == null) {
+                entityFilterQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, entityType.getTypeAndAllSubTypes());
+
+                if (searchParameters.getEntityFilters() != null) {
+                    toGremlinFilterQuery(GremlinFilterQueryType.ENTITY, entityType, searchParameters.getEntityFilters(), entityFilterQuery, context);
+                }
+
+                if (searchParameters.getExcludeDeletedEntities()) {
+                    entityFilterQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
+                }
+
+                context.cacheGraphQuery("ENTITY_FILTER", entityFilterQuery);
+            }
+
+            // Now get all vertices
+            if (CollectionUtils.isEmpty(entityGUIDs)) {
+                ret = entityFilterQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();
+            } else {
+                AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);
+
+                if (entityFilterQuery != null) {
+                    guidQuery.addConditionsFrom(entityFilterQuery);
+                } else if (searchParameters.getExcludeDeletedEntities()) {
+                    guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
+                }
+
+                ret = guidQuery.vertices(context.getMaxLimit()).iterator();
+            }
+        } else if (CollectionUtils.isNotEmpty(entityGUIDs)) {
+            AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);
+
+            if (searchParameters.getExcludeDeletedEntities()) {
+                guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
+            }
+
+            Iterable<AtlasVertex> vertices = guidQuery.vertices(context.getMaxLimit());
+
+            ret = vertices.iterator();
+        } else {
+            ret = null;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
+        }
+
+        return ret;
+    }
+
+    private Iterator<AtlasVertex> processTagAndEntity(Set<String> entityGUIDs, PipelineContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
+        }
+
+        final Iterator<AtlasVertex> ret;
+
+        AtlasClassificationType classificationType = context.getClassificationType();
+
+        if (classificationType != null) {
+            AtlasGraphQuery  tagVertexQuery = context.getGraphQuery("TAG_VERTEX");
+
+            if (tagVertexQuery == null) {
+                tagVertexQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes());
+
+                SearchParameters searchParameters = context.getSearchParameters();
+
+                // Do tag filtering first as it'll return a smaller subset of vertices
+                if (searchParameters.getTagFilters() != null) {
+                    toGremlinFilterQuery(GremlinFilterQueryType.TAG, classificationType, searchParameters.getTagFilters(), tagVertexQuery, context);
+                }
+
+                context.cacheGraphQuery("TAG_VERTEX", tagVertexQuery);
+            }
+
+            if (tagVertexQuery != null) {
+                Set<String> taggedVertexGuids = new HashSet<>();
+                // Now get all vertices after adjusting offset for each iteration
+                LOG.debug("Firing TAG query");
+
+                Iterator<AtlasVertex> tagVertexIterator = tagVertexQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();
+
+                while (tagVertexIterator.hasNext()) {
+                    // Find out which Vertex has this outgoing edge
+                    Iterable<AtlasEdge> edges = tagVertexIterator.next().getEdges(AtlasEdgeDirection.IN);
+                    for (AtlasEdge edge : edges) {
+                        String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
+                        taggedVertexGuids.add(guid);
+                    }
+                }
+
+                entityGUIDs = taggedVertexGuids;
+            }
+        }
+
+        if (!entityGUIDs.isEmpty()) {
+            ret = processEntity(entityGUIDs, context);
+        } else {
+            ret = null;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
+        }
+
+        return ret;
+    }
+
+    private Set<String> getVertexIDs(Iterator<AtlasIndexQuery.Result> idxResultsIterator) {
+        Set<String> guids = new HashSet<>();
+        while (idxResultsIterator.hasNext()) {
+            AtlasVertex vertex = idxResultsIterator.next().getVertex();
+            String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
+            guids.add(guid);
+        }
+        return guids;
+    }
+
+    private Set<String> getVertexIDs(Iterable<AtlasVertex> vertices) {
+        Set<String> guids = new HashSet<>();
+        for (AtlasVertex vertex : vertices) {
+            String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
+            guids.add(guid);
+        }
+        return guids;
+    }
+
+    private AtlasGraphQuery toGremlinFilterQuery(GremlinFilterQueryType queryType, AtlasStructType type, FilterCriteria criteria,
+                                                 AtlasGraphQuery query, PipelineContext context) {
+        if (criteria.getCondition() != null) {
+            if (criteria.getCondition() == Condition.AND) {
+                for (FilterCriteria filterCriteria : criteria.getCriterion()) {
+                    AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);
+                    query.addConditionsFrom(nestedQuery);
+                }
+            } else {
+                List<AtlasGraphQuery> orConditions = new LinkedList<>();
+
+                for (FilterCriteria filterCriteria : criteria.getCriterion()) {
+                    AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);
+                    // FIXME: Something might not be right here as the queries are getting overwritten sometimes
+                    orConditions.add(graph.query().createChildQuery().addConditionsFrom(nestedQuery));
+                }
+
+                if (!orConditions.isEmpty()) {
+                    query.or(orConditions);
+                }
+            }
+        } else {
+            String   attrName  = criteria.getAttributeName();
+            String   attrValue = criteria.getAttributeValue();
+            Operator operator  = criteria.getOperator();
+
+            try {
+                // If attribute belongs to supertype then adjust the name accordingly
+                final String  qualifiedAttributeName;
+                final boolean attrProcessed;
+
+                if (queryType == GremlinFilterQueryType.TAG) {
+                    qualifiedAttributeName = type.getQualifiedAttributeName(attrName);
+                    attrProcessed          = context.hasProcessedTagAttribute(qualifiedAttributeName);
+                } else {
+                    qualifiedAttributeName = type.getQualifiedAttributeName(attrName);
+                    attrProcessed          = context.hasProcessedEntityAttribute(qualifiedAttributeName);
+                }
+
+                // Check if the qualifiedAttribute has been processed
+                if (!attrProcessed) {
+                    switch (operator) {
+                        case LT:
+                            query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN, attrValue);
+                            break;
+                        case LTE:
+                            query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN_EQUAL, attrValue);
+                            break;
+                        case GT:
+                            query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN, attrValue);
+                            break;
+                        case GTE:
+                            query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN_EQUAL, attrValue);
+                            break;
+                        case EQ:
+                            query.has(qualifiedAttributeName, ComparisionOperator.EQUAL, attrValue);
+                            break;
+                        case NEQ:
+                            query.has(qualifiedAttributeName, ComparisionOperator.NOT_EQUAL, attrValue);
+                            break;
+                        case LIKE:
+                            // TODO: Maybe we need to validate pattern
+                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getLikeRegex(attrValue));
+                            break;
+                        case CONTAINS:
+                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getContainsRegex(attrValue));
+                            break;
+                        case STARTS_WITH:
+                            query.has(qualifiedAttributeName, MatchingOperator.PREFIX, attrValue);
+                            break;
+                        case ENDS_WITH:
+                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getSuffixRegex(attrValue));
+                        case IN:
+                            LOG.warn("{}: unsupported operator. Ignored", operator);
+                            break;
+                    }
+                }
+            } catch (AtlasBaseException e) {
+                LOG.error("toGremlinFilterQuery(): failed for attrName=" + attrName + "; operator=" + operator + "; attrValue=" + attrValue, e);
+            }
+        }
+
+        return query;
+    }
+
+    private String getContainsRegex(String attributeValue) {
+        return ".*" + attributeValue + ".*";
+    }
+
+    private String getSuffixRegex(String attributeValue) {
+        return ".*" + attributeValue;
+    }
+
+    private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; }
+}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchPipeline.java b/repository/src/main/java/org/apache/atlas/discovery/SearchPipeline.java
new file mode 100644
index 0000000..0f91b2d
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/SearchPipeline.java
@@ -0,0 +1,611 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.SearchTracker;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Component
+public class SearchPipeline {
+    private static final Logger LOG      = LoggerFactory.getLogger(SearchPipeline.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("SearchPipeline");
+
+    enum ExecutionMode { SOLR, GREMLIN, MIXED }
+
+    enum IndexResultType { TAG, ENTITY, TEXT }
+
+    private final SolrStep                 solrStep;
+    private final GremlinStep              gremlinStep;
+    private final SearchTracker            searchTracker;
+    private final AtlasTypeRegistry        typeRegistry;
+    private final Configuration            atlasConfiguration;
+    private final GraphBackedSearchIndexer indexer;
+
+    @Inject
+    public SearchPipeline(SolrStep solrStep, GremlinStep gremlinStep, SearchTracker searchTracker, AtlasTypeRegistry typeRegistry, Configuration atlasConfiguration, GraphBackedSearchIndexer indexer) {
+        this.solrStep           = solrStep;
+        this.gremlinStep        = gremlinStep;
+        this.searchTracker      = searchTracker;
+        this.typeRegistry       = typeRegistry;
+        this.atlasConfiguration = atlasConfiguration;
+        this.indexer            = indexer;
+    }
+
+    public List<AtlasVertex> run(SearchParameters searchParameters) throws AtlasBaseException {
+        final List<AtlasVertex> ret;
+
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "SearchPipeline.run("+ searchParameters +")");
+        }
+
+        AtlasEntityType         entityType = typeRegistry.getEntityTypeByName(searchParameters.getTypeName());
+        AtlasClassificationType classiType = typeRegistry.getClassificationTypeByName(searchParameters.getClassification());
+        PipelineContext         context    = new PipelineContext(searchParameters, entityType, classiType, indexer.getVertexIndexKeys());
+        String                  searchId   = searchTracker.add(context); // For future cancellation
+
+        try {
+            ExecutionMode mode = determineExecutionMode(context);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Execution mode {}", mode);
+            }
+
+            switch (mode) {
+                case SOLR:
+                    ret = runOnlySolr(context);
+                    break;
+
+                case GREMLIN:
+                    ret = runOnlyGremlin(context);
+                    break;
+
+                case MIXED:
+                    ret = runMixed(context);
+                    break;
+
+                default:
+                    ret = Collections.emptyList();
+            }
+        } finally {
+            searchTracker.remove(searchId);
+
+            AtlasPerfTracer.log(perf);
+        }
+
+        return ret;
+    }
+
+    private List<AtlasVertex> runOnlySolr(PipelineContext context) throws AtlasBaseException {
+        // Only when there's no tag and query
+        List<AtlasVertex> results = new ArrayList<>();
+
+        while (results.size() < context.getSearchParameters().getLimit()) {
+            if (context.getForceTerminate()) {
+                LOG.debug("search has been terminated");
+
+                break;
+            }
+
+            // Execute solr search only
+            solrStep.execute(context);
+
+            List<AtlasVertex> stepResults = getIndexResults(context);
+
+            context.incrementSearchRound();
+
+            addToResult(results, stepResults, context.getSearchParameters().getLimit());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
+            }
+
+            if (CollectionUtils.isEmpty(stepResults)) {
+                // If no result is found any subsequent iteration then just stop querying the index
+                break;
+            }
+        }
+
+        if (context.getIndexResultType() == IndexResultType.TAG) {
+            List<AtlasVertex> entityVertices = new ArrayList<>(results.size());
+
+            for (AtlasVertex tagVertex : results) {
+                Iterable<AtlasEdge> edges = tagVertex.getEdges(AtlasEdgeDirection.IN);
+
+                for (AtlasEdge edge : edges) {
+                    AtlasVertex entityVertex = edge.getOutVertex();
+
+                    entityVertices.add(entityVertex);
+                }
+            }
+
+            results = entityVertices;
+        }
+
+        return results;
+    }
+
+    private List<AtlasVertex> runOnlyGremlin(PipelineContext context) throws AtlasBaseException {
+        List<AtlasVertex> results = new ArrayList<>();
+
+        while (results.size() < context.getSearchParameters().getLimit()) {
+            if (context.getForceTerminate()) {
+                LOG.debug("search has been terminated");
+
+                break;
+            }
+
+            gremlinStep.execute(context);
+
+            List<AtlasVertex> stepResults = getGremlinResults(context);
+
+            context.incrementSearchRound();
+
+            addToResult(results, stepResults, context.getSearchParameters().getLimit());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
+            }
+
+            if (CollectionUtils.isEmpty(stepResults)) {
+                // If no result is found any subsequent iteration then just stop querying the index
+                break;
+            }
+        }
+
+        return results;
+    }
+
+    /*
+        1. Index processes few attributes and then gremlin processes rest
+            1.1 Iterate for gremlin till the index results are non null
+        2. Index processes all attributes, gremlin has nothing to do
+
+        Sometimes the result set might be less than the max limit and we need to iterate until the result set is full
+        or the iteration doesn't return any results
+
+     */
+    private List<AtlasVertex> runMixed(PipelineContext context) throws AtlasBaseException {
+        List<AtlasVertex> results = new ArrayList<>();
+
+        while (results.size() < context.getSearchParameters().getLimit()) {
+            if (context.getForceTerminate()) {
+                LOG.debug("search has been terminated");
+
+                break;
+            }
+
+            // Execute Solr search and then pass it to the Gremlin step (if needed)
+            solrStep.execute(context);
+
+            if (!context.hasIndexResults()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No index results in iteration {}", context.getIterationCount());
+                }
+
+                // If no result is found any subsequent iteration then just stop querying the index
+                break;
+            }
+
+            // Attributes partially processed by Solr, use gremlin to process remaining attribute(s)
+            gremlinStep.execute(context);
+
+            context.incrementSearchRound();
+
+            List<AtlasVertex> stepResults = getGremlinResults(context);
+
+            addToResult(results, stepResults, context.getSearchParameters().getLimit());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
+            }
+        }
+
+        return results;
+    }
+
+    private void addToResult(List<AtlasVertex> result, List<AtlasVertex> stepResult, int maxLimit) {
+        if (result != null && stepResult != null && result.size() < maxLimit) {
+            for (AtlasVertex vertex : stepResult) {
+                result.add(vertex);
+
+                if (result.size() >= maxLimit) {
+                    break;
+                }
+            }
+        }
+    }
+
+    private List<AtlasVertex> getIndexResults(PipelineContext pipelineContext) {
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        if (pipelineContext.hasIndexResults()) {
+            Iterator<AtlasIndexQuery.Result> iter = pipelineContext.getIndexResultsIterator();
+
+            while(iter.hasNext()) {
+                ret.add(iter.next().getVertex());
+            }
+        }
+
+        return ret;
+    }
+
+    private List<AtlasVertex> getGremlinResults(PipelineContext pipelineContext) {
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        if (pipelineContext.hasGremlinResults()) {
+            Iterator<AtlasVertex> iter = pipelineContext.getGremlinResultIterator();
+
+            while (iter.hasNext()) {
+                ret.add(iter.next());
+            }
+        }
+
+        return ret;
+    }
+
+    private ExecutionMode determineExecutionMode(PipelineContext context) {
+        SearchParameters        searchParameters   = context.getSearchParameters();
+        AtlasClassificationType classificationType = context.getClassificationType();
+        AtlasEntityType         entityType         = context.getEntityType();
+        int                     solrCount          = 0;
+        int                     gremlinCount       = 0;
+
+        if (StringUtils.isNotEmpty(searchParameters.getQuery())) {
+            solrCount++;
+
+            // __state index only exists in vertex_index
+            if (searchParameters.getExcludeDeletedEntities()) {
+                gremlinCount++;
+            }
+        }
+
+        if (classificationType != null) {
+            Set<String> typeAndAllSubTypes = classificationType.getTypeAndAllSubTypes();
+
+            if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Classification type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search",
+                              classificationType.getTypeName(), typeAndAllSubTypes.size());
+                }
+
+                gremlinCount++;
+            } else {
+                if (hasNonIndexedAttrViolation(classificationType, context.getIndexedKeys(), searchParameters.getTagFilters())) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Tag filters not suitable for Solr search. Gremlin will be used to execute the search");
+                    }
+
+                    gremlinCount++;
+                } else {
+                    solrCount++;
+
+                    // __state index only exist in vertex_index
+                    if (searchParameters.getExcludeDeletedEntities()) {
+                        gremlinCount++;
+                    }
+                }
+            }
+        }
+
+        if (entityType != null) {
+            Set<String> typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();
+
+            if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Entity type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search",
+                              entityType.getTypeName(), typeAndAllSubTypes.size());
+                }
+
+                gremlinCount++;
+            } else {
+                if (hasNonIndexedAttrViolation(entityType, context.getIndexedKeys(), searchParameters.getEntityFilters())) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Entity filters not suitable for Solr search. Gremlin will be used to execute the search");
+                    }
+
+                    gremlinCount++;
+                } else {
+                    solrCount++;
+                }
+            }
+        }
+
+        ExecutionMode mode = ExecutionMode.MIXED;
+
+        if (solrCount == 1 && gremlinCount == 0) {
+            mode = ExecutionMode.SOLR;
+        } else if (gremlinCount == 1 && solrCount == 0) {
+            mode = ExecutionMode.GREMLIN;
+        }
+
+        return mode;
+    }
+
+    // If Index can't process all attributes and any of the non-indexed attribute is present in OR nested within AND
+    // then the only way is Gremlin
+    // A violation (here) is defined as presence of non-indexed attribute within any OR clause nested under an AND clause
+    // the reason being that the index would not be able to process the nested OR attribute which might result in
+    // exclusion of valid result (vertex)
+    private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set<String> indexKeys, FilterCriteria filterCriteria) {
+        return hasNonIndexedAttrViolation(structType, indexKeys, filterCriteria, false);
+    }
+
+    private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set<String> indexKeys, FilterCriteria filterCriteria, boolean enclosedInOrCondition) {
+        if (filterCriteria == null) {
+            return false;
+        }
+
+        boolean              ret             = false;
+        Condition            filterCondition = filterCriteria.getCondition();
+        List<FilterCriteria> criterion       = filterCriteria.getCriterion();
+
+        if (filterCondition != null && CollectionUtils.isNotEmpty(criterion)) {
+            if (!enclosedInOrCondition) {
+                enclosedInOrCondition = filterCondition == Condition.OR;
+            }
+
+            // If we have nested criterion let's find any nested ORs with non-indexed attr
+            for (FilterCriteria criteria : criterion) {
+                ret |= hasNonIndexedAttrViolation(structType, indexKeys, criteria, enclosedInOrCondition);
+
+                if (ret) {
+                    break;
+                }
+            }
+        } else if (StringUtils.isNotEmpty(filterCriteria.getAttributeName())) {
+            // If attribute qualified name doesn't exist in the vertex index we potentially might have a problem
+            try {
+                String qualifiedAttributeName = structType.getQualifiedAttributeName(filterCriteria.getAttributeName());
+
+                ret = CollectionUtils.isEmpty(indexKeys) || !indexKeys.contains(qualifiedAttributeName);
+
+                if (ret) {
+                    LOG.warn("search includes non-indexed attribute '{}'; might cause poor performance", qualifiedAttributeName);
+                }
+            } catch (AtlasBaseException e) {
+                LOG.warn(e.getMessage());
+
+                ret = true;
+            }
+        }
+
+        // return ret && enclosedInOrCondition;
+
+        return ret;
+    }
+
+    public interface PipelineStep {
+        void execute(PipelineContext context) throws AtlasBaseException;
+    }
+
+    public static class PipelineContext {
+        // TODO: See if anything can be cached in the context
+
+        private final SearchParameters        searchParameters;
+        private final AtlasEntityType         entityType;
+        private final AtlasClassificationType classificationType;
+        private final Set<String>             indexedKeys;
+
+        private int     iterationCount;
+        private boolean forceTerminate;
+        private int     currentOffset;
+        private int     maxLimit;
+
+        // Continuous processing stuff
+        private Set<String> tagSearchAttributes       = new HashSet<>();
+        private Set<String> entitySearchAttributes    = new HashSet<>();
+        private Set<String> tagAttrProcessedBySolr    = new HashSet<>();
+        private Set<String> entityAttrProcessedBySolr = new HashSet<>();
+
+        // Results related stuff
+        private IndexResultType                  indexResultType;
+        private Iterator<AtlasIndexQuery.Result> indexResultsIterator;
+        private Iterator<AtlasVertex>            gremlinResultIterator;
+
+        private Map<String, AtlasIndexQuery> cachedIndexQueries = new HashMap<>();
+        private Map<String, AtlasGraphQuery> cachedGraphQueries = new HashMap<>();
+
+        public PipelineContext(SearchParameters searchParameters, AtlasEntityType entityType, AtlasClassificationType classificationType, Set<String> indexedKeys) {
+            this.searchParameters   = searchParameters;
+            this.entityType         = entityType;
+            this.classificationType = classificationType;
+            this.indexedKeys        = indexedKeys;
+
+            currentOffset = searchParameters.getOffset();
+            maxLimit      = searchParameters.getLimit();
+        }
+
+        public SearchParameters getSearchParameters() {
+            return searchParameters;
+        }
+
+        public AtlasEntityType getEntityType() {
+            return entityType;
+        }
+
+        public AtlasClassificationType getClassificationType() {
+            return classificationType;
+        }
+
+        public Set<String> getIndexedKeys() { return indexedKeys; }
+
+        public int getIterationCount() {
+            return iterationCount;
+        }
+
+        public boolean getForceTerminate() {
+            return forceTerminate;
+        }
+
+        public void setForceTerminate(boolean forceTerminate) {
+            this.forceTerminate = forceTerminate;
+        }
+
+        public boolean hasProcessedTagAttribute(String attributeName) {
+            return tagAttrProcessedBySolr.contains(attributeName);
+        }
+
+        public boolean hasProcessedEntityAttribute(String attributeName) {
+            return entityAttrProcessedBySolr.contains(attributeName);
+        }
+
+        public Iterator<AtlasIndexQuery.Result> getIndexResultsIterator() {
+            return indexResultsIterator;
+        }
+
+        public void setIndexResultsIterator(Iterator<AtlasIndexQuery.Result> indexResultsIterator) {
+            this.indexResultsIterator = indexResultsIterator;
+        }
+
+        public Iterator<AtlasVertex> getGremlinResultIterator() {
+            return gremlinResultIterator;
+        }
+
+        public void setGremlinResultIterator(Iterator<AtlasVertex> gremlinResultIterator) {
+            this.gremlinResultIterator = gremlinResultIterator;
+        }
+
+        public boolean hasIndexResults() {
+            return null != indexResultsIterator && indexResultsIterator.hasNext();
+        }
+
+        public boolean hasGremlinResults() {
+            return null != gremlinResultIterator && gremlinResultIterator.hasNext();
+        }
+
+
+        public boolean isTagProcessingComplete() {
+            return CollectionUtils.isEmpty(tagSearchAttributes) ||
+                    CollectionUtils.isEqualCollection(tagSearchAttributes, tagAttrProcessedBySolr);
+        }
+
+        public boolean isEntityProcessingComplete() {
+            return CollectionUtils.isEmpty(entitySearchAttributes) ||
+                    CollectionUtils.isEqualCollection(entitySearchAttributes, entityAttrProcessedBySolr);
+        }
+
+        public boolean isProcessingComplete() {
+            return isTagProcessingComplete() && isEntityProcessingComplete();
+        }
+
+        public void incrementOffset(int increment) {
+            currentOffset += increment;
+        }
+
+        public void incrementSearchRound() {
+            iterationCount ++;
+            incrementOffset(searchParameters.getLimit());
+        }
+
+        public int getCurrentOffset() {
+            return currentOffset;
+        }
+
+        public boolean addTagSearchAttribute(String attribute) {
+            return tagSearchAttributes.add(attribute);
+        }
+
+        public boolean addProcessedTagAttribute(String attribute) {
+            return tagAttrProcessedBySolr.add(attribute);
+        }
+
+        public boolean addEntitySearchAttribute(String attribute) {
+            return tagSearchAttributes.add(attribute);
+        }
+
+        public boolean addProcessedEntityAttribute(String attribute) {
+            return entityAttrProcessedBySolr.add(attribute);
+        }
+
+        public void cacheGraphQuery(String name, AtlasGraphQuery graphQuery) {
+            cachedGraphQueries.put(name, graphQuery);
+        }
+
+        public void cacheIndexQuery(String name, AtlasIndexQuery indexQuery) {
+            cachedIndexQueries.put(name, indexQuery);
+        }
+
+        public AtlasIndexQuery getIndexQuery(String name){
+            return cachedIndexQueries.get(name);
+        }
+
+        public AtlasGraphQuery getGraphQuery(String name) {
+            return cachedGraphQueries.get(name);
+        }
+
+        public IndexResultType getIndexResultType() {
+            return indexResultType;
+        }
+
+        public void setIndexResultType(IndexResultType indexResultType) {
+            this.indexResultType = indexResultType;
+        }
+
+        public int getMaxLimit() {
+            return maxLimit;
+        }
+
+        @Override
+        public String toString() {
+            return new ToStringBuilder(this)
+                    .append("iterationCount", iterationCount)
+                    .append("forceTerminate", forceTerminate)
+                    .append("currentOffset", currentOffset)
+                    .append("maxLimit", maxLimit)
+                    .append("searchParameters", searchParameters)
+                    .append("tagSearchAttributes", tagSearchAttributes)
+                    .append("entitySearchAttributes", entitySearchAttributes)
+                    .append("tagAttrProcessedBySolr", tagAttrProcessedBySolr)
+                    .append("entityAttrProcessedBySolr", entityAttrProcessedBySolr)
+                    .append("indexResultType", indexResultType)
+                    .append("cachedIndexQueries", cachedIndexQueries)
+                    .append("cachedGraphQueries", cachedGraphQueries)
+                    .toString();
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/SolrStep.java b/repository/src/main/java/org/apache/atlas/discovery/SolrStep.java
new file mode 100644
index 0000000..6a5dd5a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/SolrStep.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.discovery.SearchPipeline.IndexResultType;
+import org.apache.atlas.discovery.SearchPipeline.PipelineContext;
+import org.apache.atlas.discovery.SearchPipeline.PipelineStep;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.discovery.SearchParameters.Operator;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.type.*;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.atlas.model.discovery.SearchParameters.*;
+
+@Component
+public class SolrStep implements PipelineStep {
+    private static final Logger LOG = LoggerFactory.getLogger(SolrStep.class);
+
+    private static final Pattern STRAY_AND_PATTERN     = Pattern.compile("(AND\\s+)+\\)");
+    private static final Pattern STRAY_OR_PATTERN      = Pattern.compile("(OR\\s+)+\\)");
+    private static final Pattern STRAY_ELIPSIS_PATTERN = Pattern.compile("(\\(\\s*)\\)");
+    private static final String  AND_STR         = " AND ";
+    private static final String  EMPTY_STRING    = "";
+    private static final String  SPACE_STRING    = " ";
+    private static final String  BRACE_OPEN_STR  = "( ";
+    private static final String  BRACE_CLOSE_STR = " )";
+
+    private static final Map<Operator, String> operatorMap = new HashMap<>();
+
+    static
+    {
+        operatorMap.put(Operator.LT,"v.\"%s\": [* TO %s}");
+        operatorMap.put(Operator.GT,"v.\"%s\": {%s TO *]");
+        operatorMap.put(Operator.LTE,"v.\"%s\": [* TO %s]");
+        operatorMap.put(Operator.GTE,"v.\"%s\": [%s TO *]");
+        operatorMap.put(Operator.EQ,"v.\"%s\": %s");
+        operatorMap.put(Operator.NEQ,"v.\"%s\": (NOT %s)");
+        operatorMap.put(Operator.IN, "v.\"%s\": (%s)");
+        operatorMap.put(Operator.LIKE, "v.\"%s\": (%s)");
+        operatorMap.put(Operator.STARTS_WITH, "v.\"%s\": (%s*)");
+        operatorMap.put(Operator.ENDS_WITH, "v.\"%s\": (*%s)");
+        operatorMap.put(Operator.CONTAINS, "v.\"%s\": (*%s*)");
+    }
+
+    private final AtlasGraph graph;
+
+    @Inject
+    public SolrStep(AtlasGraph graph) {
+        this.graph = graph;
+    }
+
+    @Override
+    public void execute(PipelineContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> SolrStep.execute({})", context);
+        }
+
+        if (context == null) {
+            throw new AtlasBaseException("Can't start search without any context");
+        }
+
+        SearchParameters searchParameters = context.getSearchParameters();
+
+        final Iterator<AtlasIndexQuery.Result> result;
+
+        if (StringUtils.isNotEmpty(searchParameters.getQuery())) {
+            result = executeAgainstFulltextIndex(context);
+        } else {
+            result = executeAgainstVertexIndex(context);
+        }
+
+        context.setIndexResultsIterator(result);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== SolrStep.execute({})", context);
+        }
+    }
+
+    private Iterator<AtlasIndexQuery.Result> executeAgainstFulltextIndex(PipelineContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> SolrStep.executeAgainstFulltextIndex()");
+        }
+
+        final Iterator<AtlasIndexQuery.Result> ret;
+
+        AtlasIndexQuery query = context.getIndexQuery("FULLTEXT");
+
+        if (query == null) {
+            // Compute only once
+            SearchParameters searchParameters = context.getSearchParameters();
+            String           indexQuery       = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, searchParameters.getQuery());
+
+            query = graph.indexQuery(Constants.FULLTEXT_INDEX, indexQuery);
+
+            context.cacheIndexQuery("FULLTEXT", query);
+        }
+
+        context.setIndexResultType(IndexResultType.TEXT);
+
+        ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== SolrStep.executeAgainstFulltextIndex()");
+        }
+
+        return ret;
+    }
+
+    private Iterator<AtlasIndexQuery.Result> executeAgainstVertexIndex(PipelineContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> SolrStep.executeAgainstVertexIndex()");
+        }
+
+        final Iterator<AtlasIndexQuery.Result> ret;
+
+        SearchParameters searchParameters = context.getSearchParameters();
+        AtlasIndexQuery  query            = context.getIndexQuery("VERTEX_INDEX");
+
+        if (query == null) {
+            StringBuilder solrQuery = new StringBuilder();
+
+            // If tag is specified then let's start processing using tag and it's attributes, entity filters will
+            // be pushed to Gremlin
+            if (context.getClassificationType() != null) {
+                context.setIndexResultType(IndexResultType.TAG);
+
+                constructTypeTestQuery(solrQuery, context.getClassificationType().getTypeAndAllSubTypes());
+                constructFilterQuery(solrQuery, context.getClassificationType(), searchParameters.getTagFilters(), context);
+            } else if (context.getEntityType() != null) {
+                context.setIndexResultType(IndexResultType.ENTITY);
+
+                constructTypeTestQuery(solrQuery, context.getEntityType().getTypeAndAllSubTypes());
+                constructFilterQuery(solrQuery, context.getEntityType(), searchParameters.getEntityFilters(), context);
+
+                // Set the status flag
+                if (searchParameters.getExcludeDeletedEntities()) {
+                    if (solrQuery.length() > 0) {
+                        solrQuery.append(" AND ");
+                    }
+
+                    solrQuery.append("v.\"__state\":").append("ACTIVE");
+                }
+            }
+
+            // No query was formed, doesn't make sense to do anything beyond this point
+            if (solrQuery.length() > 0) {
+                String validSolrQuery = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
+                validSolrQuery = STRAY_OR_PATTERN.matcher(validSolrQuery).replaceAll(")");
+                validSolrQuery = STRAY_ELIPSIS_PATTERN.matcher(validSolrQuery).replaceAll(EMPTY_STRING);
+
+                query = graph.indexQuery(Constants.VERTEX_INDEX, validSolrQuery);
+                context.cacheIndexQuery("VERTEX_INDEX", query);
+            }
+        }
+
+        // Execute solr query and return the index results in the context
+        if (query != null) {
+            ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit());
+        } else {
+            ret = null;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== SolrStep.executeAgainstVertexIndex()");
+        }
+
+        return ret;
+    }
+
+    private void constructTypeTestQuery(StringBuilder solrQuery, Set<String> typeAndAllSubTypes) {
+        String typeAndSubtypesString = StringUtils.join(typeAndAllSubTypes, SPACE_STRING);
+
+        solrQuery.append("v.\"__typeName\": (")
+                .append(typeAndSubtypesString)
+                .append(")");
+    }
+
+    private void constructFilterQuery(StringBuilder solrQuery, AtlasStructType type, FilterCriteria filterCriteria, PipelineContext context) {
+        if (filterCriteria != null) {
+            LOG.debug("Processing Filters");
+
+            String filterQuery = toSolrQuery(type, filterCriteria, context);
+
+            if (StringUtils.isNotEmpty(filterQuery)) {
+                solrQuery.append(AND_STR).append(filterQuery);
+            }
+        }
+    }
+
+    private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context) {
+        return toSolrQuery(type, criteria, context, new StringBuilder());
+    }
+
+    private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context, StringBuilder sb) {
+        if (criteria.getCondition() != null && CollectionUtils.isNotEmpty(criteria.getCriterion())) {
+            StringBuilder nestedExpression = new StringBuilder();
+
+            for (FilterCriteria filterCriteria : criteria.getCriterion()) {
+                String nestedQuery = toSolrQuery(type, filterCriteria, context);
+
+                if (StringUtils.isNotEmpty(nestedQuery)) {
+                    if (nestedExpression.length() > 0) {
+                        nestedExpression.append(SPACE_STRING).append(criteria.getCondition()).append(SPACE_STRING);
+                    }
+
+                    nestedExpression.append(nestedQuery);
+                }
+            }
+
+            return nestedExpression.length() > 0 ? sb.append(BRACE_OPEN_STR).append(nestedExpression.toString()).append(BRACE_CLOSE_STR).toString() : EMPTY_STRING;
+        } else {
+            return toSolrExpression(type, criteria.getAttributeName(), criteria.getOperator(), criteria.getAttributeValue(), context);
+        }
+    }
+
+    private String toSolrExpression(AtlasStructType type, String attrName, Operator op, String attrVal, PipelineContext context) {
+        String ret = EMPTY_STRING;
+
+        try {
+            String    indexKey      = type.getQualifiedAttributeName(attrName);
+            AtlasType attributeType = type.getAttributeType(attrName);
+
+            switch (context.getIndexResultType()) {
+                case TAG:
+                    context.addTagSearchAttribute(indexKey);
+                    break;
+
+                case ENTITY:
+                    context.addEntitySearchAttribute(indexKey);
+                    break;
+
+                default:
+                    // Do nothing
+            }
+
+            if (attributeType != null && AtlasTypeUtil.isBuiltInType(attributeType.getTypeName()) && context.getIndexedKeys().contains(indexKey)) {
+                if (operatorMap.get(op) != null) {
+                    // If there's a chance of multi-value then we need some additional processing here
+                    switch (context.getIndexResultType()) {
+                        case TAG:
+                            context.addProcessedTagAttribute(indexKey);
+                            break;
+
+                        case ENTITY:
+                            context.addProcessedEntityAttribute(indexKey);
+                            break;
+                    }
+
+                    ret = String.format(operatorMap.get(op), indexKey, attrVal);
+                }
+            }
+        } catch (AtlasBaseException ex) {
+            LOG.warn(ex.getMessage());
+        }
+
+        return ret;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 35dbf6c..94b6092 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -68,8 +68,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
 
@@ -96,7 +98,10 @@
 
     //allows injection of a dummy graph for testing
     private IAtlasGraphProvider provider;
-    
+
+    private boolean     recomputeIndexedKeys = true;
+    private Set<String> vertexIndexKeys      = new HashSet<>();
+
     @Inject
     public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException {
         this(new AtlasGraphProvider(), ApplicationProperties.get(), typeRegistry);
@@ -130,6 +135,7 @@
             if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
                 LOG.info("Global indexes already exist for graph");
                 management.commit();
+
                 return;
             }
 
@@ -192,7 +198,6 @@
             throw new RepositoryException(t);
         }
     }
-   
 
     private void createFullTextIndex(AtlasGraphManagement management) {
         AtlasPropertyKey fullText =
@@ -247,6 +252,34 @@
         onAdd(dataTypes);
     }
 
+    public Set<String> getVertexIndexKeys() {
+        if (recomputeIndexedKeys) {
+            AtlasGraphManagement management = null;
+
+            try {
+                management = provider.get().getManagementSystem();
+            } catch (RepositoryException excp) {
+                LOG.error("failed to get indexedKeys from graph", excp);
+            }
+
+            if (management != null) {
+                recomputeIndexedKeys = false;
+
+                AtlasGraphIndex vertexIndex = management.getGraphIndex(Constants.VERTEX_INDEX);
+
+                Set<String> indexKeys = new HashSet<>();
+
+                for (AtlasPropertyKey fieldKey : vertexIndex.getFieldKeys()) {
+                    indexKeys.add(fieldKey.getName());
+                }
+
+                vertexIndexKeys = indexKeys;
+            }
+        }
+
+        return vertexIndexKeys;
+    }
+
     private void addIndexForType(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) {
         if (typeDef instanceof AtlasEnumDef) {
             // Only handle complex types like Struct, Classification and Entity
@@ -577,6 +610,8 @@
     private void commit(AtlasGraphManagement management) throws IndexException {
         try {
             management.commit();
+
+            recomputeIndexedKeys = true;
         } catch (Exception e) {
             LOG.error("Index commit failed", e);
             throw new IndexException("Index commit failed ", e);
@@ -586,6 +621,8 @@
     private void rollback(AtlasGraphManagement management) throws IndexException {
         try {
             management.rollback();
+
+            recomputeIndexedKeys = true;
         } catch (Exception e) {
             LOG.error("Index rollback failed ", e);
             throw new IndexException("Index rollback failed ", e);
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 3411f8d..9221717 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -30,12 +30,14 @@
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
+import javax.inject.Inject;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 
-
+@Component
 public class ImportService {
     private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
 
@@ -46,6 +48,7 @@
     private long startTimestamp;
     private long endTimestamp;
 
+    @Inject
     public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.typeDefStore = typeDefStore;
         this.entityStore = entityStore;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 42bd58f..7b3f1e6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -39,7 +39,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Utility methods for Graph.
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index 9a8695a..a5b5730 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -46,10 +46,12 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGDECIMAL;
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGINTEGER;
@@ -123,7 +125,7 @@
     }
 
     public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex entityVertex) throws AtlasBaseException {
-        return entityVertex != null ? mapVertexToAtlasEntityHeader(entityVertex) : null;
+        return toAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
     }
 
     private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
@@ -185,6 +187,10 @@
     }
 
     private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex) throws AtlasBaseException {
+        return mapVertexToAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
+    }
+
+    private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
         AtlasEntityHeader ret = new AtlasEntityHeader();
 
         String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
@@ -218,6 +224,20 @@
             if (displayText != null) {
                 ret.setDisplayText(displayText.toString());
             }
+
+            if (CollectionUtils.isNotEmpty(attributes)) {
+                for (String attrName : attributes) {
+                    if (ret.hasAttribute(attrName)) {
+                        continue;
+                    }
+
+                    Object attrValue = getVertexAttribute(entityVertex, entityType.getAttribute(attrName));
+
+                    if (attrValue != null) {
+                        ret.setAttribute(attrName, attrValue);
+                    }
+                }
+            }
         }
 
         return ret;
@@ -556,4 +576,8 @@
     private Object getVertexAttribute(AtlasVertex vertex, AtlasAttribute attribute) throws AtlasBaseException {
         return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null) : null;
     }
+
+    public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex atlasVertex, Set<String> attributes) throws AtlasBaseException {
+        return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/util/SearchTracker.java b/repository/src/main/java/org/apache/atlas/util/SearchTracker.java
new file mode 100644
index 0000000..15a8c20
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/util/SearchTracker.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.util;
+
+import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.discovery.SearchPipeline.PipelineContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+@AtlasService
+public class SearchTracker {
+    private Map<String, PipelineContext> activeSearches = new HashMap<>();
+
+    /**
+     *
+     * @param context
+     */
+    public String add(PipelineContext context) {
+        String searchId = Thread.currentThread().getName();
+
+        activeSearches.put(searchId, context);
+
+        return searchId;
+    }
+
+    /**
+     *
+     * @param searchId
+     * @return
+     */
+    public PipelineContext terminate(String searchId) {
+        PipelineContext ret = null;
+
+        if (activeSearches.containsKey(searchId)) {
+            PipelineContext pipelineToTerminate = activeSearches.remove(searchId);
+
+            pipelineToTerminate.setForceTerminate(true);
+
+            ret = pipelineToTerminate;
+        }
+
+        return ret;
+    }
+
+    public PipelineContext remove(String id) {
+        return activeSearches.remove(id);
+    }
+
+    /**
+     *
+     * @return
+     */
+    public Set<String> getActiveSearches() {
+        return activeSearches.keySet();
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index fa2ac0d..d0da030 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -17,19 +17,14 @@
  */
 package org.apache.atlas;
 
+import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.matcher.Matchers;
 import com.google.inject.multibindings.Multibinder;
 import org.apache.atlas.annotation.GraphTransaction;
-import org.apache.atlas.discovery.AtlasDiscoveryService;
-import org.apache.atlas.discovery.AtlasLineageService;
-import org.apache.atlas.discovery.DataSetLineageService;
-import org.apache.atlas.discovery.DiscoveryService;
-import org.apache.atlas.discovery.EntityDiscoveryService;
-import org.apache.atlas.discovery.EntityLineageService;
-import org.apache.atlas.discovery.LineageService;
+import org.apache.atlas.discovery.*;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
 import org.apache.atlas.graph.GraphSandboxUtil;
 import org.apache.atlas.listener.EntityChangeListener;
@@ -61,6 +56,7 @@
 import org.apache.atlas.typesystem.types.TypeSystem;
 import org.apache.atlas.typesystem.types.cache.TypeCache;
 import org.apache.atlas.util.AtlasRepositoryConfiguration;
+import org.apache.atlas.util.SearchTracker;
 import org.apache.commons.configuration.Configuration;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -76,7 +72,7 @@
     }
 
     // Test only DI modules
-    public static class TestOnlyModule extends com.google.inject.AbstractModule {
+    public static class TestOnlyModule extends AbstractModule {
 
         private static final Logger LOG = LoggerFactory.getLogger(TestOnlyModule.class);
 
@@ -147,6 +143,11 @@
             typeDefChangeListenerMultibinder.addBinding().to(DefaultMetadataService.class);
             typeDefChangeListenerMultibinder.addBinding().to(GraphBackedSearchIndexer.class).asEagerSingleton();
 
+            bind(SearchPipeline.class).asEagerSingleton();
+            bind(SearchTracker.class).asEagerSingleton();
+            bind(SolrStep.class).asEagerSingleton();
+            bind(GremlinStep.class).asEagerSingleton();
+
             bind(AtlasEntityStore.class).to(AtlasEntityStoreV1.class);
             bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV1.class);
 
diff --git a/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java
index 5d5b043..dfb2ee2 100644
--- a/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.services;
 
+import org.apache.atlas.TestModules;
 import org.apache.atlas.discovery.EntityDiscoveryService;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
@@ -24,12 +25,16 @@
 import org.apache.commons.lang.StringUtils;
 import org.powermock.reflect.Whitebox;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import javax.inject.Inject;
+
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
+@Guice(modules = TestModules.TestOnlyModule.class)
 public class EntityDiscoveryServiceTest {
 
     private final String TEST_TYPE                = "test";
@@ -47,6 +52,9 @@
 
     private final int maxTypesCountInIdxQuery = 10;
 
+    @Inject
+    EntityDiscoveryService discoveryService;
+
 
     @BeforeClass
     public void init() throws AtlasBaseException {
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java
index cdb9064..8e74d39 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java
@@ -50,7 +50,7 @@
     @Override
     public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain)
     throws IOException, ServletException {
-        LOG.info("Cleaning stale transactions");
+        LOG.debug("Cleaning stale transactions");
         AtlasGraphProvider.getGraphInstance().rollback();
         filterChain.doFilter(request, response);
     }
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 8c5623f..1a9f57a 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -25,6 +25,7 @@
 import org.apache.atlas.authorize.AtlasActionTypes;
 import org.apache.atlas.authorize.AtlasResourceTypes;
 import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
+import org.apache.atlas.discovery.SearchPipeline;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
@@ -35,11 +36,9 @@
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.ZipSink;
 import org.apache.atlas.repository.impexp.ZipSource;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
-import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.SearchTracker;
 import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.Servlets;
@@ -51,7 +50,6 @@
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
@@ -62,9 +60,11 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
@@ -109,14 +109,10 @@
 
     private final ServiceState      serviceState;
     private final MetricsService    metricsService;
-    private final AtlasTypeRegistry typeRegistry;
-    private final AtlasTypeDefStore typesDefStore;
-    private final AtlasEntityStore  entityStore;
     private static Configuration atlasProperties;
     private final ExportService exportService;
-
-    @Inject
-    ApplicationContext applicationContext;
+    private final ImportService importService;
+    private final SearchTracker activeSearches;
 
     static {
         try {
@@ -128,15 +124,13 @@
 
     @Inject
     public AdminResource(ServiceState serviceState, MetricsService metricsService,
-                         AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore,
-                         AtlasEntityStore entityStore, ExportService exportService) {
+                         ExportService exportService, ImportService importService, SearchTracker activeSearches) {
         this.serviceState               = serviceState;
         this.metricsService             = metricsService;
-        this.typeRegistry               = typeRegistry;
-        this.typesDefStore              = typeDefStore;
-        this.entityStore                = entityStore;
         this.exportService = exportService;
-        this.importExportOperationLock  = new ReentrantLock();
+        this.importService = importService;
+        this.activeSearches = activeSearches;
+        importExportOperationLock = new ReentrantLock();
     }
 
     /**
@@ -377,7 +371,6 @@
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
-            ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
             ZipSource zipSource = new ZipSource(inputStream);
 
             result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest),
@@ -412,7 +405,6 @@
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
-            ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
             result = importService.run(request, Servlets.getUserName(httpServletRequest),
                                        Servlets.getHostName(httpServletRequest),
                                        AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
@@ -431,6 +423,21 @@
         return result;
     }
 
+    @GET
+    @Path("activeSearches")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public Set<String> getActiveSearches() {
+        return activeSearches.getActiveSearches();
+    }
+
+    @DELETE
+    @Path("activeSearches/{id}")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public boolean terminateActiveSearch(@PathParam("id") String searchId) {
+        SearchPipeline.PipelineContext terminate = activeSearches.terminate(searchId);
+        return null != terminate;
+    }
+
     private String getEditableEntityTypes(Configuration config) {
         String ret = DEFAULT_EDITABLE_ENTITY_TYPES;
 
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
index ea55021..dde300e 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
@@ -21,8 +21,10 @@
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.discovery.AtlasDiscoveryService;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.util.Servlets;
+import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Service;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -31,6 +33,7 @@
 import javax.inject.Singleton;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
@@ -213,6 +216,50 @@
         }
     }
 
+    /**
+     * Attribute based search for entities satisfying the search parameters
+     * @param parameters Search parameters
+     * @return Atlas search result
+     * @throws AtlasBaseException
+     *
+     * @HTTP 200 On successful search
+     * @HTTP 400 Tag/Entity doesn't exist or Tag/entity filter is present without tag/type name
+     */
+    @Path("basic")
+    @POST
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public AtlasSearchResult searchWithParameters(SearchParameters parameters) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchWithParameters("+ parameters + ")");
+            }
+
+            if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
+            }
+
+            if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
+            }
+
+            if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
+            }
+
+            return atlasDiscoveryService.searchUsingBasicQuery(parameters);
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
+    private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) {
+        return filterCriteria == null ||
+               (StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion()));
+    }
+
     private String escapeTypeName(String typeName) {
         String ret;
 
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index 1fe3119..c0bbf09 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -48,7 +48,7 @@
 
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null);
         Response response = adminResource.getStatus();
         assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
         JSONObject entity = (JSONObject) response.getEntity();
@@ -59,7 +59,7 @@
     public void testResourceGetsValueFromServiceState() throws JSONException {
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null);
         Response response = adminResource.getStatus();
 
         verify(serviceState).getState();