| Index: lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java |
| =================================================================== |
| --- lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (revision 1176585) |
| +++ lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java (working copy) |
| @@ -20,7 +20,7 @@ |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.concurrent.ExecutorService; |
| -import java.util.concurrent.atomic.AtomicBoolean; |
| +import java.util.concurrent.Semaphore; |
| |
| import org.apache.lucene.index.IndexReader; |
| import org.apache.lucene.index.IndexWriter; |
| @@ -67,7 +67,7 @@ |
| // Current searcher |
| private volatile IndexSearcher currentSearcher; |
| private final SearcherWarmer warmer; |
| - private final AtomicBoolean reopening = new AtomicBoolean(); |
| + private final Semaphore reopening = new Semaphore(1); |
| private final ExecutorService es; |
| |
| /** Opens an initial searcher from the Directory. |
| @@ -136,7 +136,7 @@ |
| |
| // Ensure only 1 thread does reopen at once; other |
| // threads just return immediately: |
| - if (!reopening.getAndSet(true)) { |
| + if (reopening.tryAcquire()) { |
| try { |
| IndexReader newReader = currentSearcher.getIndexReader().reopen(); |
| if (newReader != currentSearcher.getIndexReader()) { |
| @@ -158,7 +158,7 @@ |
| return false; |
| } |
| } finally { |
| - reopening.set(false); |
| + reopening.release(); |
| } |
| } else { |
| return false; |
| @@ -168,12 +168,14 @@ |
| /** Obtain the current IndexSearcher. You must match |
| * every call to get with one call to {@link #release}; |
| * it's best to do so in a finally clause. */ |
| - public synchronized IndexSearcher get() { |
| - if (currentSearcher == null) { |
| - throw new AlreadyClosedException("this SearcherManager is closed"); |
| - } |
| - currentSearcher.getIndexReader().incRef(); |
| - return currentSearcher; |
| + public IndexSearcher acquire() { |
| + IndexSearcher searcher; |
| + do { |
| + if ((searcher = currentSearcher) == null) { |
| + throw new AlreadyClosedException("this SearcherManager is closed"); |
| + } |
| + } while (!searcher.getIndexReader().tryIncRef()); |
| + return searcher; |
| } |
| |
| /** Release the searcher previously obtained with {@link |
| @@ -186,7 +188,7 @@ |
| searcher.getIndexReader().decRef(); |
| } |
| |
| - // Replaces old searcher with new one |
| + // Replaces old searcher with new one - needs to be synced to make close() work |
| private synchronized void swapSearcher(IndexSearcher newSearcher) |
| throws IOException { |
| IndexSearcher oldSearcher = currentSearcher; |
| Index: lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java |
| =================================================================== |
| --- lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (revision 1176585) |
| +++ lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java (working copy) |
| @@ -36,7 +36,7 @@ |
| protected IndexSearcher getFinalSearcher() throws Exception { |
| writer.commit(); |
| mgr.maybeReopen(); |
| - return mgr.get(); |
| + return mgr.acquire(); |
| } |
| |
| private SearcherManager mgr; |
| @@ -94,7 +94,7 @@ |
| mgr.maybeReopen(); |
| } |
| |
| - return mgr.get(); |
| + return mgr.acquire(); |
| } |
| |
| @Override |
| Index: lucene/src/java/org/apache/lucene/index/IndexReader.java |
| =================================================================== |
| --- lucene/src/java/org/apache/lucene/index/IndexReader.java (revision 1176585) |
| +++ lucene/src/java/org/apache/lucene/index/IndexReader.java (working copy) |
| @@ -200,11 +200,45 @@ |
| * references. |
| * |
| * @see #decRef |
| + * @see #tryIncRef |
| */ |
| public void incRef() { |
| ensureOpen(); |
| refCount.incrementAndGet(); |
| } |
| + |
| + /** |
| + * Expert: increments the refCount of this IndexReader |
| + * instance only if the IndexReader has not been closed yet |
| + * and returns <code>true</code> iff the refCount was |
| + * successfully incremented, otherwise <code>false</code>. |
| + * If this method returns <code>false</code> the reader is either |
| + * already closed or is currently been closed. Either way this |
| + * reader instance shouldn't be used by an application unless |
| + * <code>true</code> is returned. |
| + * <p> |
| + * RefCounts are used to determine when a |
| + * reader can be closed safely, i.e. as soon as there are |
| + * no more references. Be sure to always call a |
| + * corresponding {@link #decRef}, in a finally clause; |
| + * otherwise the reader may never be closed. Note that |
| + * {@link #close} simply calls decRef(), which means that |
| + * the IndexReader will not really be closed until {@link |
| + * #decRef} has been called for all outstanding |
| + * references. |
| + * |
| + * @see #decRef |
| + * @see #incRef |
| + */ |
| + public boolean tryIncRef() { |
| + int count; |
| + while ((count = refCount.get()) > 0) { |
| + if(refCount.compareAndSet(count, count+1)) { |
| + return true; |
| + } |
| + } |
| + return false; |
| + } |
| |
| /** {@inheritDoc} */ |
| @Override |
| Index: lucene/src/test/org/apache/lucene/index/TestIndexReader.java |
| =================================================================== |
| --- lucene/src/test/org/apache/lucene/index/TestIndexReader.java (revision 1176585) |
| +++ lucene/src/test/org/apache/lucene/index/TestIndexReader.java (working copy) |
| @@ -27,6 +27,7 @@ |
| import java.util.List; |
| import java.util.Map; |
| import java.util.HashMap; |
| +import java.util.Random; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import org.junit.Assume; |
| @@ -1403,4 +1404,71 @@ |
| r.close(); |
| dir.close(); |
| } |
| + |
| + public void testTryIncRef() throws CorruptIndexException, LockObtainFailedException, IOException { |
| + Directory dir = newDirectory(); |
| + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))); |
| + writer.addDocument(new Document()); |
| + writer.commit(); |
| + IndexReader r = IndexReader.open(dir); |
| + assertTrue(r.tryIncRef()); |
| + r.decRef(); |
| + r.close(); |
| + assertFalse(r.tryIncRef()); |
| + writer.close(); |
| + dir.close(); |
| + } |
| + |
| + public void testStressTryIncRef() throws CorruptIndexException, LockObtainFailedException, IOException, InterruptedException { |
| + Directory dir = newDirectory(); |
| + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))); |
| + writer.addDocument(new Document()); |
| + writer.commit(); |
| + IndexReader r = IndexReader.open(dir); |
| + int numThreads = atLeast(2); |
| + |
| + IncThread[] threads = new IncThread[numThreads]; |
| + System.out.println(numThreads); |
| + for (int i = 0; i < threads.length; i++) { |
| + threads[i] = new IncThread(r, random); |
| + threads[i].start(); |
| + } |
| + Thread.sleep(100); |
| + |
| + assertTrue(r.tryIncRef()); |
| + r.decRef(); |
| + r.close(); |
| + |
| + for (int i = 0; i < threads.length; i++) { |
| + threads[i].join(); |
| + assertNull(threads[i].failed); |
| + } |
| + assertFalse(r.tryIncRef()); |
| + writer.close(); |
| + dir.close(); |
| + } |
| + |
| + static class IncThread extends Thread { |
| + final IndexReader toInc; |
| + final Random random; |
| + Throwable failed; |
| + |
| + IncThread(IndexReader toInc, Random random) { |
| + this.toInc = toInc; |
| + this.random = random; |
| + } |
| + |
| + @Override |
| + public void run() { |
| + try { |
| + while (toInc.tryIncRef()) { |
| + assertFalse(toInc.hasDeletions()); |
| + toInc.decRef(); |
| + } |
| + assertFalse(toInc.tryIncRef()); |
| + } catch (Throwable e) { |
| + failed = e; |
| + } |
| + } |
| + } |
| } |