| Index: lucene/CHANGES.txt
|
| ===================================================================
|
| --- lucene/CHANGES.txt (revision 1476523)
|
| +++ lucene/CHANGES.txt (working copy)
|
| @@ -59,6 +59,12 @@
|
| * LUCENE-4955: NGramTokenizer now supports inputs larger than 1024 chars. |
| (Adrien Grand) |
| |
| +* LUCENE-4953: Fixed ParallelCompositeReader to inform ReaderClosedListeners of |
| + its synthetic subreaders. FieldCaches keyed on the atomic childs will be purged |
| + earlier and FC insanity prevented. In addition, ParallelCompositeReader's |
| + toString() was changed to better reflect the reader structure. |
| + (Mike McCandless, Uwe Schindler) |
| + |
| Optimizations |
| |
| * LUCENE-4938: Don't use an unnecessarily large priority queue in IndexSearcher |
| Index: lucene/core/src/java/org/apache/lucene/index/CompositeReader.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/index/CompositeReader.java (revision 1476523)
|
| +++ lucene/core/src/java/org/apache/lucene/index/CompositeReader.java (working copy)
|
| @@ -66,7 +66,13 @@
|
| @Override |
| public String toString() { |
| final StringBuilder buffer = new StringBuilder(); |
| - buffer.append(getClass().getSimpleName()); |
| + // walk up through class hierarchy to get a non-empty simple name (anonymous classes have no name): |
| + for (Class<?> clazz = getClass(); clazz != null; clazz = clazz.getSuperclass()) { |
| + if (!clazz.isAnonymousClass()) { |
| + buffer.append(clazz.getSimpleName()); |
| + break; |
| + } |
| + } |
| buffer.append('('); |
| final List<? extends IndexReader> subReaders = getSequentialSubReaders(); |
| assert subReaders != null; |
| Index: lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java (revision 1476523)
|
| +++ lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java (working copy)
|
| @@ -47,7 +47,7 @@
|
| * same order to the other indexes. <em>Failure to do so will result in |
| * undefined behavior</em>. |
| */ |
| -public final class ParallelAtomicReader extends AtomicReader { |
| +public class ParallelAtomicReader extends AtomicReader { |
| private final FieldInfos fieldInfos; |
| private final ParallelFields fields = new ParallelFields(); |
| private final AtomicReader[] parallelReaders, storedFieldsReaders; |
| Index: lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java (revision 1476523)
|
| +++ lucene/core/src/java/org/apache/lucene/index/ParallelCompositeReader.java (working copy)
|
| @@ -20,7 +20,6 @@
|
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.IdentityHashMap; |
| -import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| @@ -47,10 +46,10 @@
|
| * by number of documents per segment. If you use different {@link MergePolicy}s |
| * it might happen that the segment structure of your index is no longer predictable. |
| */ |
| -public final class ParallelCompositeReader extends BaseCompositeReader<IndexReader> { |
| +public class ParallelCompositeReader extends BaseCompositeReader<IndexReader> { |
| private final boolean closeSubReaders; |
| - private final Set<CompositeReader> completeReaderSet = |
| - Collections.newSetFromMap(new IdentityHashMap<CompositeReader,Boolean>()); |
| + private final Set<IndexReader> completeReaderSet = |
| + Collections.newSetFromMap(new IdentityHashMap<IndexReader,Boolean>()); |
| |
| /** Create a ParallelCompositeReader based on the provided |
| * readers; auto-closes the given readers on {@link #close()}. */ |
| @@ -72,12 +71,14 @@
|
| this.closeSubReaders = closeSubReaders; |
| Collections.addAll(completeReaderSet, readers); |
| Collections.addAll(completeReaderSet, storedFieldReaders); |
| - // do this finally so any Exceptions occurred before don't affect refcounts: |
| + // update ref-counts (like MultiReader): |
| if (!closeSubReaders) { |
| - for (CompositeReader reader : completeReaderSet) { |
| + for (final IndexReader reader : completeReaderSet) { |
| reader.incRef(); |
| } |
| } |
| + // finally add our own synthetic readers, so we close or decRef them, too (it does not matter what we do) |
| + completeReaderSet.addAll(getSequentialSubReaders()); |
| } |
| |
| private static IndexReader[] prepareSubReaders(CompositeReader[] readers, CompositeReader[] storedFieldsReaders) throws IOException { |
| @@ -112,10 +113,12 @@
|
| for (int j = 0; j < storedFieldsReaders.length; j++) { |
| storedSubs[j] = (AtomicReader) storedFieldsReaders[j].getSequentialSubReaders().get(i); |
| } |
| - // we simply enable closing of subReaders, to prevent incRefs on subReaders |
| - // -> for synthetic subReaders, close() is never |
| - // called by our doClose() |
| - subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs); |
| + // We pass true for closeSubs and we prevent closing of subreaders in doClose(): |
| + // By this the synthetic throw-away readers used here are completely invisible to ref-counting |
| + subReaders[i] = new ParallelAtomicReader(true, atomicSubs, storedSubs) { |
| + @Override |
| + protected void doClose() {} |
| + }; |
| } else { |
| assert firstSubReaders.get(i) instanceof CompositeReader; |
| final CompositeReader[] compositeSubs = new CompositeReader[readers.length]; |
| @@ -126,9 +129,12 @@
|
| for (int j = 0; j < storedFieldsReaders.length; j++) { |
| storedSubs[j] = (CompositeReader) storedFieldsReaders[j].getSequentialSubReaders().get(i); |
| } |
| - // we simply enable closing of subReaders, to prevent incRefs on subReaders |
| - // -> for synthetic subReaders, close() is never called by our doClose() |
| - subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs); |
| + // We pass true for closeSubs and we prevent closing of subreaders in doClose(): |
| + // By this the synthetic throw-away readers used here are completely invisible to ref-counting |
| + subReaders[i] = new ParallelCompositeReader(true, compositeSubs, storedSubs) { |
| + @Override |
| + protected void doClose() {} |
| + }; |
| } |
| } |
| return subReaders; |
| @@ -159,19 +165,9 @@
|
| } |
| |
| @Override |
| - public String toString() { |
| - final StringBuilder buffer = new StringBuilder("ParallelCompositeReader("); |
| - for (final Iterator<CompositeReader> iter = completeReaderSet.iterator(); iter.hasNext();) { |
| - buffer.append(iter.next()); |
| - if (iter.hasNext()) buffer.append(", "); |
| - } |
| - return buffer.append(')').toString(); |
| - } |
| - |
| - @Override |
| protected synchronized void doClose() throws IOException { |
| IOException ioe = null; |
| - for (final CompositeReader reader : completeReaderSet) { |
| + for (final IndexReader reader : completeReaderSet) { |
| try { |
| if (closeSubReaders) { |
| reader.close(); |
| Index: lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java
|
| ===================================================================
|
| --- lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java (revision 1476523)
|
| +++ lucene/core/src/test/org/apache/lucene/index/TestParallelAtomicReader.java (working copy)
|
| @@ -25,6 +25,7 @@
|
| import org.apache.lucene.document.Field; |
| import org.apache.lucene.search.BooleanClause.Occur; |
| import org.apache.lucene.search.*; |
| +import org.apache.lucene.store.AlreadyClosedException; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util._TestUtil; |
| @@ -114,6 +115,29 @@
|
| dir2.close(); |
| } |
| |
| + public void testCloseInnerReader() throws Exception { |
| + Directory dir1 = getDir1(random()); |
| + AtomicReader ir1 = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir1)); |
| + |
| + // with overlapping |
| + ParallelAtomicReader pr = new ParallelAtomicReader(true, |
| + new AtomicReader[] {ir1}, |
| + new AtomicReader[] {ir1}); |
| + |
| + ir1.close(); |
| + |
| + try { |
| + pr.document(0); |
| + fail("ParallelAtomicReader should be already closed because inner reader was closed!"); |
| + } catch (AlreadyClosedException e) { |
| + // pass |
| + } |
| + |
| + // noop: |
| + pr.close(); |
| + dir1.close(); |
| + } |
| + |
| public void testIncompatibleIndexes() throws IOException { |
| // two documents: |
| Directory dir1 = getDir1(random()); |
| Index: lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java
|
| ===================================================================
|
| --- lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java (revision 1476523)
|
| +++ lucene/core/src/test/org/apache/lucene/index/TestParallelCompositeReader.java (working copy)
|
| @@ -23,8 +23,10 @@
|
| import org.apache.lucene.analysis.MockAnalyzer; |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.document.Field; |
| +import org.apache.lucene.index.IndexReader.ReaderClosedListener; |
| import org.apache.lucene.search.BooleanClause.Occur; |
| import org.apache.lucene.search.*; |
| +import org.apache.lucene.store.AlreadyClosedException; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.util.LuceneTestCase; |
| |
| @@ -82,12 +84,15 @@
|
| // close subreaders, ParallelReader will not change refCounts, but close on its own close |
| ParallelCompositeReader pr = new ParallelCompositeReader(ir1 = DirectoryReader.open(dir1), |
| ir2 = DirectoryReader.open(dir2)); |
| + IndexReader psub1 = pr.getSequentialSubReaders().get(0); |
| // check RefCounts |
| assertEquals(1, ir1.getRefCount()); |
| assertEquals(1, ir2.getRefCount()); |
| + assertEquals(1, psub1.getRefCount()); |
| pr.close(); |
| assertEquals(0, ir1.getRefCount()); |
| assertEquals(0, ir2.getRefCount()); |
| + assertEquals(0, psub1.getRefCount()); |
| dir1.close(); |
| dir2.close(); |
| } |
| @@ -100,20 +105,115 @@
|
| |
| // don't close subreaders, so ParallelReader will increment refcounts |
| ParallelCompositeReader pr = new ParallelCompositeReader(false, ir1, ir2); |
| + IndexReader psub1 = pr.getSequentialSubReaders().get(0); |
| // check RefCounts |
| assertEquals(2, ir1.getRefCount()); |
| assertEquals(2, ir2.getRefCount()); |
| + assertEquals("refCount must be 1, as the synthetic reader was created by ParallelCompositeReader", 1, psub1.getRefCount()); |
| pr.close(); |
| assertEquals(1, ir1.getRefCount()); |
| assertEquals(1, ir2.getRefCount()); |
| + assertEquals("refcount must be 0 because parent was closed", 0, psub1.getRefCount()); |
| ir1.close(); |
| ir2.close(); |
| assertEquals(0, ir1.getRefCount()); |
| assertEquals(0, ir2.getRefCount()); |
| + assertEquals("refcount should not change anymore", 0, psub1.getRefCount()); |
| dir1.close(); |
| dir2.close(); |
| } |
| |
| + // closeSubreaders=false |
| + public void testReaderClosedListener1() throws Exception { |
| + Directory dir1 = getDir1(random()); |
| + CompositeReader ir1 = DirectoryReader.open(dir1); |
| + |
| + // with overlapping |
| + ParallelCompositeReader pr = new ParallelCompositeReader(false, |
| + new CompositeReader[] {ir1}, |
| + new CompositeReader[] {ir1}); |
| + |
| + final int[] listenerClosedCount = new int[1]; |
| + |
| + assertEquals(3, pr.leaves().size()); |
| + |
| + for(AtomicReaderContext cxt : pr.leaves()) { |
| + cxt.reader().addReaderClosedListener(new ReaderClosedListener() { |
| + @Override |
| + public void onClose(IndexReader reader) { |
| + listenerClosedCount[0]++; |
| + } |
| + }); |
| + } |
| + pr.close(); |
| + ir1.close(); |
| + assertEquals(3, listenerClosedCount[0]); |
| + dir1.close(); |
| + } |
| + |
| + // closeSubreaders=true |
| + public void testReaderClosedListener2() throws Exception { |
| + Directory dir1 = getDir1(random()); |
| + CompositeReader ir1 = DirectoryReader.open(dir1); |
| + |
| + // with overlapping |
| + ParallelCompositeReader pr = new ParallelCompositeReader(true, |
| + new CompositeReader[] {ir1}, |
| + new CompositeReader[] {ir1}); |
| + |
| + final int[] listenerClosedCount = new int[1]; |
| + |
| + assertEquals(3, pr.leaves().size()); |
| + |
| + for(AtomicReaderContext cxt : pr.leaves()) { |
| + cxt.reader().addReaderClosedListener(new ReaderClosedListener() { |
| + @Override |
| + public void onClose(IndexReader reader) { |
| + listenerClosedCount[0]++; |
| + } |
| + }); |
| + } |
| + pr.close(); |
| + assertEquals(3, listenerClosedCount[0]); |
| + dir1.close(); |
| + } |
| + |
| + public void testCloseInnerReader() throws Exception { |
| + Directory dir1 = getDir1(random()); |
| + CompositeReader ir1 = DirectoryReader.open(dir1); |
| + assertEquals(1, ir1.getSequentialSubReaders().get(0).getRefCount()); |
| + |
| + // with overlapping |
| + ParallelCompositeReader pr = new ParallelCompositeReader(true, |
| + new CompositeReader[] {ir1}, |
| + new CompositeReader[] {ir1}); |
| + |
| + IndexReader psub = pr.getSequentialSubReaders().get(0); |
| + assertEquals(1, psub.getRefCount()); |
| + |
| + ir1.close(); |
| + |
| + assertEquals("refCount of synthetic subreader should be unchanged", 1, psub.getRefCount()); |
| + try { |
| + psub.document(0); |
| + fail("Subreader should be already closed because inner reader was closed!"); |
| + } catch (AlreadyClosedException e) { |
| + // pass |
| + } |
| + |
| + try { |
| + pr.document(0); |
| + fail("ParallelCompositeReader should be already closed because inner reader was closed!"); |
| + } catch (AlreadyClosedException e) { |
| + // pass |
| + } |
| + |
| + // noop: |
| + pr.close(); |
| + assertEquals(0, psub.getRefCount()); |
| + dir1.close(); |
| + } |
| + |
| public void testIncompatibleIndexes1() throws IOException { |
| // two documents: |
| Directory dir1 = getDir1(random()); |
| @@ -277,6 +377,30 @@
|
| dir2.close(); |
| } |
| |
| + public void testToString() throws IOException { |
| + Directory dir1 = getDir1(random()); |
| + CompositeReader ir1 = DirectoryReader.open(dir1); |
| + ParallelCompositeReader pr = new ParallelCompositeReader(new CompositeReader[] {ir1}); |
| + |
| + final String s = pr.toString(); |
| + assertTrue("toString incorrect: " + s, s.startsWith("ParallelCompositeReader(ParallelAtomicReader(")); |
| + |
| + pr.close(); |
| + dir1.close(); |
| + } |
| + |
| + public void testToStringCompositeComposite() throws IOException { |
| + Directory dir1 = getDir1(random()); |
| + CompositeReader ir1 = DirectoryReader.open(dir1); |
| + ParallelCompositeReader pr = new ParallelCompositeReader(new CompositeReader[] {new MultiReader(ir1)}); |
| + |
| + final String s = pr.toString(); |
| + assertTrue("toString incorrect: " + s, s.startsWith("ParallelCompositeReader(ParallelCompositeReader(ParallelAtomicReader(")); |
| + |
| + pr.close(); |
| + dir1.close(); |
| + } |
| + |
| private void queryTest(Query query) throws IOException { |
| ScoreDoc[] parallelHits = parallel.search(query, null, 1000).scoreDocs; |
| ScoreDoc[] singleHits = single.search(query, null, 1000).scoreDocs; |