OAK-9311 oak-search-elastic: use low-level client for critical operations

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1885376 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index 49c54f0..6df8151 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -207,6 +207,18 @@
       <scope>provided</scope>
     </dependency>
 
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Test Dependencies -->
     <dependency>
       <groupId>ch.qos.logback</groupId>
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
index b32d334..b57b89e 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
@@ -44,6 +44,8 @@
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.search.WildcardQuery;
 import org.apache.lucene.search.join.ScoreMode;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.InnerHitBuilder;
 import org.elasticsearch.index.query.MatchBoolPrefixQueryBuilder;
@@ -61,6 +63,7 @@
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
@@ -106,6 +109,8 @@
 import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_PATH;
 import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_SCORE;
 import static org.apache.jackrabbit.util.ISO8601.parse;
+import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item;
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
@@ -257,6 +262,26 @@
         return list;
     }
 
+    /**
+     * Receives a {@link SearchSourceBuilder} as input and converts it to a low level {@link Request} reducing the response
+     * in order to reduce size and improve speed.
+     * https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#common-options-response-filtering
+     * @param searchSourceBuilder the search request
+     * @param indexName the index to query
+     * @return a low level {@link Request} instance
+     */
+    public Request createLowLevelRequest(SearchSourceBuilder searchSourceBuilder, String indexName) {
+        String endpoint = "/" + indexName
+                + "/_search?filter_path=took,timed_out,hits.total.value,hits.hits._score,hits.hits.sort,,hits.hits._source,aggregations";
+        Request request = new Request("POST", endpoint);
+        try {
+            request.setJsonEntity(Strings.toString(searchSourceBuilder.toXContent(jsonBuilder(), EMPTY_PARAMS)));
+        } catch (IOException e) {
+            throw new IllegalStateException("Error creating request entity", e);
+        }
+        return request;
+    }
+
     public String getPropertyRestrictionQuery() {
         return propertyRestrictionQuery;
     }
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java
index 2be9bf4..7eff471 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java
@@ -16,15 +16,21 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.query;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
 import org.apache.jackrabbit.oak.spi.query.Filter;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.search.SearchHit;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -34,6 +40,12 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ElasticResponseHandler.class);
 
+    private static final ObjectMapper JSON_MAPPER;
+    static {
+        JSON_MAPPER = new ObjectMapper();
+        JSON_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
     private final PlanResult planResult;
     private final Filter filter;
 
@@ -42,10 +54,16 @@
         this.filter = filter;
     }
 
