blob: 25569e94dc12ed47c1b702ac887e5693b29021df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/**
 * An {@link ElasticSecureFacetAsyncProvider} extension that also subscribes to Elastic aggregation events.
 * <p>
 * When the reported number of hits is smaller than the configured sample size, every call is delegated to the
 * parent class. Otherwise, search hits are randomly sampled and checked for accessibility, and the facet counts
 * coming from the aggregations are scaled by the ratio of accessible sampled hits in order to minimize access
 * checks. This provider could improve facets performance but only when the result set is quite big.
 */
public class ElasticStatisticalFacetAsyncProvider
        extends ElasticSecureFacetAsyncProvider
        implements ElasticResponseListener.AggregationListener {

    /** Maximum number of search hits that get an access check. */
    private final int sampleSize;

    /**
     * Total hits as reported through {@link #startData(long)}. Volatile because {@link #getFacets(int, String)}
     * reads it before waiting on the latch, so the latch alone gives no visibility guarantee for that read.
     */
    private volatile long totalHits;

    /** Random source driving the sampling; seeded from the constructor argument. */
    private final Random rGen;

    /** Number of hits selected for the sample so far. */
    private int sampled = 0;

    /** Number of hits that went through the sampling decision so far. */
    private int seen = 0;

    /** Number of sampled hits whose path passed the accessibility check. */
    private long accessibleCount = 0;

    /** Facets per field, filled from the aggregations and adjusted in {@link #endData()}. */
    private final Map<String, List<FulltextIndex.Facet>> facetMap = new HashMap<>();

    /** Released by {@link #endData()} once the facets in {@link #facetMap} have been adjusted. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Creates a new provider.
     *
     * @param elasticRequestHandler  passed through to the parent class
     * @param elasticResponseHandler passed through to the parent class
     * @param isAccessible           passed through to the parent class; also used here to test sampled paths
     * @param randomSeed             seed for the random generator used to pick the sampled hits
     * @param sampleSize             number of hits to sample; also the threshold under which the parent class
     *                               logic is used instead of the statistical one
     */
    ElasticStatisticalFacetAsyncProvider(ElasticRequestHandler elasticRequestHandler,
                                         ElasticResponseHandler elasticResponseHandler,
                                         Predicate<String> isAccessible,
                                         long randomSeed, int sampleSize) {
        super(elasticRequestHandler, elasticResponseHandler, isAccessible);
        this.sampleSize = sampleSize;
        this.rGen = new Random(randomSeed);
    }

    /**
     * Records the total number of hits, which decides between the parent logic and the statistical one.
     *
     * @param totalHits the total number of hits
     */
    @Override
    public void startData(long totalHits) {
        this.totalHits = totalHits;
    }

    /**
     * Handles a search hit. With fewer total hits than the sample size the hit is forwarded to the parent class.
     * Otherwise the hit is picked for the sample with probability
     * {@code (sampleSize - sampled) / (totalHits - seen)} and, when picked, its path is tested for accessibility.
     *
     * @param searchHit the search hit to process
     */
    @Override
    public void on(ElasticResponseHandler.SearchResponseHit searchHit) {
        if (totalHits < sampleSize) {
            super.on(searchHit);
            return;
        }
        if (sampled == sampleSize) {
            // sample complete: nothing else to check
            return;
        }
        // Clamp instead of a plain (int) cast: a total above Integer.MAX_VALUE would otherwise wrap around and
        // hand Random.nextInt a wrong (possibly non-positive) bound.
        int remaining = (int) Math.min(Integer.MAX_VALUE, totalHits - seen);
        int r = rGen.nextInt(remaining) + 1;
        seen++;
        if (r <= sampleSize - sampled) {
            sampled++;
            final String path = elasticResponseHandler.getPath(searchHit);
            if (path != null && isAccessible.test(path)) {
                accessibleCount++;
            }
        }
    }

    /**
     * Stores, for each facet field, the aggregation buckets as a list of facets (bucket key as label, bucket
     * count as count). Fields without an aggregation in the response are skipped.
     *
     * @param aggregations the aggregation buckets keyed by field
     */
    @Override
    public void on(Map<String, ElasticResponseHandler.AggregationBuckets> aggregations) {
        for (String field : facetFields) {
            ElasticResponseHandler.AggregationBuckets terms = aggregations.get(field);
            if (terms == null || terms.buckets == null) {
                // no aggregation returned for this field: skip it instead of failing with a NullPointerException
                continue;
            }
            ElasticResponseHandler.AggregationBucket[] buckets = terms.buckets;
            final List<FulltextIndex.Facet> facetList = new ArrayList<>(buckets.length);
            for (ElasticResponseHandler.AggregationBucket bucket : buckets) {
                facetList.add(new FulltextIndex.Facet(bucket.key.toString(), bucket.count));
            }
            facetMap.put(field, facetList);
        }
    }

    /**
     * Completes the processing. With fewer total hits than the sample size this delegates to the parent class;
     * otherwise every stored facet list is adjusted according to the sampled accessibility and the latch is
     * released so that {@link #getFacets(int, String)} can return.
     */
    @Override
    public void endData() {
        if (totalHits < sampleSize) {
            super.endData();
            return;
        }
        facetMap.replaceAll((field, facets) -> updateLabelAndValueIfRequired(facets));
        latch.countDown();
    }

    /**
     * Returns the facets for the given column. With fewer total hits than the sample size this delegates to the
     * parent class; otherwise it waits up to 15 seconds for {@link #endData()} and then reads the adjusted facets.
     *
     * @param numberOfFacets passed to the parent class when delegating; not used by the statistical path
     * @param columnName     the column name, parsed with {@link FulltextIndex#parseFacetField(String)}
     * @return the facets stored for the parsed field, or {@code null} when none were stored for it
     * @throws IllegalStateException if the wait times out or the waiting thread is interrupted
     */
    @Override
    public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String columnName) {
        if (totalHits < sampleSize) {
            return super.getFacets(numberOfFacets, columnName);
        }
        LOG.trace("Requested facets for {} - Latch count: {}", columnName, latch.getCount());
        try {
            if (!latch.await(15, TimeUnit.SECONDS)) {
                // Reading facetMap now could expose counts that are incomplete or not yet adjusted
                throw new IllegalStateException("Timed out while waiting for facets");
            }
        } catch (InterruptedException e) {
            // restore the interrupt status so that callers up the stack can still observe it
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Error while waiting for facets", e);
        }
        LOG.trace("Reading facets for {} from {}", columnName, facetMap);
        return facetMap.get(FulltextIndex.parseFacetField(columnName));
    }

    /**
     * Scales the facet counts by {@code accessibleCount / sampleSize} and drops the facets whose scaled count is
     * zero. When every sampled hit was accessible the input list is returned untouched.
     *
     * @param labelAndValues the facets built from the aggregation buckets
     * @return the input list when no adjustment is needed, a new list with the scaled facets otherwise
     */
    private List<FulltextIndex.Facet> updateLabelAndValueIfRequired(List<FulltextIndex.Facet> labelAndValues) {
        if (accessibleCount >= sampleSize) {
            return labelAndValues;
        }
        List<FulltextIndex.Facet> adjusted = new ArrayList<>(labelAndValues.size());
        for (FulltextIndex.Facet facet : labelAndValues) {
            long count = facet.getCount() * accessibleCount / sampleSize;
            if (count > 0) {
                adjusted.add(new FulltextIndex.Facet(facet.getLabel(), Math.toIntExact(count)));
            }
        }
        return adjusted;
    }
}