blob: 413aeb509d229ce94677b10142c27de02772bd33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexNode;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.FulltextResultRow;
import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* Class to iterate over Elastic results of a given {@link IndexPlan}.
* The results are produced asynchronously into an internal unbounded {@link BlockingQueue}. To avoid too many calls to
* Elastic the results are loaded in chunks (using search_after strategy) and loaded only when needed.
*/
public class ElasticResultRowAsyncIterator implements Iterator<FulltextResultRow>, ElasticResponseListener.SearchHitListener {
private static final Logger LOG = LoggerFactory.getLogger(ElasticResultRowAsyncIterator.class);
// this is an internal special message to notify the consumer the result set has been completely returned
private static final FulltextResultRow POISON_PILL =
new FulltextResultRow("___OAK_POISON_PILL___", 0d, Collections.emptyMap(), null, null);
private final BlockingQueue<FulltextResultRow> queue = new LinkedBlockingQueue<>();
private final ElasticIndexNode indexNode;
private final IndexPlan indexPlan;
private final Predicate<String> rowInclusionPredicate;
private final LMSEstimator estimator;
private final ElasticQueryScanner elasticQueryScanner;
private final ElasticRequestHandler elasticRequestHandler;
private final ElasticResponseHandler elasticResponseHandler;
private final ElasticFacetProvider elasticFacetProvider;
private FulltextResultRow nextRow;
public ElasticResultRowAsyncIterator(@NotNull ElasticIndexNode indexNode,
@NotNull ElasticRequestHandler elasticRequestHandler,
@NotNull ElasticResponseHandler elasticResponseHandler,
@NotNull QueryIndex.IndexPlan indexPlan,
Predicate<String> rowInclusionPredicate,
LMSEstimator estimator) {
this.indexNode = indexNode;
this.elasticRequestHandler = elasticRequestHandler;
this.elasticResponseHandler = elasticResponseHandler;
this.indexPlan = indexPlan;
this.rowInclusionPredicate = rowInclusionPredicate;
this.estimator = estimator;
this.elasticFacetProvider = elasticRequestHandler.getAsyncFacetProvider(elasticResponseHandler);
this.elasticQueryScanner = initScanner();
}
@Override
public boolean hasNext() {
// if nextRow is not null it means the caller invoked hasNext() before without calling next()
if (nextRow == null) {
if (queue.isEmpty()) {
// this triggers, when needed, the scan of the next results chunk
elasticQueryScanner.scan();
}
try {
nextRow = queue.take();
} catch (InterruptedException e) {
throw new IllegalStateException("Error reading next result from Elastic", e);
}
}
return !POISON_PILL.path.equals(nextRow.path);
}
@Override
public FulltextResultRow next() {
if (nextRow == null) { // next is called without hasNext
if (!hasNext()) {
return null;
}
}
FulltextResultRow row = null;
if (nextRow != null && !POISON_PILL.path.equals(nextRow.path)) {
row = nextRow;
nextRow = null;
}
return row;
}
@Override
public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
final String path = elasticResponseHandler.getPath(searchHit);
if (path != null) {
if (rowInclusionPredicate != null && !rowInclusionPredicate.test(path)) {
LOG.trace("Path {} not included because of hierarchy inclusion rules", path);
return;
}
try {
queue.put(new FulltextResultRow(path, searchHit.score, null, elasticFacetProvider, null));
} catch (InterruptedException e) {
throw new IllegalStateException("Error producing results into the iterator queue", e);
}
}
}
@Override
public void endData() {
try {
queue.put(POISON_PILL);
} catch (InterruptedException e) {
throw new IllegalStateException("Error inserting poison pill into the iterator queue", e);
}
}
private ElasticQueryScanner initScanner() {
List<ElasticResponseListener> listeners = new ArrayList<>();
// TODO: we could avoid to register this listener when the client is interested in facets only. It would save space and time
listeners.add(this);
if (elasticFacetProvider != null) {
listeners.add(elasticFacetProvider);
}
return new ElasticQueryScanner(listeners);
}
/**
* Scans Elastic results asynchronously and notify listeners.
*/
class ElasticQueryScanner implements ResponseListener {
private static final int SMALL_RESULT_SET_SIZE = 10;
private final Set<ElasticResponseListener> allListeners = new HashSet<>();
private final List<SearchHitListener> searchHitListeners = new ArrayList<>();
private final List<AggregationListener> aggregationListeners = new ArrayList<>();
private final QueryBuilder query;
private final List<FieldSortBuilder> sorts;
private final String[] sourceFields;
// concurrent data structures to coordinate chunks loading
private final AtomicBoolean anyDataLeft = new AtomicBoolean(false);
private int scannedRows;
private int requests;
private boolean fullScan;
private long searchStartTime;
// reference to the last document sort values for search_after queries
private Object[] lastHitSortValues;
// Semaphore to guarantee only one in-flight request to Elastic
private final Semaphore semaphore = new Semaphore(1);
private final ElasticMetricHandler elasticMetricHandler = indexNode.getElasticMetricHandler();
ElasticQueryScanner(List<ElasticResponseListener> listeners) {
this.query = elasticRequestHandler.baseQuery();
this.sorts = elasticRequestHandler.baseSorts();
Set<String> sourceFieldsSet = new HashSet<>();
AtomicBoolean needsAggregations = new AtomicBoolean(false);
Consumer<ElasticResponseListener> register = (listener) -> {
allListeners.add(listener);
sourceFieldsSet.addAll(listener.sourceFields());
if (listener instanceof SearchHitListener) {
SearchHitListener searchHitListener = (SearchHitListener) listener;
searchHitListeners.add(searchHitListener);
if (searchHitListener.isFullScan()) {
fullScan = true;
}
}
if (listener instanceof AggregationListener) {
aggregationListeners.add((AggregationListener) listener);
needsAggregations.set(true);
}
};
listeners.forEach(register);
this.sourceFields = sourceFieldsSet.toArray(new String[0]);
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
.query(query)
// use a smaller size when the query contains aggregations. This improves performance
// when the client is only interested in insecure facets
.size(needsAggregations.get() ? Math.min(SMALL_RESULT_SET_SIZE, getFetchSize(requests)) : getFetchSize(requests))
.fetchSource(sourceFields, null);
this.sorts.forEach(searchSourceBuilder::sort);
if (needsAggregations.get()) {
elasticRequestHandler.aggregations().forEach(searchSourceBuilder::aggregation);
}
LOG.trace("Kicking initial search for query {}", searchSourceBuilder);
semaphore.tryAcquire();
searchStartTime = System.currentTimeMillis();
requests++;
Request request = elasticRequestHandler.createLowLevelRequest(searchSourceBuilder,
indexNode.getDefinition().getRemoteIndexAlias());
indexNode.getConnection().getClient().getLowLevelClient().performRequestAsync(request, this);
elasticMetricHandler.markQuery(true);
}
/**
* Handle the response action notifying the registered listeners. Depending on the listeners' configuration
* it could keep loading chunks or wait for a {@code #scan} call to resume scanning.
*
* Some code in this method relies on structure that are not thread safe. We need to make sure
* these data structures are modified before releasing the semaphore.
*/
@Override
public void onSuccess(Response response) {
long searchTotalTime = System.currentTimeMillis() - searchStartTime;
ElasticResponseHandler.SearchResponse searchResponse;
try {
searchResponse = elasticResponseHandler.parse(response);
} catch (IOException e) {
throw new IllegalStateException("Unable to parse response", e);
}
ElasticResponseHandler.SearchResponseHit[] searchHits = searchResponse.hits.hits;
int hitsSize = searchHits != null ? searchHits.length : 0;
elasticMetricHandler.measureQuery(hitsSize, searchResponse.took, searchTotalTime, searchResponse.timedOut);
if (hitsSize > 0) {
long totalHits = searchResponse.hits.total.value;
LOG.debug("Processing search response that took {} to read {}/{} docs", searchResponse.took, hitsSize, totalHits);
lastHitSortValues = searchHits[hitsSize - 1].sort;
scannedRows += hitsSize;
anyDataLeft.set(totalHits > scannedRows);
estimator.update(indexPlan.getFilter(), totalHits);
// now that we got the last hit we can release the semaphore to potentially unlock other requests
semaphore.release();
if (requests == 1) {
for (SearchHitListener l : searchHitListeners) {
l.startData(totalHits);
}
if (!aggregationListeners.isEmpty()) {
LOG.trace("Emitting aggregations {}", searchResponse.aggregations);
for (AggregationListener l : aggregationListeners) {
l.on(searchResponse.aggregations);
}
}
}
LOG.trace("Emitting {} search hits, for a total of {} scanned results", searchHits.length, scannedRows);
for (ElasticResponseHandler.SearchResponseHit hit : searchHits) {
for (SearchHitListener l : searchHitListeners) {
l.on(hit);
}
}
if (!anyDataLeft.get()) {
LOG.trace("No data left: closing scanner, notifying listeners");
close();
} else if (fullScan) {
scan();
}
} else {
LOG.trace("No results: closing scanner, notifying listeners");
close();
}
}
@Override
public void onFailure(Exception e) {
elasticMetricHandler.measureFailedQuery(System.currentTimeMillis() - searchStartTime);
LOG.error("Error retrieving data from Elastic: closing scanner, notifying listeners", e);
// closing scanner immediately after a failure avoiding them to hang (potentially) forever
close();
}
/**
* Triggers a scan of a new chunk of the result set, if needed.
*/
private void scan() {
if (semaphore.tryAcquire() && anyDataLeft.get()) {
final SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
.query(query)
.size(getFetchSize(requests++))
.fetchSource(sourceFields, null)
.searchAfter(lastHitSortValues);
this.sorts.forEach(searchSourceBuilder::sort);
final SearchRequest searchRequest = new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
.source(searchSourceBuilder);
LOG.trace("Kicking new search after query {}", searchRequest.source());
searchStartTime = System.currentTimeMillis();
Request request = elasticRequestHandler.createLowLevelRequest(searchSourceBuilder,
indexNode.getDefinition().getRemoteIndexAlias());
indexNode.getConnection().getClient().getLowLevelClient().performRequestAsync(request, this);
elasticMetricHandler.markQuery(false);
} else {
LOG.trace("Scanner is closing or still processing data from the previous scan");
}
}
/* picks the size in the fetch array at index=requests or the last if out of bound */
private int getFetchSize(int requestId) {
int[] queryFetchSizes = indexNode.getDefinition().queryFetchSizes;
return queryFetchSizes.length > requestId ?
queryFetchSizes[requestId] : queryFetchSizes[queryFetchSizes.length -1];
}
// close all listeners
private void close() {
semaphore.release();
for (ElasticResponseListener l : allListeners) {
l.endData();
}
}
}
}