-    public String getPath(SearchHit hit) {
-        final Map<String, Object> sourceMap = hit.getSourceAsMap();
-        String path = (String) sourceMap.get(FieldNames.PATH);
+    public String getPath(SearchResponseHit hit) {
+        return transformPath((String) hit.source.get(FieldNames.PATH));
+    }
 
+    public String getPath(SearchHit hit) {
+        Map<String, Object> sourceMap = hit.getSourceAsMap();
+        return  transformPath((String) sourceMap.get(FieldNames.PATH));
+    }
+
+    private String transformPath(String path) {
         if ("".equals(path)) {
             path = "/";
         }
@@ -65,4 +83,92 @@
     public boolean isAccessible(String path) {
         return filter.isAccessible(path);
     }
+
+    public SearchResponse parse(Response response) throws IOException {
+        return JSON_MAPPER.readValue(response.getEntity().getContent(), SearchResponse.class);
+    }
+
+    // POJO for Elastic json deserialization
+
+    public static class SearchResponse {
+
+        public final long took;
+        public final boolean timedOut;
+        public final SearchResponseHits hits;
+        public final Map<String, AggregationBuckets> aggregations;
+
+        @JsonCreator
+        public SearchResponse(@JsonProperty("took") long took, @JsonProperty("timed_out") boolean timedOut,
+                              @JsonProperty("hits") SearchResponseHits hits,
+                              @JsonProperty("aggregations") Map<String, AggregationBuckets> aggregations) {
+            this.took = took;
+            this.timedOut = timedOut;
+            this.hits = hits;
+            this.aggregations = aggregations;
+        }
+
+    }
+
+    public static class SearchResponseHits {
+
+        public final SearchResponseHitsTotal total;
+        public final SearchResponseHit[] hits;
+
+        @JsonCreator
+        public SearchResponseHits(@JsonProperty("total") SearchResponseHitsTotal total,
+                                  @JsonProperty("hits") SearchResponseHit[] hits) {
+            this.total = total;
+            this.hits = hits;
+        }
+
+    }
+
+    public static class SearchResponseHit {
+
+        public final Map<String, Object> source;
+        public final Object[] sort;
+        public final double score;
+
+        @JsonCreator
+        public SearchResponseHit(@JsonProperty("_source") Map<String, Object> source,
+                                 @JsonProperty("sort") Object[] sort, @JsonProperty("_score") double score) {
+            this.source = source;
+            this.sort = sort;
+            this.score = score;
+        }
+
+    }
+
+    public static class SearchResponseHitsTotal {
+        public final long value;
+
+        @JsonCreator
+        public SearchResponseHitsTotal(@JsonProperty("value") long value) {
+            this.value = value;
+        }
+    }
+
+    public static class AggregationBuckets {
+
+        public final AggregationBucket[] buckets;
+
+        @JsonCreator
+        public AggregationBuckets(@JsonProperty("buckets") AggregationBucket[] buckets) {
+            this.buckets = buckets;
+        }
+
+    }
+
+    public static class AggregationBucket {
+
+        public final Object key;
+        public final int count;
+
+        @JsonCreator
+        public AggregationBucket(@JsonProperty("key") Object key, @JsonProperty("doc_count") int count) {
+            this.key = key;
+            this.count = count;
+        }
+
+    }
 }
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
index 2c8cc21..2093a4a 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
@@ -16,11 +16,11 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
 
+import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
 import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.Aggregations;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -45,7 +45,7 @@
     void endData();
 
     /**
-     * {@link ElasticResponseListener} extension to subscribe on {@link SearchHit} events
+     * {@link ElasticResponseListener} extension to subscribe on response hit events
      */
     interface SearchHitListener extends ElasticResponseListener {
 
@@ -64,20 +64,20 @@
         default void startData(long totalHits) { /*empty*/ }
 
         /**
-         * This method is called for each {@link SearchHit} retrieved
+         * This method is called for each {@link ElasticResponseHandler.SearchResponseHit} retrieved
          */
-        void on(SearchHit searchHit);
+        void on(ElasticResponseHandler.SearchResponseHit searchHit);
     }
 
     /**
-     * {@link ElasticResponseListener} extension to subscribe on {@link Aggregations} events
+     * {@link ElasticResponseListener} extension to subscribe on aggregations events
      */
     interface AggregationListener extends ElasticResponseListener {
 
         /**
-         * This method is called once when the {@link Aggregations} are retrieved
-         * @param aggregations the {@link Aggregations} or {@code null} if there are no results
+         * This method is called once when the aggregations are retrieved
+         * @param aggregations the {@link Map} with aggregations or {@code null} if there are no results
          */
-        void on(Aggregations aggregations);
+        void on(Map<String, ElasticResponseHandler.AggregationBuckets> aggregations);
     }
 }
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index c3d7344..413aeb5 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -25,20 +25,18 @@
 import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
 import org.apache.jackrabbit.oak.spi.query.QueryIndex;
 import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -128,7 +126,7 @@
     }
 
     @Override
-    public void on(SearchHit searchHit) {
+    public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
         final String path = elasticResponseHandler.getPath(searchHit);
         if (path != null) {
             if (rowInclusionPredicate != null && !rowInclusionPredicate.test(path)) {
@@ -136,7 +134,7 @@
                 return;
             }
             try {
-                queue.put(new FulltextResultRow(path, searchHit.getScore(), null, elasticFacetProvider, null));
+                queue.put(new FulltextResultRow(path, searchHit.score, null, elasticFacetProvider, null));
             } catch (InterruptedException e) {
                 throw new IllegalStateException("Error producing results into the iterator queue", e);
             }
@@ -160,13 +158,13 @@
             listeners.add(elasticFacetProvider);
         }
 
-        return new ElasticQueryScanner(elasticRequestHandler, listeners);
+        return new ElasticQueryScanner(listeners);
     }
 
     /**
      * Scans Elastic results asynchronously and notify listeners.
      */
