OAK-9311 oak-search-elastic: use low-level client for critical operations
git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1885376 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index 49c54f0..6df8151 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -207,6 +207,18 @@
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Test Dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
index b32d334..b57b89e 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
@@ -44,6 +44,8 @@
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.search.join.ScoreMode;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchBoolPrefixQueryBuilder;
@@ -61,6 +63,7 @@
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
@@ -106,6 +109,8 @@
import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_PATH;
import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_SCORE;
import static org.apache.jackrabbit.util.ISO8601.parse;
+import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
@@ -257,6 +262,26 @@
return list;
}
+ /**
+ * Receives a {@link SearchSourceBuilder} as input and converts it to a low level {@link Request} reducing the response
+ * in order to reduce size and improve speed.
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#common-options-response-filtering
+ * @param searchSourceBuilder the search request
+ * @param indexName the index to query
+ * @return a low level {@link Request} instance
+ */
+ public Request createLowLevelRequest(SearchSourceBuilder searchSourceBuilder, String indexName) {
+ String endpoint = "/" + indexName
+ + "/_search?filter_path=took,timed_out,hits.total.value,hits.hits._score,hits.hits.sort,,hits.hits._source,aggregations";
+ Request request = new Request("POST", endpoint);
+ try {
+ request.setJsonEntity(Strings.toString(searchSourceBuilder.toXContent(jsonBuilder(), EMPTY_PARAMS)));
+ } catch (IOException e) {
+ throw new IllegalStateException("Error creating request entity", e);
+ }
+ return request;
+ }
+
public String getPropertyRestrictionQuery() {
return propertyRestrictionQuery;
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java
index 2be9bf4..7eff471 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResponseHandler.java
@@ -16,15 +16,21 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
import org.apache.jackrabbit.oak.spi.query.Filter;
+import org.elasticsearch.client.Response;
import org.elasticsearch.search.SearchHit;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Map;
/**
@@ -34,6 +40,12 @@
private static final Logger LOG = LoggerFactory.getLogger(ElasticResponseHandler.class);
+ private static final ObjectMapper JSON_MAPPER;
+ static {
+ JSON_MAPPER = new ObjectMapper();
+ JSON_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
private final PlanResult planResult;
private final Filter filter;
@@ -42,10 +54,16 @@
this.filter = filter;
}
- public String getPath(SearchHit hit) {
- final Map<String, Object> sourceMap = hit.getSourceAsMap();
- String path = (String) sourceMap.get(FieldNames.PATH);
+ public String getPath(SearchResponseHit hit) {
+ return transformPath((String) hit.source.get(FieldNames.PATH));
+ }
+ public String getPath(SearchHit hit) {
+ Map<String, Object> sourceMap = hit.getSourceAsMap();
+ return transformPath((String) sourceMap.get(FieldNames.PATH));
+ }
+
+ private String transformPath(String path) {
if ("".equals(path)) {
path = "/";
}
@@ -65,4 +83,92 @@
public boolean isAccessible(String path) {
return filter.isAccessible(path);
}
+
+ public SearchResponse parse(Response response) throws IOException {
+ return JSON_MAPPER.readValue(response.getEntity().getContent(), SearchResponse.class);
+ }
+
+ // POJO for Elastic json deserialization
+
+ public static class SearchResponse {
+
+ public final long took;
+ public final boolean timedOut;
+ public final SearchResponseHits hits;
+ public final Map<String, AggregationBuckets> aggregations;
+
+ @JsonCreator
+ public SearchResponse(@JsonProperty("took") long took, @JsonProperty("timed_out") boolean timedOut,
+ @JsonProperty("hits") SearchResponseHits hits,
+ @JsonProperty("aggregations") Map<String, AggregationBuckets> aggregations) {
+ this.took = took;
+ this.timedOut = timedOut;
+ this.hits = hits;
+ this.aggregations = aggregations;
+ }
+
+ }
+
+ public static class SearchResponseHits {
+
+ public final SearchResponseHitsTotal total;
+ public final SearchResponseHit[] hits;
+
+ @JsonCreator
+ public SearchResponseHits(@JsonProperty("total") SearchResponseHitsTotal total,
+ @JsonProperty("hits") SearchResponseHit[] hits) {
+ this.total = total;
+ this.hits = hits;
+ }
+
+ }
+
+ public static class SearchResponseHit {
+
+ public final Map<String, Object> source;
+ public final Object[] sort;
+ public final double score;
+
+ @JsonCreator
+ public SearchResponseHit(@JsonProperty("_source") Map<String, Object> source,
+ @JsonProperty("sort") Object[] sort, @JsonProperty("_score") double score) {
+ this.source = source;
+ this.sort = sort;
+ this.score = score;
+ }
+
+ }
+
+ public static class SearchResponseHitsTotal {
+ public final long value;
+
+ @JsonCreator
+ public SearchResponseHitsTotal(@JsonProperty("value") long value) {
+ this.value = value;
+ }
+ }
+
+ public static class AggregationBuckets {
+
+ public final AggregationBucket[] buckets;
+
+ @JsonCreator
+ public AggregationBuckets(@JsonProperty("buckets") AggregationBucket[] buckets) {
+ this.buckets = buckets;
+ }
+
+ }
+
+ public static class AggregationBucket {
+
+ public final Object key;
+ public final int count;
+
+ @JsonCreator
+ public AggregationBucket(@JsonProperty("key") Object key, @JsonProperty("doc_count") int count) {
+ this.key = key;
+ this.count = count;
+ }
+
+ }
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
index 2c8cc21..2093a4a 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
@@ -16,11 +16,11 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.Aggregations;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
/**
@@ -45,7 +45,7 @@
void endData();
/**
- * {@link ElasticResponseListener} extension to subscribe on {@link SearchHit} events
+ * {@link ElasticResponseListener} extension to subscribe on response hit events
*/
interface SearchHitListener extends ElasticResponseListener {
@@ -64,20 +64,20 @@
default void startData(long totalHits) { /*empty*/ }
/**
- * This method is called for each {@link SearchHit} retrieved
+ * This method is called for each {@link ElasticResponseHandler.SearchResponseHit} retrieved
*/
- void on(SearchHit searchHit);
+ void on(ElasticResponseHandler.SearchResponseHit searchHit);
}
/**
- * {@link ElasticResponseListener} extension to subscribe on {@link Aggregations} events
+ * {@link ElasticResponseListener} extension to subscribe on aggregations events
*/
interface AggregationListener extends ElasticResponseListener {
/**
- * This method is called once when the {@link Aggregations} are retrieved
- * @param aggregations the {@link Aggregations} or {@code null} if there are no results
+ * This method is called once when the aggregations are retrieved
+ * @param aggregations the {@link Map} with aggregations or {@code null} if there are no results
*/
- void on(Aggregations aggregations);
+ void on(Map<String, ElasticResponseHandler.AggregationBuckets> aggregations);
}
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index c3d7344..413aeb5 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -25,20 +25,18 @@
import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -128,7 +126,7 @@
}
@Override
- public void on(SearchHit searchHit) {
+ public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
final String path = elasticResponseHandler.getPath(searchHit);
if (path != null) {
if (rowInclusionPredicate != null && !rowInclusionPredicate.test(path)) {
@@ -136,7 +134,7 @@
return;
}
try {
- queue.put(new FulltextResultRow(path, searchHit.getScore(), null, elasticFacetProvider, null));
+ queue.put(new FulltextResultRow(path, searchHit.score, null, elasticFacetProvider, null));
} catch (InterruptedException e) {
throw new IllegalStateException("Error producing results into the iterator queue", e);
}
@@ -160,13 +158,13 @@
listeners.add(elasticFacetProvider);
}
- return new ElasticQueryScanner(elasticRequestHandler, listeners);
+ return new ElasticQueryScanner(listeners);
}
/**
* Scans Elastic results asynchronously and notify listeners.
*/
- class ElasticQueryScanner implements ActionListener<SearchResponse> {
+ class ElasticQueryScanner implements ResponseListener {
private static final int SMALL_RESULT_SET_SIZE = 10;
@@ -194,14 +192,13 @@
private final ElasticMetricHandler elasticMetricHandler = indexNode.getElasticMetricHandler();
- ElasticQueryScanner(ElasticRequestHandler requestHandler,
- List<ElasticResponseListener> listeners) {
- this.query = requestHandler.baseQuery();
- this.sorts = requestHandler.baseSorts();
+ ElasticQueryScanner(List<ElasticResponseListener> listeners) {
+ this.query = elasticRequestHandler.baseQuery();
+ this.sorts = elasticRequestHandler.baseSorts();
- final Set<String> sourceFieldsSet = new HashSet<>();
- final AtomicBoolean needsAggregations = new AtomicBoolean(false);
- final Consumer<ElasticResponseListener> register = (listener) -> {
+ Set<String> sourceFieldsSet = new HashSet<>();
+ AtomicBoolean needsAggregations = new AtomicBoolean(false);
+ Consumer<ElasticResponseListener> register = (listener) -> {
allListeners.add(listener);
sourceFieldsSet.addAll(listener.sourceFields());
if (listener instanceof SearchHitListener) {
@@ -220,7 +217,7 @@
listeners.forEach(register);
this.sourceFields = sourceFieldsSet.toArray(new String[0]);
- final SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
+ SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
.query(query)
// use a smaller size when the query contains aggregations. This improves performance
// when the client is only interested in insecure facets
@@ -230,18 +227,18 @@
this.sorts.forEach(searchSourceBuilder::sort);
if (needsAggregations.get()) {
- requestHandler.aggregations().forEach(searchSourceBuilder::aggregation);
+ elasticRequestHandler.aggregations().forEach(searchSourceBuilder::aggregation);
}
- final SearchRequest searchRequest = new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
- .source(searchSourceBuilder);
-
LOG.trace("Kicking initial search for query {}", searchSourceBuilder);
semaphore.tryAcquire();
searchStartTime = System.currentTimeMillis();
requests++;
- indexNode.getConnection().getClient().searchAsync(searchRequest, RequestOptions.DEFAULT, this);
+
+ Request request = elasticRequestHandler.createLowLevelRequest(searchSourceBuilder,
+ indexNode.getDefinition().getRemoteIndexAlias());
+ indexNode.getConnection().getClient().getLowLevelClient().performRequestAsync(request, this);
elasticMetricHandler.markQuery(true);
}
@@ -253,16 +250,22 @@
* these data structures are modified before releasing the semaphore.
*/
@Override
- public void onResponse(SearchResponse searchResponse) {
+ public void onSuccess(Response response) {
long searchTotalTime = System.currentTimeMillis() - searchStartTime;
- SearchHit[] searchHits = searchResponse.getHits().getHits();
+ ElasticResponseHandler.SearchResponse searchResponse;
+ try {
+ searchResponse = elasticResponseHandler.parse(response);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to parse response", e);
+ }
+
+ ElasticResponseHandler.SearchResponseHit[] searchHits = searchResponse.hits.hits;
int hitsSize = searchHits != null ? searchHits.length : 0;
- TimeValue responseTook = searchResponse.getTook();
- elasticMetricHandler.measureQuery(hitsSize, responseTook.getMillis(), searchTotalTime, searchResponse.isTimedOut());
+ elasticMetricHandler.measureQuery(hitsSize, searchResponse.took, searchTotalTime, searchResponse.timedOut);
if (hitsSize > 0) {
- long totalHits = searchResponse.getHits().getTotalHits().value;
- LOG.debug("Processing search response that took {} to read {}/{} docs", responseTook, hitsSize, totalHits);
- lastHitSortValues = searchHits[hitsSize - 1].getSortValues();
+ long totalHits = searchResponse.hits.total.value;
+ LOG.debug("Processing search response that took {} to read {}/{} docs", searchResponse.took, hitsSize, totalHits);
+ lastHitSortValues = searchHits[hitsSize - 1].sort;
scannedRows += hitsSize;
anyDataLeft.set(totalHits > scannedRows);
estimator.update(indexPlan.getFilter(), totalHits);
@@ -276,16 +279,15 @@
}
if (!aggregationListeners.isEmpty()) {
- Aggregations aggregations = searchResponse.getAggregations();
- LOG.trace("Emitting aggregations {}", aggregations);
+ LOG.trace("Emitting aggregations {}", searchResponse.aggregations);
for (AggregationListener l : aggregationListeners) {
- l.on(aggregations);
+ l.on(searchResponse.aggregations);
}
}
}
LOG.trace("Emitting {} search hits, for a total of {} scanned results", searchHits.length, scannedRows);
- for (SearchHit hit : searchHits) {
+ for (ElasticResponseHandler.SearchResponseHit hit : searchHits) {
for (SearchHitListener l : searchHitListeners) {
l.on(hit);
}
@@ -329,7 +331,9 @@
LOG.trace("Kicking new search after query {}", searchRequest.source());
searchStartTime = System.currentTimeMillis();
- indexNode.getConnection().getClient().searchAsync(searchRequest, RequestOptions.DEFAULT, this);
+ Request request = elasticRequestHandler.createLowLevelRequest(searchSourceBuilder,
+ indexNode.getDefinition().getRemoteIndexAlias());
+ indexNode.getConnection().getClient().getLowLevelClient().performRequestAsync(request, this);
elasticMetricHandler.markQuery(false);
} else {
LOG.trace("Scanner is closing or still processing data from the previous scan");
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
index 2327593..d641795 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
@@ -16,15 +16,15 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,7 +37,7 @@
private static final Logger LOG = LoggerFactory.getLogger(ElasticInsecureFacetAsyncProvider.class);
- private Aggregations aggregations;
+ private Map<String, ElasticResponseHandler.AggregationBuckets> aggregations;
private final CountDownLatch latch = new CountDownLatch(1);
@@ -49,20 +49,20 @@
} catch (InterruptedException e) {
throw new IllegalStateException("Error while waiting for facets", e);
}
- LOG.trace("Reading facets for {} from aggregations {}", columnName, aggregations.asMap());
+ LOG.trace("Reading facets for {} from aggregations {}", columnName, aggregations);
if (aggregations != null) {
final String facetProp = FulltextIndex.parseFacetField(columnName);
- Terms terms = aggregations.get(facetProp);
- List<FulltextIndex.Facet> facets = new ArrayList<>(terms.getBuckets().size());
- for (Terms.Bucket bucket : terms.getBuckets()) {
- facets.add(new FulltextIndex.Facet(bucket.getKeyAsString(), (int) bucket.getDocCount()));
+ ElasticResponseHandler.AggregationBuckets terms = aggregations.get(facetProp);
+ List<FulltextIndex.Facet> facets = new ArrayList<>(terms.buckets.length);
+ for (ElasticResponseHandler.AggregationBucket bucket : terms.buckets) {
+ facets.add(new FulltextIndex.Facet(bucket.key.toString(), bucket.count));
}
return facets;
} else return null;
}
@Override
- public void on(Aggregations aggregations) {
+ public void on(Map<String, ElasticResponseHandler.AggregationBuckets> aggregations) {
this.aggregations = aggregations;
this.endData();
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
index c84e2ec..a13051c 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
@@ -20,7 +20,6 @@
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
-import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,12 +69,11 @@
}
@Override
- public void on(SearchHit searchHit) {
+ public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
final String path = elasticResponseHandler.getPath(searchHit);
if (path != null && isAccessible.test(path)) {
- Map<String, Object> sourceMap = searchHit.getSourceAsMap();
for (String field: facetFields) {
- Object value = sourceMap.get(field);
+ Object value = searchHit.source.get(field);
if (value != null) {
facetsMap.compute(field, (column, facetValues) -> {
if (facetValues == null) {
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
index 340bc94..25569e9 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
@@ -20,9 +20,6 @@
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import java.util.ArrayList;
import java.util.HashMap;
@@ -70,7 +67,7 @@
}
@Override
- public void on(SearchHit searchHit) {
+ public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
if (totalHits < sampleSize) {
super.on(searchHit);
} else {
@@ -91,13 +88,13 @@
}
@Override
- public void on(Aggregations aggregations) {
+ public void on(Map<String, ElasticResponseHandler.AggregationBuckets> aggregations) {
for (String field: facetFields) {
- Terms terms = aggregations.get(field);
- List<? extends Terms.Bucket> buckets = terms.getBuckets();
- final List<FulltextIndex.Facet> facetList = new ArrayList<>();
- for (Terms.Bucket bucket : buckets) {
- facetList.add(new FulltextIndex.Facet(bucket.getKeyAsString(), (int) bucket.getDocCount()));
+ ElasticResponseHandler.AggregationBuckets terms = aggregations.get(field);
+ ElasticResponseHandler.AggregationBucket[] buckets = terms.buckets;
+ final List<FulltextIndex.Facet> facetList = new ArrayList<>(buckets.length);
+ for (ElasticResponseHandler.AggregationBucket bucket : buckets) {
+ facetList.add(new FulltextIndex.Facet(bucket.key.toString(), bucket.count));
}
facetMap.put(field, facetList);
}
diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
index 51573b2..ec471e9 100644
--- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
+++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
@@ -330,8 +330,6 @@
assertEquals("Unexpected number of facets", actualAclLabelCount.size(), facets.size());
for (Map.Entry<String, Integer> facet : actualAclLabelCount.entrySet()) {
-
-
String facetLabel = facet.getKey();
int facetCount = getFacets().get(facetLabel);
float ratio = ((float) facetCount) / facet.getValue();
@@ -365,7 +363,6 @@
assertEquals("Unexpected number of facets", actualLabelCount.size(), facets.size());
for (Map.Entry<String, Integer> facet : actualLabelCount.entrySet()) {
-
String facetLabel = facet.getKey();
int facetCount = getFacets().get(facetLabel);
float ratio = ((float) facetCount) / facet.getValue();
diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java
index 06bbbbd..3af557e 100644
--- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java
+++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPerfTest.java
@@ -77,48 +77,48 @@
public void testFullTextSingleQuery() throws Exception {
createTestData();
String query = "//*[jcr:contains(@text, 'elit" + (NUM_NODES / 2) + "')] ";
- long startTest = LOG_PERF.start("Starting test executions");
+ long startTest = LOG_PERF.startForInfoLog("Starting test executions");
for (int j = 0; j < NUM_ITERATIONS; j++) {
testQuery(query, XPATH);
}
- LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+ LOG_PERF.end(startTest, -1,-1, "{} iterations of tests completed", NUM_ITERATIONS);
}
// Executes different queries each time
@Test
public void testFullTextMultiQuery() throws Exception {
createTestData();
- long startTest = LOG_PERF.start("Starting test executions");
+ long startTest = LOG_PERF.startForInfoLog("Starting test executions");
Random rndm = new Random(42);
for (int j = 0; j < NUM_ITERATIONS; j++) {
String query = "//*[jcr:contains(@text, 'elit" + rndm.nextInt(NUM_NODES) + "')] ";
testQuery(query, XPATH);
}
- LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+ LOG_PERF.end(startTest, -1,-1, "{} iterations of tests completed", NUM_ITERATIONS);
}
@Test
public void testFullTextMultiQueryWithExtraText() throws Exception {
Random randomText = new Random(42);
createTestData(() -> ElasticTestUtils.randomString(randomText, 1000));
- long startTest = LOG_PERF.start("Starting test executions");
+ long startTest = LOG_PERF.startForInfoLog("Starting test executions");
Random rndm = new Random(42);
for (int j = 0; j < NUM_ITERATIONS; j++) {
String query = "//*[jcr:contains(@text, 'elit" + rndm.nextInt(NUM_NODES) + "')] ";
testQuery(query, XPATH);
}
- LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+ LOG_PERF.end(startTest, -1, -1, "{} iterations of tests completed", NUM_ITERATIONS);
}
@Test
public void testPropertySingleQuery() throws Exception {
createTestData();
String query = "select [jcr:path] from [nt:base] where [title] = 'Title for node0'";
- long startTest = LOG_PERF.start("Starting test executions");
+ long startTest = LOG_PERF.startForInfoLog("Starting test executions");
for (int j = 0; j < NUM_ITERATIONS; j++) {
testQuery(query, SQL2);
}
- LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+ LOG_PERF.end(startTest, -1,-1, "{} iterations of tests completed", NUM_ITERATIONS);
}
@@ -126,13 +126,13 @@
@Test
public void testPropertyMultiQuery() throws Exception {
createTestData();
- long startTest = LOG_PERF.start("Starting test executions");
+ long startTest = LOG_PERF.startForInfoLog("Starting test executions");
Random rndm = new Random(42);
for (int j = 0; j < NUM_ITERATIONS; j++) {
String query = "select [jcr:path] from [nt:base] where [title] = 'Title for node" + rndm.nextInt(NUM_NODES) + "'";
testQuery(query, SQL2);
}
- LOG_PERF.end(startTest, -1, "{} iterations of tests completed", NUM_ITERATIONS);
+ LOG_PERF.end(startTest, -1, -1, "{} iterations of tests completed", NUM_ITERATIONS);
}
private void createTestData() throws Exception {