blob: 683ba98a994f23c8eef1422912f4e93a0f01c179 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.blur.lucene.search;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.trace.Trace;
import org.apache.blur.trace.Tracer;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.util.OpenBitSet;
public class FacetExecutor {
private static final Log LOG = LogFactory.getLog(FacetExecutor.class);
static Comparator<Entry<Object, Info>> COMPARATOR = new Comparator<Entry<Object, Info>>() {
@Override
public int compare(Entry<Object, Info> o1, Entry<Object, Info> o2) {
AtomicReader r1 = o1.getValue()._reader;
AtomicReader r2 = o2.getValue()._reader;
return r2.maxDoc() - r1.maxDoc();
}
};
static class Finished extends IOException {
private static final long serialVersionUID = 1L;
}
static class SimpleCollector extends Collector {
int _hits;
final OpenBitSet _bitSet;
Scorer _scorer;
SimpleCollector(OpenBitSet bitSet) {
_bitSet = bitSet;
}
@Override
public void collect(int doc) throws IOException {
if (_bitSet.fastGet(doc)) {
_hits++;
} else {
int nextSetBit = _bitSet.nextSetBit(doc);
if (nextSetBit < 0) {
throw new Finished();
} else {
int advance = _scorer.advance(nextSetBit);
if (advance == DocIdSetIterator.NO_MORE_DOCS) {
throw new Finished();
}
if (_bitSet.fastGet(advance)) {
_hits++;
}
}
}
}
@Override
public void setScorer(Scorer scorer) throws IOException {
_scorer = scorer;
}
@Override
public void setNextReader(AtomicReaderContext context) throws IOException {
}
@Override
public boolean acceptsDocsOutOfOrder() {
return false;
}
}
static class SimpleCollectorExitEarly extends SimpleCollector {
private long _currentCount;
private final long _min;
SimpleCollectorExitEarly(OpenBitSet bitSet, long currentCount, long min) {
super(bitSet);
_currentCount = currentCount;
_min = min;
}
@Override
public void collect(int doc) throws IOException {
super.collect(doc);
_currentCount++;
if (_currentCount >= _min) {
throw new Finished();
}
}
}
static class Info {
final OpenBitSet _bitSet;
final Scorer[] _scorers;
final AtomicReader _reader;
final String _readerStr;
final int _maxDoc;
final Lock[] _locks;
@Override
public String toString() {
return "Info scorers length [" + _scorers.length + "] reader [" + _reader + "]";
}
Info(AtomicReaderContext context, Scorer[] scorers, Lock[] locks) {
AtomicReader reader = context.reader();
_bitSet = new OpenBitSet(reader.maxDoc());
_scorers = scorers;
_reader = reader;
_readerStr = _reader.toString();
_maxDoc = _reader.maxDoc();
_locks = locks;
}
void process(AtomicLongArray counts, long[] minimumsBeforeReturning, AtomicBoolean running) throws IOException {
if (minimumsBeforeReturning == null) {
Tracer trace = Trace.trace("processing facet - segment", Trace.param("reader", _readerStr),
Trace.param("maxDoc", _maxDoc), Trace.param("minimums", "NONE"), Trace.param("scorers", _scorers.length));
try {
for (int i = 0; i < _scorers.length && running.get(); i++) {
SimpleCollector col = new SimpleCollector(_bitSet);
runFacet(counts, col, i);
}
} finally {
trace.done();
}
} else {
BlockingQueue<Integer> ids = new ArrayBlockingQueue<Integer>(_scorers.length + 1);
try {
populate(ids);
while (!ids.isEmpty() && running.get()) {
int id = ids.take();
Lock lock = _locks[id];
if (lock.tryLock()) {
try {
long min = minimumsBeforeReturning[id];
long currentCount = counts.get(id);
if (currentCount < min) {
LOG.debug("Running facet, current count [{0}] min [{1}]", currentCount, min);
SimpleCollectorExitEarly col = new SimpleCollectorExitEarly(_bitSet, currentCount, min);
runFacet(counts, col, id);
}
} finally {
lock.unlock();
}
} else {
ids.put(id);
}
}
} catch (Exception e) {
throw new IOException(e);
}
}
}
private void populate(BlockingQueue<Integer> ids) throws InterruptedException {
for (int i = 0; i < _scorers.length; i++) {
ids.put(i);
}
}
private void runFacet(AtomicLongArray counts, SimpleCollector col, int i) throws IOException {
Scorer scorer = _scorers[i];
if (scorer != null) {
Tracer traceInner = Trace.trace("processing facet - segment - scorer", Trace.param("scorer", scorer),
Trace.param("scorer.cost", scorer.cost()));
try {
// new ExitScorer(scorer).score(col);
scorer.score(col);
} catch (Finished e) {
// Do nothing, exiting early.
} finally {
traceInner.done();
}
int hits = col._hits;
LOG.debug("Facet [{0}] result [{1}]", i, hits);
counts.addAndGet(i, hits);
}
col._hits = 0;
}
}
private final Map<Object, Info> _infoMap = new ConcurrentHashMap<Object, FacetExecutor.Info>();
private final int _length;
private final AtomicLongArray _counts;
private final long[] _minimumsBeforeReturning;
private final Lock[] _locks;
private final AtomicBoolean _running;
private boolean _processed;
public FacetExecutor(int length) {
this(length, null, new AtomicLongArray(length));
}
public FacetExecutor(int length, long[] minimumsBeforeReturning, AtomicLongArray counts) {
this(length, minimumsBeforeReturning, counts, new AtomicBoolean(true));
}
public FacetExecutor(int length, long[] minimumsBeforeReturning) {
this(length, minimumsBeforeReturning, new AtomicLongArray(length));
}
public FacetExecutor(int length, long[] minimumsBeforeReturning, AtomicLongArray counts, AtomicBoolean running) {
_length = length;
_counts = counts;
_minimumsBeforeReturning = minimumsBeforeReturning;
_locks = new Lock[_length];
for (int i = 0; i < _length; i++) {
_locks[i] = new ReentrantReadWriteLock().writeLock();
}
_running = running;
}
public void addScorers(AtomicReaderContext context, Scorer[] scorers) throws IOException {
if (scorers.length != _length) {
throw new IOException("Scorer length is not correct expecting [" + _length + "] actual [" + scorers.length + "]");
}
Object key = getKey(context);
Info info = _infoMap.get(key);
if (info == null) {
info = new Info(context, scorers, _locks);
_infoMap.put(key, info);
} else {
AtomicReader reader = context.reader();
LOG.warn("Info about reader context [{0}] already created, existing Info [{1}] current reader [{2}].", context,
info, reader);
}
}
public boolean scorersAlreadyAdded(AtomicReaderContext context) {
Object key = getKey(context);
return _infoMap.containsKey(key);
}
private Object getKey(AtomicReaderContext context) {
return context.reader().getCoreCacheKey();
}
public OpenBitSet getBitSet(AtomicReaderContext context) throws IOException {
Info info = _infoMap.get(getKey(context));
if (info == null) {
throw new IOException("Info object missing.");
}
return info._bitSet;
}
public int length() {
return _length;
}
public long get(int i) throws IOException {
if (!_processed) {
throw new IOException("Has not been processed.");
}
return _counts.get(i);
}
public void processFacets(ExecutorService executor) throws IOException {
if (!_processed) {
Tracer trace = Trace.trace("processing facets");
try {
processInternal(executor);
} finally {
trace.done();
}
_processed = true;
}
}
private void processInternal(ExecutorService executor) throws IOException {
List<Entry<Object, Info>> entries = new ArrayList<Entry<Object, Info>>(_infoMap.entrySet());
Collections.sort(entries, COMPARATOR);
if (executor == null) {
for (Entry<Object, Info> e : entries) {
if (_running.get()) {
e.getValue().process(_counts, _minimumsBeforeReturning, _running);
}
}
} else {
final AtomicInteger finished = new AtomicInteger();
for (Entry<Object, Info> e : entries) {
if (_running.get()) {
final Entry<Object, Info> entry = e;
executor.submit(new Runnable() {
@Override
public void run() {
try {
entry.getValue().process(_counts, _minimumsBeforeReturning, _running);
} catch (Throwable e) {
LOG.error("Unknown error", e);
} finally {
finished.incrementAndGet();
}
}
});
}
}
while (finished.get() < entries.size()) {
synchronized (this) {
try {
wait(1);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}
}
}
}