blob: 2b6960e14760add210b9f165436ada8845fae6a8 [file] [log] [blame]
Index: lucene/CHANGES.txt
--- lucene/CHANGES.txt Sun Jan 23 11:31:46 2011 -0500
+++ lucene/CHANGES.txt Sun Jan 23 13:35:57 2011 -0500
@@ -796,6 +796,11 @@
* LUCENE-2864: Add getMaxTermFrequency (maximum within-document TF) to
FieldInvertState so that it can be used in Similarity.computeNorm.
(Robert Muir)
+
+* LUCENE-2474: Added expert ReaderFinishedListener API to
+ IndexReader, to allow apps that maintain external per-segment caches
+ to evict entries when a segment is finished. (Shay Banon, Yonik
+ Seeley, Mike McCandless)
Optimizations
Index: lucene/contrib/instantiated/src/java/org/apache/lucene/store/instantiated/InstantiatedIndexReader.java
--- lucene/contrib/instantiated/src/java/org/apache/lucene/store/instantiated/InstantiatedIndexReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/contrib/instantiated/src/java/org/apache/lucene/store/instantiated/InstantiatedIndexReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -52,6 +53,7 @@
public InstantiatedIndexReader(InstantiatedIndex index) {
super();
this.index = index;
+ readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
/**
Index: lucene/contrib/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java
--- lucene/contrib/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/contrib/memory/src/java/org/apache/lucene/index/memory/MemoryIndex.java Sun Jan 23 13:35:57 2011 -0500
@@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -757,6 +758,7 @@
private MemoryIndexReader() {
super(); // avoid as much superclass baggage as possible
+ readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
private Info getInfo(String fieldName) {
Index: lucene/src/java/org/apache/lucene/index/DirectoryReader.java
--- lucene/src/java/org/apache/lucene/index/DirectoryReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/index/DirectoryReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -37,8 +37,6 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
-
/**
* An IndexReader which reads indexes with multiple segments.
*/
@@ -106,6 +104,7 @@
} else {
this.codecs = codecs;
}
+ readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
// To reduce the chance of hitting FileNotFound
// (and having to retry), we open segments in
@@ -117,6 +116,7 @@
boolean success = false;
try {
readers[i] = SegmentReader.get(readOnly, sis.info(i), termInfosIndexDivisor);
+ readers[i].readerFinishedListeners = readerFinishedListeners;
success = true;
} finally {
if (!success) {
@@ -146,6 +146,7 @@
} else {
this.codecs = codecs;
}
+ readerFinishedListeners = writer.getReaderFinishedListeners();
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
@@ -160,6 +161,7 @@
final SegmentInfo info = infos.info(i);
assert info.dir == dir;
readers[i] = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor);
+ readers[i].readerFinishedListeners = readerFinishedListeners;
success = true;
} finally {
if (!success) {
@@ -182,11 +184,14 @@
/** This constructor is only used for {@link #reopen()} */
DirectoryReader(Directory directory, SegmentInfos infos, SegmentReader[] oldReaders, int[] oldStarts,
- boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs) throws IOException {
+ boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs,
+ Collection<ReaderFinishedListener> readerFinishedListeners) throws IOException {
this.directory = directory;
this.readOnly = readOnly;
this.segmentInfos = infos;
this.termInfosIndexDivisor = termInfosIndexDivisor;
+ this.readerFinishedListeners = readerFinishedListeners;
+
if (codecs == null) {
this.codecs = CodecProvider.getDefault();
} else {
@@ -232,8 +237,10 @@
// this is a new reader; in case we hit an exception we can close it safely
newReader = SegmentReader.get(readOnly, infos.info(i), termInfosIndexDivisor);
+ newReader.readerFinishedListeners = readerFinishedListeners;
} else {
newReader = newReaders[i].reopenSegment(infos.info(i), doClone, readOnly);
+ assert newReader.readerFinishedListeners == readerFinishedListeners;
}
if (newReader == newReaders[i]) {
// this reader will be shared between the old and the new one,
@@ -357,6 +364,7 @@
writeLock = null;
hasChanges = false;
}
+ assert newReader.readerFinishedListeners != null;
return newReader;
}
@@ -391,7 +399,9 @@
// TODO: right now we *always* make a new reader; in
// the future we could have write make some effort to
// detect that no changes have occurred
- return writer.getReader();
+ IndexReader reader = writer.getReader();
+ reader.readerFinishedListeners = readerFinishedListeners;
+ return reader;
}
private IndexReader doReopen(final boolean openReadOnly, IndexCommit commit) throws CorruptIndexException, IOException {
@@ -458,7 +468,7 @@
private synchronized DirectoryReader doReopen(SegmentInfos infos, boolean doClone, boolean openReadOnly) throws CorruptIndexException, IOException {
DirectoryReader reader;
- reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs);
+ reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs, readerFinishedListeners);
return reader;
}
@@ -805,11 +815,6 @@
}
}
- // NOTE: only needed in case someone had asked for
- // FieldCache for top-level reader (which is generally
- // not a good idea):
- FieldCache.DEFAULT.purge(this);
-
if (writer != null) {
// Since we just closed, writer may now be able to
// delete unused files:
Index: lucene/src/java/org/apache/lucene/index/FilterIndexReader.java
--- lucene/src/java/org/apache/lucene/index/FilterIndexReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/index/FilterIndexReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -22,13 +22,14 @@
import org.apache.lucene.index.IndexReader.ReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
-import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.BytesRef;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Collections;
/** A <code>FilterIndexReader</code> contains another IndexReader, which it
* uses as its basic source of data, possibly transforming the data along the
@@ -286,6 +287,7 @@
public FilterIndexReader(IndexReader in) {
super();
this.in = in;
+ readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
@Override
@@ -391,11 +393,6 @@
@Override
protected void doClose() throws IOException {
in.close();
-
- // NOTE: only needed in case someone had asked for
- // FieldCache for top-level reader (which is generally
- // not a good idea):
- FieldCache.DEFAULT.purge(this);
}
@@ -454,4 +451,16 @@
buffer.append(')');
return buffer.toString();
}
-}
\ No newline at end of file
+
+ @Override
+ public void addReaderFinishedListener(ReaderFinishedListener listener) {
+ super.addReaderFinishedListener(listener);
+ in.addReaderFinishedListener(listener);
+ }
+
+ @Override
+ public void removeReaderFinishedListener(ReaderFinishedListener listener) {
+ super.removeReaderFinishedListener(listener);
+ in.removeReaderFinishedListener(listener);
+ }
+}
Index: lucene/src/java/org/apache/lucene/index/IndexReader.java
--- lucene/src/java/org/apache/lucene/index/IndexReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/index/IndexReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -34,6 +34,7 @@
import java.io.Closeable;
import java.util.Collection;
import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -82,6 +83,65 @@
public abstract class IndexReader implements Cloneable,Closeable {
/**
+ * A custom listener that's invoked when the IndexReader
+ * is finished.
+ *
+ * <p>For a SegmentReader, this listener is called only
+ * once all SegmentReaders sharing the same core are
+ * closed. At this point it is safe for apps to evict
+ * this reader from any caches keyed on {@link
+ * #getCoreCacheKey}. This is the same interface that
+ * {@link FieldCache} uses, internally, to evict
+ * entries.</p>
+ *
+ * <p>For other readers, this listener is called when they
+ * are closed.</p>
+ *
+ * @lucene.experimental
+ */
+ public static interface ReaderFinishedListener {
+ public void finished(IndexReader reader);
+ }
+
+ // Impls must set this if they may call add/removeReaderFinishedListener:
+ protected volatile Collection<ReaderFinishedListener> readerFinishedListeners;
+
+ /** Expert: adds a {@link ReaderFinishedListener}. The
+ * provided listener is also added to any sub-readers, if
+ * this is a composite reader. Also, any reader reopened
+ * or cloned from this one will also copy the listeners at
+ * the time of reopen.
+ *
+ * @lucene.experimental */
+ public void addReaderFinishedListener(ReaderFinishedListener listener) {
+ readerFinishedListeners.add(listener);
+ }
+
+ /** Expert: remove a previously added {@link ReaderFinishedListener}.
+ *
+ * @lucene.experimental */
+ public void removeReaderFinishedListener(ReaderFinishedListener listener) {
+ readerFinishedListeners.remove(listener);
+ }
+
+ protected void notifyReaderFinishedListeners() {
+ // Defensive (should never be null -- all impls must set
+ // this):
+ if (readerFinishedListeners != null) {
+
+ // Clone the set so that we don't have to sync on
+ // readerFinishedListeners while invoking them:
+ for(ReaderFinishedListener listener : new HashSet<ReaderFinishedListener>(readerFinishedListeners)) {
+ listener.finished(this);
+ }
+ }
+ }
+
+ protected void readerFinished() {
+ notifyReaderFinishedListeners();
+ }
+
+ /**
* Constants describing field properties, for example used for
* {@link IndexReader#getFieldNames(FieldOption)}.
*/
@@ -195,6 +255,7 @@
refCount.incrementAndGet();
}
}
+ readerFinished();
}
}
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java Sun Jan 23 13:35:57 2011 -0500
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
@@ -358,6 +359,13 @@
return r;
}
+ // Used for all SegmentReaders we open
+ private final Collection<IndexReader.ReaderFinishedListener> readerFinishedListeners = Collections.synchronizedSet(new HashSet<IndexReader.ReaderFinishedListener>());
+
+ Collection<IndexReader.ReaderFinishedListener> getReaderFinishedListeners() throws IOException {
+ return readerFinishedListeners;
+ }
+
/** Holds shared SegmentReader instances. IndexWriter uses
* SegmentReaders for 1) applying deletes, 2) doing
* merges, 3) handing out a real-time reader. This pool
@@ -567,6 +575,7 @@
// synchronized
// Returns a ref, which we xfer to readerMap:
sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor);
+ sr.readerFinishedListeners = readerFinishedListeners;
if (info.dir == directory) {
// Only pool if reader is not external
Index: lucene/src/java/org/apache/lucene/index/MultiReader.java
--- lucene/src/java/org/apache/lucene/index/MultiReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/index/MultiReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -20,10 +20,11 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import java.util.HashSet;
+import java.util.Collections;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
-import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.ReaderUtil;
@@ -82,6 +83,7 @@
}
}
starts[subReaders.length] = maxDoc;
+ readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
return ReaderUtil.buildReaderContext(this);
}
@@ -345,11 +347,6 @@
subReaders[i].close();
}
}
-
- // NOTE: only needed in case someone had asked for
- // FieldCache for top-level reader (which is generally
- // not a good idea):
- FieldCache.DEFAULT.purge(this);
}
@Override
@@ -389,4 +386,20 @@
public ReaderContext getTopReaderContext() {
return topLevelContext;
}
+
+ @Override
+ public void addReaderFinishedListener(ReaderFinishedListener listener) {
+ super.addReaderFinishedListener(listener);
+ for(IndexReader sub : subReaders) {
+ sub.addReaderFinishedListener(listener);
+ }
+ }
+
+ @Override
+ public void removeReaderFinishedListener(ReaderFinishedListener listener) {
+ super.removeReaderFinishedListener(listener);
+ for(IndexReader sub : subReaders) {
+ sub.removeReaderFinishedListener(listener);
+ }
+ }
}
Index: lucene/src/java/org/apache/lucene/index/ParallelReader.java
--- lucene/src/java/org/apache/lucene/index/ParallelReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/index/ParallelReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -22,7 +22,6 @@
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.util.Bits;
-import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.BytesRef;
import java.io.IOException;
@@ -73,6 +72,7 @@
public ParallelReader(boolean closeSubReaders) throws IOException {
super();
this.incRefReaders = !closeSubReaders;
+ readerFinishedListeners = Collections.synchronizedSet(new HashSet<ReaderFinishedListener>());
}
/** {@inheritDoc} */
@@ -529,8 +529,6 @@
readers.get(i).close();
}
}
-
- FieldCache.DEFAULT.purge(this);
}
@Override
@@ -548,6 +546,21 @@
return topLevelReaderContext;
}
+ @Override
+ public void addReaderFinishedListener(ReaderFinishedListener listener) {
+ super.addReaderFinishedListener(listener);
+ for (IndexReader reader : readers) {
+ reader.addReaderFinishedListener(listener);
+ }
+ }
+
+ @Override
+ public void removeReaderFinishedListener(ReaderFinishedListener listener) {
+ super.removeReaderFinishedListener(listener);
+ for (IndexReader reader : readers) {
+ reader.removeReaderFinishedListener(listener);
+ }
+ }
}
Index: lucene/src/java/org/apache/lucene/index/SegmentReader.java
--- lucene/src/java/org/apache/lucene/index/SegmentReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/index/SegmentReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -38,7 +38,6 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.index.codecs.FieldsProducer;
-import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
import org.apache.lucene.util.BytesRef;
/**
@@ -183,13 +182,9 @@
storeCFSReader.close();
}
- // Force FieldCache to evict our entries at this
- // point. If the exception occurred while
- // initializing the core readers, then
- // origInstance will be null, and we don't want
- // to call FieldCache.purge (it leads to NPE):
+ // Now, notify any ReaderFinished listeners:
if (origInstance != null) {
- FieldCache.DEFAULT.purge(origInstance);
+ origInstance.notifyReaderFinishedListeners();
}
}
}
@@ -633,6 +628,7 @@
clone.si = si;
clone.readBufferSize = readBufferSize;
clone.pendingDeleteCount = pendingDeleteCount;
+ clone.readerFinishedListeners = readerFinishedListeners;
if (!openReadOnly && hasChanges) {
// My pending changes transfer to the new reader
@@ -1203,4 +1199,14 @@
public int getTermInfosIndexDivisor() {
return core.termsIndexDivisor;
}
+
+ @Override
+ protected void readerFinished() {
+ // Do nothing here -- we have more careful control on
+ // when to notify that a SegmentReader has finished,
+ // because a given core is shared across many closed
+ // SegmentReaders. We only notify once that core is no
+ // longer used (all SegmentReaders sharing it have been
+ // closed).
+ }
}
Index: lucene/src/java/org/apache/lucene/search/FieldCacheImpl.java
--- lucene/src/java/org/apache/lucene/search/FieldCacheImpl.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/java/org/apache/lucene/search/FieldCacheImpl.java Sun Jan 23 13:35:57 2011 -0500
@@ -137,6 +137,13 @@
public Object getValue() { return value; }
}
+ final static IndexReader.ReaderFinishedListener purgeReader = new IndexReader.ReaderFinishedListener() {
+ // @Override -- not until Java 1.6
+ public void finished(IndexReader reader) {
+ FieldCache.DEFAULT.purge(reader);
+ }
+ };
+
/** Expert: Internal cache. */
final static class Cache<T> {
Cache() {
@@ -171,8 +178,10 @@
synchronized (readerCache) {
innerCache = readerCache.get(readerKey);
if (innerCache == null) {
+ // First time this reader is using FieldCache
innerCache = new HashMap<Entry<T>,Object>();
readerCache.put(readerKey, innerCache);
+ reader.addReaderFinishedListener(purgeReader);
value = null;
} else {
value = innerCache.get(key);
Index: lucene/src/test/org/apache/lucene/index/TestIndexReader.java
--- lucene/src/test/org/apache/lucene/index/TestIndexReader.java Sun Jan 23 11:31:46 2011 -0500
+++ lucene/src/test/org/apache/lucene/index/TestIndexReader.java Sun Jan 23 13:35:57 2011 -0500
@@ -1905,4 +1905,42 @@
dir.close();
}
}
+
+ // LUCENE-2474
+ public void testReaderFinishedListener() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()));
+ ((LogMergePolicy) writer.getConfig().getMergePolicy()).setMergeFactor(3);
+ writer.setInfoStream(VERBOSE ? System.out : null);
+ writer.addDocument(new Document());
+ writer.commit();
+ writer.addDocument(new Document());
+ writer.commit();
+ final IndexReader reader = writer.getReader();
+ final int[] closeCount = new int[1];
+ final IndexReader.ReaderFinishedListener listener = new IndexReader.ReaderFinishedListener() {
+ public void finished(IndexReader reader) {
+ closeCount[0]++;
+ }
+ };
+
+ reader.addReaderFinishedListener(listener);
+
+ reader.close();
+
+ // Just the top reader
+ assertEquals(1, closeCount[0]);
+ writer.close();
+
+ // Now also the subs
+ assertEquals(3, closeCount[0]);
+
+ IndexReader reader2 = IndexReader.open(dir);
+ reader2.addReaderFinishedListener(listener);
+
+ closeCount[0] = 0;
+ reader2.close();
+ assertEquals(3, closeCount[0]);
+ dir.close();
+ }
}