| package org.apache.lucene.index; |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.lucene.analysis.Analyzer; |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.index.IndexReader; // javadocs |
| import org.apache.lucene.search.IndexSearcher; |
| import org.apache.lucene.search.Query; |
| import org.apache.lucene.search.SearcherWarmer; |
| import org.apache.lucene.util.ThreadInterruptedException; |
| |
| // TODO |
| // - we could make this work also w/ "normal" reopen/commit? |
| |
| /** |
| * Utility class to manage sharing near-real-time searchers |
| * across multiple searching threads. |
| * |
| * <p>NOTE: to use this class, you must call reopen |
| * periodically. The {@link NRTManagerReopenThread} is a |
| * simple class to do this on a periodic basis. If you |
| * implement your own reopener, be sure to call {@link |
| * #addWaitingListener} so your reopener is notified when a |
| * caller is waiting for a specific generation searcher. </p> |
| * |
| * @lucene.experimental |
| */ |
| |
| public class NRTManager implements Closeable { |
| private final IndexWriter writer; |
| private final ExecutorService es; |
| private final AtomicLong indexingGen; |
| private final AtomicLong searchingGen; |
| private final AtomicLong noDeletesSearchingGen; |
| private final SearcherWarmer warmer; |
| private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>(); |
| |
| private volatile IndexSearcher currentSearcher; |
| private volatile IndexSearcher noDeletesCurrentSearcher; |
| |
| /** |
| * Create new NRTManager. |
| * |
| * @param writer IndexWriter to open near-real-time |
| * readers |
| * @param es optional ExecutorService so different segments can |
| * be searched concurrently (see {@link |
| * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null |
| * to search segments sequentially. |
| * @param warmer optional {@link SearcherWarmer}. Pass |
| * null if you don't require the searcher to warmed |
| * before going live. If this is non-null then a |
| * merged segment warmer is installed on the |
| * provided IndexWriter's config. |
| * |
| * <p><b>NOTE</b>: the provided {@link SearcherWarmer} is |
| * not invoked for the initial searcher; you should |
| * warm it yourself if necessary. |
| */ |
| public NRTManager(IndexWriter writer, ExecutorService es, SearcherWarmer warmer) throws IOException { |
| |
| this.writer = writer; |
| this.es = es; |
| this.warmer = warmer; |
| indexingGen = new AtomicLong(1); |
| searchingGen = new AtomicLong(-1); |
| noDeletesSearchingGen = new AtomicLong(-1); |
| |
| // Create initial reader: |
| swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true); |
| |
| if (this.warmer != null) { |
| writer.getConfig().setMergedSegmentWarmer( |
| new IndexWriter.IndexReaderWarmer() { |
| @Override |
| public void warm(IndexReader reader) throws IOException { |
| NRTManager.this.warmer.warm(new IndexSearcher(reader, NRTManager.this.es)); |
| } |
| }); |
| } |
| } |
| |
| /** NRTManager invokes this interface to notify it when a |
| * caller is waiting for a specific generation searcher |
| * to be visible. */ |
| public static interface WaitingListener { |
| public void waiting(boolean requiresDeletes, long targetGen); |
| } |
| |
| /** Adds a listener, to be notified when a caller is |
| * waiting for a specific generation searcher to be |
| * visible. */ |
| public void addWaitingListener(WaitingListener l) { |
| waitingListeners.add(l); |
| } |
| |
| /** Remove a listener added with {@link |
| * #addWaitingListener}. */ |
| public void removeWaitingListener(WaitingListener l) { |
| waitingListeners.remove(l); |
| } |
| |
| public long updateDocument(Term t, Document d, Analyzer a) throws IOException { |
| writer.updateDocument(t, d, a); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long updateDocument(Term t, Document d) throws IOException { |
| writer.updateDocument(t, d); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long updateDocuments(Term t, Collection<Document> docs, Analyzer a) throws IOException { |
| writer.updateDocuments(t, docs, a); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long updateDocuments(Term t, Collection<Document> docs) throws IOException { |
| writer.updateDocuments(t, docs); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long deleteDocuments(Term t) throws IOException { |
| writer.deleteDocuments(t); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long deleteDocuments(Query q) throws IOException { |
| writer.deleteDocuments(q); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long addDocument(Document d, Analyzer a) throws IOException { |
| writer.addDocument(d, a); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long addDocuments(Collection<Document> docs, Analyzer a) throws IOException { |
| writer.addDocuments(docs, a); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long addDocument(Document d) throws IOException { |
| writer.addDocument(d); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| public long addDocuments(Collection<Document> docs) throws IOException { |
| writer.addDocuments(docs); |
| // Return gen as of when indexing finished: |
| return indexingGen.get(); |
| } |
| |
| /** Returns the most current searcher. If you require a |
| * certain indexing generation be visible in the returned |
| * searcher, call {@link #get(long)} |
| * instead. |
| */ |
| public synchronized IndexSearcher get() { |
| return get(true); |
| } |
| |
| /** Just like {@link #get}, but by passing <code>false</code> for |
| * requireDeletes, you can get faster reopen time, but |
| * the returned reader is allowed to not reflect all |
| * deletions. See {@link IndexReader#open(IndexWriter,boolean)} */ |
| public synchronized IndexSearcher get(boolean requireDeletes) { |
| final IndexSearcher s; |
| if (requireDeletes) { |
| s = currentSearcher; |
| } else if (noDeletesSearchingGen.get() > searchingGen.get()) { |
| s = noDeletesCurrentSearcher; |
| } else { |
| s = currentSearcher; |
| } |
| s.getIndexReader().incRef(); |
| return s; |
| } |
| |
| /** Call this if you require a searcher reflecting all |
| * changes as of the target generation. |
| * |
| * @param targetGen Returned searcher must reflect changes |
| * as of this generation |
| */ |
| public synchronized IndexSearcher get(long targetGen) { |
| return get(targetGen, true); |
| } |
| |
| /** Call this if you require a searcher reflecting all |
| * changes as of the target generation, and you don't |
| * require deletions to be reflected. Note that the |
| * returned searcher may still reflect some or all |
| * deletions. |
| * |
| * @param targetGen Returned searcher must reflect changes |
| * as of this generation |
| * |
| * @param requireDeletes If true, the returned searcher must |
| * reflect all deletions. This can be substantially more |
| * costly than not applying deletes. Note that if you |
| * pass false, it's still possible that some or all |
| * deletes may have been applied. |
| **/ |
| public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) { |
| |
| assert noDeletesSearchingGen.get() >= searchingGen.get(); |
| |
| if (targetGen > getCurrentSearchingGen(requireDeletes)) { |
| // Must wait |
| //final long t0 = System.nanoTime(); |
| for(WaitingListener listener : waitingListeners) { |
| listener.waiting(requireDeletes, targetGen); |
| } |
| while (targetGen > getCurrentSearchingGen(requireDeletes)) { |
| //System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes); |
| try { |
| wait(); |
| } catch (InterruptedException ie) { |
| throw new ThreadInterruptedException(ie); |
| } |
| } |
| //final long waitNS = System.nanoTime()-t0; |
| //System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes + " WAIT msec=" + (waitNS/1000000.0)); |
| } |
| |
| return get(requireDeletes); |
| } |
| |
| /** Returns generation of current searcher. */ |
| public long getCurrentSearchingGen(boolean requiresDeletes) { |
| return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get(); |
| } |
| |
| /** Release the searcher obtained from {@link |
| * #get()} or {@link #get(long)}. |
| * |
| * <p><b>NOTE</b>: it's safe to call this after {@link |
| * #close}. */ |
| public void release(IndexSearcher s) throws IOException { |
| s.getIndexReader().decRef(); |
| } |
| |
| /** Call this when you need the NRT reader to reopen. |
| * |
| * @param applyDeletes If true, the newly opened reader |
| * will reflect all deletes |
| */ |
| public boolean reopen(boolean applyDeletes) throws IOException { |
| |
| // Mark gen as of when reopen started: |
| final long newSearcherGen = indexingGen.getAndIncrement(); |
| |
| if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) { |
| //System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen); |
| searchingGen.set(newSearcherGen); |
| noDeletesSearchingGen.set(newSearcherGen); |
| synchronized(this) { |
| notifyAll(); |
| } |
| //System.out.println("reopen: skip: return"); |
| return false; |
| } else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) { |
| //System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen); |
| noDeletesSearchingGen.set(newSearcherGen); |
| synchronized(this) { |
| notifyAll(); |
| } |
| //System.out.println("reopen: skip: return"); |
| return false; |
| } |
| |
| //System.out.println("indexingGen now " + indexingGen); |
| |
| // .reopen() returns a new reference: |
| |
| // Start from whichever searcher is most current: |
| final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher; |
| final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes); |
| |
| final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es); |
| if (warmer != null) { |
| warmer.warm(nextSearcher); |
| } |
| |
| // Transfer reference to swapSearcher: |
| swapSearcher(nextSearcher, |
| newSearcherGen, |
| applyDeletes); |
| |
| return true; |
| } |
| |
| // Steals a reference from newSearcher: |
| private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException { |
| //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes); |
| |
| // Always replace noDeletesCurrentSearcher: |
| if (noDeletesCurrentSearcher != null) { |
| noDeletesCurrentSearcher.getIndexReader().decRef(); |
| } |
| noDeletesCurrentSearcher = newSearcher; |
| assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen; |
| noDeletesSearchingGen.set(newSearchingGen); |
| |
| if (applyDeletes) { |
| // Deletes were applied, so we also update currentSearcher: |
| if (currentSearcher != null) { |
| currentSearcher.getIndexReader().decRef(); |
| } |
| currentSearcher = newSearcher; |
| if (newSearcher != null) { |
| newSearcher.getIndexReader().incRef(); |
| } |
| assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen; |
| searchingGen.set(newSearchingGen); |
| } |
| |
| notifyAll(); |
| //System.out.println(Thread.currentThread().getName() + ": done"); |
| } |
| |
| /** Close this NRTManager to future searching. Any |
| * searches still in process in other threads won't be |
| * affected, and they should still call {@link #release} |
| * after they are done. |
| * |
| * <p><b>NOTE</b>: caller must separately close the writer. */ |
| // @Override -- not until Java 1.6 |
| public void close() throws IOException { |
| swapSearcher(null, indexingGen.getAndIncrement(), true); |
| } |
| } |