-    class ElasticQueryScanner implements ActionListener<SearchResponse> {
+    class ElasticQueryScanner implements ResponseListener {
 
         private static final int SMALL_RESULT_SET_SIZE = 10;
 
@@ -194,14 +192,13 @@
 
         private final ElasticMetricHandler elasticMetricHandler = indexNode.getElasticMetricHandler();
 
-        ElasticQueryScanner(ElasticRequestHandler requestHandler,
-                            List<ElasticResponseListener> listeners) {
-            this.query = requestHandler.baseQuery();
-            this.sorts = requestHandler.baseSorts();
+        ElasticQueryScanner(List<ElasticResponseListener> listeners) {
+            this.query = elasticRequestHandler.baseQuery();
+            this.sorts = elasticRequestHandler.baseSorts();
 
-            final Set<String> sourceFieldsSet = new HashSet<>();
-            final AtomicBoolean needsAggregations = new AtomicBoolean(false);
-            final Consumer<ElasticResponseListener> register = (listener) -> {
+            Set<String> sourceFieldsSet = new HashSet<>();
+            AtomicBoolean needsAggregations = new AtomicBoolean(false);
+            Consumer<ElasticResponseListener> register = (listener) -> {
                 allListeners.add(listener);
                 sourceFieldsSet.addAll(listener.sourceFields());
                 if (listener instanceof SearchHitListener) {
@@ -220,7 +217,7 @@
             listeners.forEach(register);
             this.sourceFields = sourceFieldsSet.toArray(new String[0]);
 
-            final SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
+            SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
                     .query(query)
                     // use a smaller size when the query contains aggregations. This improves performance
                     // when the client is only interested in insecure facets
@@ -230,18 +227,18 @@
             this.sorts.forEach(searchSourceBuilder::sort);
 
             if (needsAggregations.get()) {
-                requestHandler.aggregations().forEach(searchSourceBuilder::aggregation);
+                elasticRequestHandler.aggregations().forEach(searchSourceBuilder::aggregation);
             }
 
-            final SearchRequest searchRequest = new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
-                    .source(searchSourceBuilder);
-
             LOG.trace("Kicking initial search for query {}", searchSourceBuilder);
             semaphore.tryAcquire();
 
             searchStartTime = System.currentTimeMillis();
             requests++;
-            indexNode.getConnection().getClient().searchAsync(searchRequest, RequestOptions.DEFAULT, this);
+
+            Request request = elasticRequestHandler.createLowLevelRequest(searchSourceBuilder,
+                    indexNode.getDefinition().getRemoteIndexAlias());
+            indexNode.getConnection().getClient().getLowLevelClient().performRequestAsync(request, this);
             elasticMetricHandler.markQuery(true);
         }
 
@@ -253,16 +250,22 @@
          * these data structures are modified before releasing the semaphore.
          */
         @Override
-        public void onResponse(SearchResponse searchResponse) {
+        public void onSuccess(Response response) {
             long searchTotalTime = System.currentTimeMillis() - searchStartTime;
-            SearchHit[] searchHits = searchResponse.getHits().getHits();
+            ElasticResponseHandler.SearchResponse searchResponse;
+            try {
+                searchResponse = elasticResponseHandler.parse(response);
+            } catch (IOException e) {
+                throw new IllegalStateException("Unable to parse response", e);
+            }
+
+            ElasticResponseHandler.SearchResponseHit[] searchHits = searchResponse.hits.hits;
             int hitsSize = searchHits != null ? searchHits.length : 0;
-            TimeValue responseTook = searchResponse.getTook();
-            elasticMetricHandler.measureQuery(hitsSize, responseTook.getMillis(), searchTotalTime, searchResponse.isTimedOut());
+            elasticMetricHandler.measureQuery(hitsSize, searchResponse.took, searchTotalTime, searchResponse.timedOut);
             if (hitsSize > 0) {
-                long totalHits = searchResponse.getHits().getTotalHits().value;
-                LOG.debug("Processing search response that took {} to read {}/{} docs", responseTook, hitsSize, totalHits);
-                lastHitSortValues = searchHits[hitsSize - 1].getSortValues();
+                long totalHits = searchResponse.hits.total.value;
+                LOG.debug("Processing search response that took {} to read {}/{} docs", searchResponse.took, hitsSize, totalHits);
+                lastHitSortValues = searchHits[hitsSize - 1].sort;
                 scannedRows += hitsSize;
                 anyDataLeft.set(totalHits > scannedRows);
                 estimator.update(indexPlan.getFilter(), totalHits);
@@ -276,16 +279,15 @@
                     }
 
                     if (!aggregationListeners.isEmpty()) {
-                        Aggregations aggregations = searchResponse.getAggregations();
-                        LOG.trace("Emitting aggregations {}", aggregations);
+                        LOG.trace("Emitting aggregations {}", searchResponse.aggregations);
                         for (AggregationListener l : aggregationListeners) {
-                            l.on(aggregations);
+                            l.on(searchResponse.aggregations);
                         }
                     }
                 }
 
                 LOG.trace("Emitting {} search hits, for a total of {} scanned results", searchHits.length, scannedRows);
-                for (SearchHit hit : searchHits) {
+                for (ElasticResponseHandler.SearchResponseHit hit : searchHits) {
                     for (SearchHitListener l : searchHitListeners) {
                         l.on(hit);
                     }
@@ -329,7 +331,9 @@
                 LOG.trace("Kicking new search after query {}", searchRequest.source());
 
                 searchStartTime = System.currentTimeMillis();
-                indexNode.getConnection().getClient().searchAsync(searchRequest, RequestOptions.DEFAULT, this);
+                Request request = elasticRequestHandler.createLowLevelRequest(searchSourceBuilder,
+                        indexNode.getDefinition().getRemoteIndexAlias());
+                indexNode.getConnection().getClient().getLowLevelClient().performRequestAsync(request, this);
                 elasticMetricHandler.markQuery(false);
             } else {
                 LOG.trace("Scanner is closing or still processing data from the previous scan");
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
index 2327593..d641795 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
@@ -16,15 +16,15 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
 
+import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -37,7 +37,7 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ElasticInsecureFacetAsyncProvider.class);
 
-    private Aggregations aggregations;
+    private Map<String, ElasticResponseHandler.AggregationBuckets> aggregations;
 
     private final CountDownLatch latch = new CountDownLatch(1);
 
@@ -49,20 +49,20 @@
         } catch (InterruptedException e) {
             throw new IllegalStateException("Error while waiting for facets", e);
         }
-        LOG.trace("Reading facets for {} from aggregations {}", columnName, aggregations.asMap());
+        LOG.trace("Reading facets for {} from aggregations {}", columnName, aggregations);
         if (aggregations != null) {
             final String facetProp = FulltextIndex.parseFacetField(columnName);
-            Terms terms = aggregations.get(facetProp);
-            List<FulltextIndex.Facet> facets = new ArrayList<>(terms.getBuckets().size());
-            for (Terms.Bucket bucket : terms.getBuckets()) {
-                facets.add(new FulltextIndex.Facet(bucket.getKeyAsString(), (int) bucket.getDocCount()));
+            ElasticResponseHandler.AggregationBuckets terms = aggregations.get(facetProp);
+            List<FulltextIndex.Facet> facets = new ArrayList<>(terms.buckets.length);
+            for (ElasticResponseHandler.AggregationBucket bucket : terms.buckets) {
+                facets.add(new FulltextIndex.Facet(bucket.key.toString(), bucket.count));
             }
             return facets;
         } else return null;
     }
 
     @Override
-    public void on(Aggregations aggregations) {
+    public void on(Map<String, ElasticResponseHandler.AggregationBuckets> aggregations) {
         this.aggregations = aggregations;
         this.endData();
     }
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
index c84e2ec..a13051c 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
@@ -20,7 +20,6 @@
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
-import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,12 +69,11 @@
     }
 
     @Override
-    public void on(SearchHit searchHit) {
+    public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
         final String path = elasticResponseHandler.getPath(searchHit);
         if (path != null && isAccessible.test(path)) {
-            Map<String, Object> sourceMap = searchHit.getSourceAsMap();
             for (String field: facetFields) {
-                Object value = sourceMap.get(field);
+                Object value = searchHit.source.get(field);
                 if (value != null) {
                     facetsMap.compute(field, (column, facetValues) -> {
                         if (facetValues == null) {
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
index 340bc94..25569e9 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
@@ -20,9 +20,6 @@
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -70,7 +67,7 @@
     }
 
     @Override
-    public void on(SearchHit searchHit) {
+    public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
         if (totalHits < sampleSize) {
             super.on(searchHit);
         } else {
@@ -91,13 +88,13 @@
     }
 
     @Override
-    public void on(Aggregations aggregations) {
+    public void on(Map<String, ElasticResponseHandler.AggregationBuckets> aggregations) {
         for (String field: facetFields) {
-            Terms terms = aggregations.get(field);
-            List<? extends Terms.Bucket> buckets = terms.getBuckets();
-            final List<FulltextIndex.Facet> facetList = new ArrayList<>();
-            for (Terms.Bucket bucket : buckets) {
-                facetList.add(new FulltextIndex.Facet(bucket.getKeyAsString(), (int) bucket.getDocCount()));
+            ElasticResponseHandler.AggregationBuckets terms = aggregations.get(field);
+            ElasticResponseHandler.AggregationBucket[] buckets = terms.buckets;
+            final List<FulltextIndex.Facet> facetList = new ArrayList<>(buckets.length);
+            for (ElasticResponseHandler.AggregationBucket bucket : buckets) {
+                facetList.add(new FulltextIndex.Facet(bucket.key.toString(), bucket.count));
             }
             facetMap.put(field, facetList);
         }
diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
index 51573b2..ec471e9 100644
--- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
+++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
@@ -330,8 +330,6 @@
         assertEquals("Unexpected number of facets", actualAclLabelCount.size(), facets.size());
 
         for (Map.Entry<String, Integer> facet : actualAclLabelCount.entrySet()) {
-
-
             String facetLabel = facet.getKey();
             int facetCount = getFacets().get(facetLabel);
             float ratio = ((float) facetCount) / facet.getValue();
@@ -365,7 +363,6 @@
         assertEquals("Unexpected number of facets", actualLabelCount.size(), facets.size());
 
         for (Map.Entry<String, Integer> facet : actualLabelCount.entrySet()) {
-
             String facetLabel = facet.getKey();
             int facetCount = getFacets().get(facetLabel);
             float ratio = ((float) facetCount) / facet.getValue();
diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java
index 06bbbbd..3af557e 100644
--- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java
+++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java
@@ -77,48 +77,48 @@
     public void testFullTextSingleQuery() throws Exception {
         createTestData();
         String query = "//*[jcr:contains(@text, 'elit" + (NUM_NODES / 2) + "')] ";
-        long startTest = LOG_PERF.start("Starting test executions");
+        long startTest = LOG_PERF.startForInfoLog("Starting test executions");
         for (int j = 0; j < NUM_ITERATIONS; j++) {
             testQuery(query, XPATH);
         }
-        LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+        LOG_PERF.end(startTest, -1,-1, "{} iterations of tests completed", NUM_ITERATIONS);
     }
 
     // Executes different queries each time
     @Test
     public void testFullTextMultiQuery() throws Exception {
         createTestData();
-        long startTest = LOG_PERF.start("Starting test executions");
+        long startTest = LOG_PERF.startForInfoLog("Starting test executions");
         Random rndm = new Random(42);
         for (int j = 0; j < NUM_ITERATIONS; j++) {
             String query = "//*[jcr:contains(@text, 'elit" + rndm.nextInt(NUM_NODES) + "')] ";
             testQuery(query, XPATH);
         }
-        LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+        LOG_PERF.end(startTest, -1,-1, "{} iterations of tests completed", NUM_ITERATIONS);
     }
 
     @Test
     public void testFullTextMultiQueryWithExtraText() throws Exception {
         Random randomText = new Random(42);
         createTestData(() -> ElasticTestUtils.randomString(randomText, 1000));
-        long startTest = LOG_PERF.start("Starting test executions");
+        long startTest = LOG_PERF.startForInfoLog("Starting test executions");
         Random rndm = new Random(42);
         for (int j = 0; j < NUM_ITERATIONS; j++) {
             String query = "//*[jcr:contains(@text, 'elit" + rndm.nextInt(NUM_NODES) + "')] ";
             testQuery(query, XPATH);
         }
-        LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+        LOG_PERF.end(startTest, -1, -1, "{} iterations of tests completed", NUM_ITERATIONS);
     }
 
     @Test
     public void testPropertySingleQuery() throws Exception {
         createTestData();
         String query = "select [jcr:path] from [nt:base] where [title] = 'Title for node0'";
-        long startTest = LOG_PERF.start("Starting test executions");
+        long startTest = LOG_PERF.startForInfoLog("Starting test executions");
         for (int j = 0; j < NUM_ITERATIONS; j++) {
             testQuery(query, SQL2);
         }
-        LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+        LOG_PERF.end(startTest, -1,-1, "{} iterations of tests completed", NUM_ITERATIONS);
 
     }
 
@@ -126,13 +126,13 @@
     @Test
     public void testPropertyMultiQuery() throws Exception {
         createTestData();
-        long startTest = LOG_PERF.start("Starting test executions");
+        long startTest = LOG_PERF.startForInfoLog("Starting test executions");
         Random rndm = new Random(42);
         for (int j = 0; j < NUM_ITERATIONS; j++) {
             String query = "select [jcr:path] from [nt:base] where [title] = 'Title for node" + rndm.nextInt(NUM_NODES) + "'";
             testQuery(query, SQL2);
         }
-        LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+        LOG_PERF.end(startTest, -1, -1, "{} iterations of tests completed", NUM_ITERATIONS);
     }
 
     private void createTestData() throws Exception {