blob: a4548d8c78a2829ccd95c9db5c9176ef7fdf92c9 [file] [log] [blame]
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader; // javadocs
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherWarmer;
import org.apache.lucene.util.ThreadInterruptedException;
// TODO
// - we could make this work also w/ "normal" reopen/commit?
/**
* Utility class to manage sharing near-real-time searchers
* across multiple searching threads.
*
* <p>NOTE: to use this class, you must call reopen
* periodically. The {@link NRTManagerReopenThread} is a
* simple class to do this on a periodic basis. If you
* implement your own reopener, be sure to call {@link
* #addWaitingListener} so your reopener is notified when a
* caller is waiting for a specific generation searcher. </p>
*
* @lucene.experimental
*/
public class NRTManager implements Closeable {
private final IndexWriter writer;
private final ExecutorService es;
private final AtomicLong indexingGen;
private final AtomicLong searchingGen;
private final AtomicLong noDeletesSearchingGen;
private final SearcherWarmer warmer;
private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
private volatile IndexSearcher currentSearcher;
private volatile IndexSearcher noDeletesCurrentSearcher;
/**
* Create new NRTManager.
*
* @param writer IndexWriter to open near-real-time
* readers
* @param es optional ExecutorService so different segments can
* be searched concurrently (see {@link
* IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null
* to search segments sequentially.
* @param warmer optional {@link SearcherWarmer}. Pass
* null if you don't require the searcher to warmed
* before going live. If this is non-null then a
* merged segment warmer is installed on the
* provided IndexWriter's config.
*
* <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
* not invoked for the initial searcher; you should
* warm it yourself if necessary.
*/
public NRTManager(IndexWriter writer, ExecutorService es, SearcherWarmer warmer) throws IOException {
this.writer = writer;
this.es = es;
this.warmer = warmer;
indexingGen = new AtomicLong(1);
searchingGen = new AtomicLong(-1);
noDeletesSearchingGen = new AtomicLong(-1);
// Create initial reader:
swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true);
if (this.warmer != null) {
writer.getConfig().setMergedSegmentWarmer(
new IndexWriter.IndexReaderWarmer() {
@Override
public void warm(IndexReader reader) throws IOException {
NRTManager.this.warmer.warm(new IndexSearcher(reader, NRTManager.this.es));
}
});
}
}
/** NRTManager invokes this interface to notify it when a
* caller is waiting for a specific generation searcher
* to be visible. */
public static interface WaitingListener {
public void waiting(boolean requiresDeletes, long targetGen);
}
/** Adds a listener, to be notified when a caller is
* waiting for a specific generation searcher to be
* visible. */
public void addWaitingListener(WaitingListener l) {
waitingListeners.add(l);
}
/** Remove a listener added with {@link
* #addWaitingListener}. */
public void removeWaitingListener(WaitingListener l) {
waitingListeners.remove(l);
}
public long updateDocument(Term t, Document d, Analyzer a) throws IOException {
writer.updateDocument(t, d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocument(Term t, Document d) throws IOException {
writer.updateDocument(t, d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocuments(Term t, Collection<Document> docs, Analyzer a) throws IOException {
writer.updateDocuments(t, docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long updateDocuments(Term t, Collection<Document> docs) throws IOException {
writer.updateDocuments(t, docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Term t) throws IOException {
writer.deleteDocuments(t);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long deleteDocuments(Query q) throws IOException {
writer.deleteDocuments(q);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(Document d, Analyzer a) throws IOException {
writer.addDocument(d, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocuments(Collection<Document> docs, Analyzer a) throws IOException {
writer.addDocuments(docs, a);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocument(Document d) throws IOException {
writer.addDocument(d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
public long addDocuments(Collection<Document> docs) throws IOException {
writer.addDocuments(docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Returns the most current searcher. If you require a
* certain indexing generation be visible in the returned
* searcher, call {@link #get(long)}
* instead.
*/
public synchronized IndexSearcher get() {
return get(true);
}
/** Just like {@link #get}, but by passing <code>false</code> for
* requireDeletes, you can get faster reopen time, but
* the returned reader is allowed to not reflect all
* deletions. See {@link IndexReader#open(IndexWriter,boolean)} */
public synchronized IndexSearcher get(boolean requireDeletes) {
final IndexSearcher s;
if (requireDeletes) {
s = currentSearcher;
} else if (noDeletesSearchingGen.get() > searchingGen.get()) {
s = noDeletesCurrentSearcher;
} else {
s = currentSearcher;
}
s.getIndexReader().incRef();
return s;
}
/** Call this if you require a searcher reflecting all
* changes as of the target generation.
*
* @param targetGen Returned searcher must reflect changes
* as of this generation
*/
public synchronized IndexSearcher get(long targetGen) {
return get(targetGen, true);
}
/** Call this if you require a searcher reflecting all
* changes as of the target generation, and you don't
* require deletions to be reflected. Note that the
* returned searcher may still reflect some or all
* deletions.
*
* @param targetGen Returned searcher must reflect changes
* as of this generation
*
* @param requireDeletes If true, the returned searcher must
* reflect all deletions. This can be substantially more
* costly than not applying deletes. Note that if you
* pass false, it's still possible that some or all
* deletes may have been applied.
**/
public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) {
assert noDeletesSearchingGen.get() >= searchingGen.get();
if (targetGen > getCurrentSearchingGen(requireDeletes)) {
// Must wait
//final long t0 = System.nanoTime();
for(WaitingListener listener : waitingListeners) {
listener.waiting(requireDeletes, targetGen);
}
while (targetGen > getCurrentSearchingGen(requireDeletes)) {
//System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes);
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
//final long waitNS = System.nanoTime()-t0;
//System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes + " WAIT msec=" + (waitNS/1000000.0));
}
return get(requireDeletes);
}
/** Returns generation of current searcher. */
public long getCurrentSearchingGen(boolean requiresDeletes) {
return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get();
}
/** Release the searcher obtained from {@link
* #get()} or {@link #get(long)}.
*
* <p><b>NOTE</b>: it's safe to call this after {@link
* #close}. */
public void release(IndexSearcher s) throws IOException {
s.getIndexReader().decRef();
}
/** Call this when you need the NRT reader to reopen.
*
* @param applyDeletes If true, the newly opened reader
* will reflect all deletes
*/
public boolean reopen(boolean applyDeletes) throws IOException {
// Mark gen as of when reopen started:
final long newSearcherGen = indexingGen.getAndIncrement();
if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) {
//System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen);
searchingGen.set(newSearcherGen);
noDeletesSearchingGen.set(newSearcherGen);
synchronized(this) {
notifyAll();
}
//System.out.println("reopen: skip: return");
return false;
} else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) {
//System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen);
noDeletesSearchingGen.set(newSearcherGen);
synchronized(this) {
notifyAll();
}
//System.out.println("reopen: skip: return");
return false;
}
//System.out.println("indexingGen now " + indexingGen);
// .reopen() returns a new reference:
// Start from whichever searcher is most current:
final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes);
final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es);
if (warmer != null) {
warmer.warm(nextSearcher);
}
// Transfer reference to swapSearcher:
swapSearcher(nextSearcher,
newSearcherGen,
applyDeletes);
return true;
}
// Steals a reference from newSearcher:
private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
//System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
// Always replace noDeletesCurrentSearcher:
if (noDeletesCurrentSearcher != null) {
noDeletesCurrentSearcher.getIndexReader().decRef();
}
noDeletesCurrentSearcher = newSearcher;
assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen;
noDeletesSearchingGen.set(newSearchingGen);
if (applyDeletes) {
// Deletes were applied, so we also update currentSearcher:
if (currentSearcher != null) {
currentSearcher.getIndexReader().decRef();
}
currentSearcher = newSearcher;
if (newSearcher != null) {
newSearcher.getIndexReader().incRef();
}
assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen;
searchingGen.set(newSearchingGen);
}
notifyAll();
//System.out.println(Thread.currentThread().getName() + ": done");
}
/** Close this NRTManager to future searching. Any
* searches still in process in other threads won't be
* affected, and they should still call {@link #release}
* after they are done.
*
* <p><b>NOTE</b>: caller must separately close the writer. */
// @Override -- not until Java 1.6
public void close() throws IOException {
swapSearcher(null, indexingGen.getAndIncrement(), true);
}
}