LUCENE-3445: add SearcherManager to simplify handling of multiple search threads and reopening IndexSearcher
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/branch_3x@1175420 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lucene/contrib/CHANGES.txt b/lucene/contrib/CHANGES.txt
index 59350ee..797d921 100644
--- a/lucene/contrib/CHANGES.txt
+++ b/lucene/contrib/CHANGES.txt
@@ -26,6 +26,11 @@
* LUCENE-3414: Added HunspellStemFilter which uses a provided pure Java implementation of the
Hunspell algorithm. (Chris Male)
+ * LUCENE-3445: Added SearcherManager, to manage sharing and reopening
+ IndexSearchers across multiple search threads. IndexReader's
+ refCount is used to safely close the reader only once all threads are done
+ using it. (Michael McCandless)
+
API Changes
* LUCENE-3431: Deprecated QueryAutoStopWordAnalyzer.addStopWords* since they
diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java b/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java
index 1c1097e..a4548d8 100644
--- a/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java
+++ b/lucene/contrib/misc/src/java/org/apache/lucene/index/NRTManager.java
@@ -20,16 +20,17 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.List;
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexReader; // javadocs
import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader; // javadocs
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
+import org.apache.lucene.search.SearcherWarmer;
import org.apache.lucene.util.ThreadInterruptedException;
// TODO
@@ -47,7 +48,7 @@
* caller is waiting for a specific generation searcher. </p>
*
* @lucene.experimental
-*/
+ */
public class NRTManager implements Closeable {
private final IndexWriter writer;
@@ -55,36 +56,36 @@
private final AtomicLong indexingGen;
private final AtomicLong searchingGen;
private final AtomicLong noDeletesSearchingGen;
+ private final SearcherWarmer warmer;
private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
private volatile IndexSearcher currentSearcher;
private volatile IndexSearcher noDeletesCurrentSearcher;
/**
- * Create new NRTManager. Note that this installs a
- * merged segment warmer on the provided IndexWriter's
- * config.
+ * Create new NRTManager.
*
* @param writer IndexWriter to open near-real-time
* readers
- */
- public NRTManager(IndexWriter writer) throws IOException {
- this(writer, null);
- }
-
- /**
- * Create new NRTManager. Note that this installs a
- * merged segment warmer on the provided IndexWriter's
- * config.
- *
- * @param writer IndexWriter to open near-real-time
- * readers
- * @param es ExecutorService to pass to the IndexSearcher
- */
- public NRTManager(IndexWriter writer, ExecutorService es) throws IOException {
+ * @param es optional ExecutorService so different segments can
+ * be searched concurrently (see {@link
+ * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null
+ * to search segments sequentially.
+ * @param warmer optional {@link SearcherWarmer}. Pass
+ * null if you don't require the searcher to warmed
+ * before going live. If this is non-null then a
+ * merged segment warmer is installed on the
+ * provided IndexWriter's config.
+ *
+ * <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
+ * not invoked for the initial searcher; you should
+ * warm it yourself if necessary.
+ */
+ public NRTManager(IndexWriter writer, ExecutorService es, SearcherWarmer warmer) throws IOException {
this.writer = writer;
this.es = es;
+ this.warmer = warmer;
indexingGen = new AtomicLong(1);
searchingGen = new AtomicLong(-1);
noDeletesSearchingGen = new AtomicLong(-1);
@@ -92,13 +93,15 @@
// Create initial reader:
swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true);
- writer.getConfig().setMergedSegmentWarmer(
+ if (this.warmer != null) {
+ writer.getConfig().setMergedSegmentWarmer(
new IndexWriter.IndexReaderWarmer() {
@Override
public void warm(IndexReader reader) throws IOException {
- NRTManager.this.warm(reader);
+ NRTManager.this.warmer.warm(new IndexSearcher(reader, NRTManager.this.es));
}
});
+ }
}
/** NRTManager invokes this interface to notify it when a
@@ -263,7 +266,10 @@
}
/** Release the searcher obtained from {@link
- * #get()} or {@link #get(long)}. */
+ * #get()} or {@link #get(long)}.
+ *
+ * <p><b>NOTE</b>: it's safe to call this after {@link
+ * #close}. */
public void release(IndexSearcher s) throws IOException {
s.getIndexReader().decRef();
}
@@ -305,23 +311,19 @@
final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes);
- warm(nextReader);
+ final IndexSearcher nextSearcher = new IndexSearcher(nextReader, es);
+ if (warmer != null) {
+ warmer.warm(nextSearcher);
+ }
// Transfer reference to swapSearcher:
- swapSearcher(new IndexSearcher(nextReader, es),
+ swapSearcher(nextSearcher,
newSearcherGen,
applyDeletes);
return true;
}
- /** Override this to warm the newly opened reader before
- * it's swapped in. Note that this is called both for
- * newly merged segments and for new top-level readers
- * opened by #reopen. */
- protected void warm(IndexReader reader) throws IOException {
- }
-
// Steals a reference from newSearcher:
private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
//System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
@@ -351,7 +353,12 @@
//System.out.println(Thread.currentThread().getName() + ": done");
}
- /** NOTE: caller must separately close the writer. */
+ /** Close this NRTManager to future searching. Any
+ * searches still in process in other threads won't be
+ * affected, and they should still call {@link #release}
+ * after they are done.
+ *
+ * <p><b>NOTE</b>: caller must separately close the writer. */
// @Override -- not until Java 1.6
public void close() throws IOException {
swapSearcher(null, indexingGen.getAndIncrement(), true);
diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java
new file mode 100644
index 0000000..9fff8c9
--- /dev/null
+++ b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherManager.java
@@ -0,0 +1,201 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.NRTManager; // javadocs
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+
+/** Utility class to safely share {@link IndexSearcher} instances
+ * across multiple threads, while periodically reopening.
+ * This class ensures each IndexSearcher instance is not
+ * closed until it is no longer needed.
+ *
+ * <p>Use {@link #get} to obtain the current searcher, and
+ * {@link #release} to release it, like this:
+ *
+ * <pre>
+ * IndexSearcher s = manager.get();
+ * try {
+ * // Do searching, doc retrieval, etc. with s
+ * } finally {
+ * manager.release(s);
+ * }
+ * // Do not use s after this!
+ * s = null;
+ * </pre>
+ *
+ * <p>In addition you should periodically call {@link
+ * #maybeReopen}. While it's possible to call this just
+ * before running each query, this is discouraged since it
+ * penalizes the unlucky queries that do the reopen. It's
+ * better to use a separate background thread, that
+ * periodically calls maybeReopen. Finally, be sure to
+ * call {@link #close} once you are done.
+ *
+ * <p><b>NOTE</b>: if you have an {@link IndexWriter}, it's
+ * better to use {@link NRTManager} since that class pulls
+ * near-real-time readers from the IndexWriter.
+ *
+ * @lucene.experimental
+ */
+
+public class SearcherManager implements Closeable {
+
+ // Current searcher
+ private volatile IndexSearcher currentSearcher;
+ private final SearcherWarmer warmer;
+ private final AtomicBoolean reopening = new AtomicBoolean();
+ private final ExecutorService es;
+
+ /** Opens an initial searcher from the Directory.
+ *
+ * @param dir Directory to open the searcher from
+ *
+ * @param warmer optional {@link SearcherWarmer}. Pass
+ * null if you don't require the searcher to warmed
+ * before going live.
+ *
+ * <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
+ * not invoked for the initial searcher; you should
+ * warm it yourself if necessary.
+ */
+ public SearcherManager(Directory dir, SearcherWarmer warmer) throws IOException {
+ this(dir, warmer, null);
+ }
+
+ /** Opens an initial searcher from the Directory.
+ *
+ * @param dir Directory to open the searcher from
+ *
+ * @param warmer optional {@link SearcherWarmer}. Pass
+ * null if you don't require the searcher to warmed
+ * before going live.
+ *
+ * @param es optional ExecutorService so different segments can
+ * be searched concurrently (see {@link
+ * IndexSearcher#IndexSearcher(IndexReader,ExecutorService)}. Pass null
+ * to search segments sequentially.
+ *
+ * <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
+ * not invoked for the initial searcher; you should
+ * warm it yourself if necessary.
+ */
+ public SearcherManager(Directory dir, SearcherWarmer warmer, ExecutorService es) throws IOException {
+ this.es = es;
+ currentSearcher = new IndexSearcher(IndexReader.open(dir), this.es);
+ this.warmer = warmer;
+ }
+
+ /** You must call this, periodically, to perform a
+ * reopen. This calls {@link IndexReader#reopen} on the
+ * underlying reader, and if that returns a new reader,
+ * it's warmed (if you provided a {@link SearcherWarmer}
+ * and then swapped into production.
+ *
+ * <p><b>Threads</b>: it's fine for more than one thread to
+ * call this at once. Only the first thread will attempt
+ * the reopen; subsequent threads will see that another
+ * thread is already handling reopen and will return
+ * immediately. Note that this means if another thread
+ * is already reopening then subsequent threads will
+ * return right away without waiting for the reader
+ * reopen to complete.</p>
+ *
+ * <p>This method returns true if a new reader was in
+ * fact opened.</p>
+ */
+ public boolean maybeReopen()
+ throws IOException {
+
+ if (currentSearcher == null) {
+ throw new AlreadyClosedException("this SearcherManager is closed");
+ }
+
+ // Ensure only 1 thread does reopen at once; other
+ // threads just return immediately:
+ if (!reopening.getAndSet(true)) {
+ try {
+ IndexReader newReader = currentSearcher.getIndexReader().reopen();
+ if (newReader != currentSearcher.getIndexReader()) {
+ IndexSearcher newSearcher = new IndexSearcher(newReader, es);
+ if (warmer != null) {
+ warmer.warm(newSearcher);
+ }
+ swapSearcher(newSearcher);
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ reopening.set(false);
+ }
+ } else {
+ return false;
+ }
+ }
+
+ /** Obtain the current IndexSearcher. You must match
+ * every call to get with one call to {@link #release};
+ * it's best to do so in a finally clause. */
+ public IndexSearcher get() {
+ IndexSearcher toReturn = currentSearcher;
+ if (toReturn == null) {
+ throw new AlreadyClosedException("this SearcherManager is closed");
+ }
+ toReturn.getIndexReader().incRef();
+ return toReturn;
+ }
+
+ /** Release the searcher previously obtained with {@link
+ * #get}.
+ *
+ * <p><b>NOTE</b>: it's safe to call this after {@link
+ * #close}. */
+ public void release(IndexSearcher searcher)
+ throws IOException {
+ searcher.getIndexReader().decRef();
+ }
+
+ // Replaces old searcher with new one
+ private void swapSearcher(IndexSearcher newSearcher)
+ throws IOException {
+ IndexSearcher oldSearcher = currentSearcher;
+ if (oldSearcher == null) {
+ throw new AlreadyClosedException("this SearcherManager is closed");
+ }
+ currentSearcher = newSearcher;
+ release(oldSearcher);
+ }
+
+ /** Close this SearcherManager to future searching. Any
+ * searches still in process in other threads won't be
+ * affected, and they should still call {@link #release}
+ * after they are done. */
+ @Override
+ public void close() throws IOException {
+ swapSearcher(null);
+ }
+}
diff --git a/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java
new file mode 100644
index 0000000..52f0ec4
--- /dev/null
+++ b/lucene/contrib/misc/src/java/org/apache/lucene/search/SearcherWarmer.java
@@ -0,0 +1,34 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.index.NRTManager; // javadocs
+
+/** Pass an implementation of this to {@link NRTManager} or
+ * {@link SearcherManager} to warm a new {@link
+ * IndexSearcher} before it's put into production.
+ *
+ * @lucene.experimental */
+
+public interface SearcherWarmer {
+ // TODO: can we somehow merge this w/ IW's
+ // IndexReaderWarmer.... should IW switch to this?
+ public void warm(IndexSearcher s) throws IOException;
+}
diff --git a/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java b/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java
index be1789d..7af99c8 100644
--- a/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java
+++ b/lucene/contrib/misc/src/test/org/apache/lucene/index/TestNRTManager.java
@@ -17,152 +17,161 @@
* limitations under the License.
*/
-import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.Collection;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Fieldable;
import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.PhraseQuery;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SearcherWarmer;
import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NRTCachingDirectory;
-import org.apache.lucene.util.LineFileDocs;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.NamedThreadFactory;
-import org.apache.lucene.util._TestUtil;
-import org.junit.Test;
-// TODO
-// - mix in optimize, addIndexes
-// - randomoly mix in non-congruent docs
+public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
-// NOTE: This is a copy of TestNRTThreads, but swapping in
-// NRTManager for adding/updating/searching
+ private final ThreadLocal<Long> lastGens = new ThreadLocal<Long>();
+ private boolean warmCalled;
-public class TestNRTManager extends LuceneTestCase {
-
- private static class SubDocs {
- public final String packID;
- public final List<String> subIDs;
- public boolean deleted;
-
- public SubDocs(String packID, List<String> subIDs) {
- this.packID = packID;
- this.subIDs = subIDs;
- }
- }
-
- // TODO: is there a pre-existing way to do this!!!
- private Document cloneDoc(Document doc1) {
- final Document doc2 = new Document();
- for(Fieldable f : doc1.getFields()) {
- Field field1 = (Field) f;
-
- Field field2 = new Field(field1.name(),
- field1.stringValue(),
- field1.isStored() ? Field.Store.YES : Field.Store.NO,
- field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO);
- if (field1.getOmitNorms()) {
- field2.setOmitNorms(true);
- }
- field2.setIndexOptions(field1.getIndexOptions());
- doc2.add(field2);
- }
-
- return doc2;
- }
-
- @Test
public void testNRTManager() throws Exception {
+ runTest("TestNRTManager");
+ }
- final long t0 = System.currentTimeMillis();
-
- final LineFileDocs docs = new LineFileDocs(random);
- final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
- final MockDirectoryWrapper _dir = newFSDirectory(tempDir);
- _dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves
- Directory dir = _dir;
- final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).setOpenMode(IndexWriterConfig.OpenMode.CREATE);
-
- if (LuceneTestCase.TEST_NIGHTLY) {
- // newIWConfig makes smallish max seg size, which
- // results in tons and tons of segments for this test
- // when run nightly:
- MergePolicy mp = conf.getMergePolicy();
- if (mp instanceof TieredMergePolicy) {
- ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
- } else if (mp instanceof LogByteSizeMergePolicy) {
- ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
- } else if (mp instanceof LogMergePolicy) {
- ((LogMergePolicy) mp).setMaxMergeDocs(100000);
- }
+ @Override
+ protected IndexSearcher getFinalSearcher() throws Exception {
+ if (VERBOSE) {
+ System.out.println("TEST: finalSearcher maxGen=" + maxGen);
}
+ return nrt.get(maxGen, true);
+ }
- conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
- @Override
- public void warm(IndexReader reader) throws IOException {
- if (VERBOSE) {
- System.out.println("TEST: now warm merged reader=" + reader);
- }
- final int maxDoc = reader.maxDoc();
- int sum = 0;
- final int inc = Math.max(1, maxDoc/50);
- for(int docID=0;docID<maxDoc;docID += inc) {
- if (!reader.isDeleted(docID)) {
- final Document doc = reader.document(docID);
- sum += doc.getFields().size();
- }
- }
-
- IndexSearcher searcher = newSearcher(reader);
- sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
- searcher.close();
-
- if (VERBOSE) {
- System.out.println("TEST: warm visited " + sum + " fields");
- }
- }
- });
-
+ @Override
+ protected Directory getDirectory(Directory in) {
+ // Randomly swap in NRTCachingDir
if (random.nextBoolean()) {
if (VERBOSE) {
System.out.println("TEST: wrap NRTCachingDir");
}
- NRTCachingDirectory nrtDir = new NRTCachingDirectory(dir, 5.0, 60.0);
- conf.setMergeScheduler(nrtDir.getMergeScheduler());
- dir = nrtDir;
+ return new NRTCachingDirectory(in, 5.0, 60.0);
+ } else {
+ return in;
+ }
+ }
+
+ @Override
+ protected void updateDocuments(Term id, Collection<Document> docs) throws Exception {
+ final long gen = nrt.updateDocuments(id, docs);
+
+ // Randomly verify the update "took":
+ if (random.nextInt(20) == 2) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ }
+ final IndexSearcher s = nrt.get(gen, true);
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ }
+ try {
+ assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
+ } finally {
+ nrt.release(s);
+ }
}
- final IndexWriter writer = new IndexWriter(dir, conf);
-
- if (VERBOSE) {
- writer.setInfoStream(System.out);
+ lastGens.set(gen);
+ }
+
+ @Override
+ protected void addDocuments(Term id, Collection<Document> docs) throws Exception {
+ final long gen = nrt.addDocuments(docs);
+ // Randomly verify the add "took":
+ if (random.nextInt(20) == 2) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ }
+ final IndexSearcher s = nrt.get(gen, false);
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ }
+ try {
+ assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
+ } finally {
+ nrt.release(s);
+ }
}
- _TestUtil.reduceOpenFiles(writer);
- //System.out.println("TEST: conf=" + writer.getConfig());
+ lastGens.set(gen);
+ }
- final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("NRT search threads"));
+ @Override
+ protected void addDocument(Term id, Document doc) throws Exception {
+ final long gen = nrt.addDocument(doc);
+ // Randomly verify the add "took":
+ if (random.nextInt(20) == 2) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ }
+ final IndexSearcher s = nrt.get(gen, false);
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ }
+ try {
+ assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
+ } finally {
+ nrt.release(s);
+ }
+ }
+ lastGens.set(gen);
+ }
+
+ @Override
+ protected void updateDocument(Term id, Document doc) throws Exception {
+ final long gen = nrt.updateDocument(id, doc);
+ // Randomly verify the udpate "took":
+ if (random.nextInt(20) == 2) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ }
+ final IndexSearcher s = nrt.get(gen, true);
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ }
+ try {
+ assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
+ } finally {
+ nrt.release(s);
+ }
+ }
+ lastGens.set(gen);
+ }
+
+ @Override
+ protected void deleteDocuments(Term id) throws Exception {
+ final long gen = nrt.deleteDocuments(id);
+ // randomly verify the delete "took":
+ if (random.nextInt(20) == 7) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
+ }
+ final IndexSearcher s = nrt.get(gen, true);
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ }
+ try {
+ assertEquals(0, s.search(new TermQuery(id), 10).totalHits);
+ } finally {
+ nrt.release(s);
+ }
+ }
+ lastGens.set(gen);
+ }
+
+ private NRTManager nrt;
+ private NRTManagerReopenThread nrtThread;
+
+ @Override
+ protected void doAfterWriter(ExecutorService es) throws Exception {
final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble());
@@ -170,499 +179,57 @@
System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
}
- final NRTManager nrt = new NRTManager(writer, es);
- final NRTManagerReopenThread nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
+ nrt = new NRTManager(writer, es,
+ new SearcherWarmer() {
+ @Override
+ public void warm(IndexSearcher s) throws IOException {
+ TestNRTManager.this.warmCalled = true;
+ s.search(new TermQuery(new Term("body", "united")), 10);
+ }
+ });
+ nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
nrtThread.setName("NRT Reopen Thread");
nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
nrtThread.setDaemon(true);
nrtThread.start();
+ }
- final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 1, 3);
- final int NUM_SEARCH_THREADS = _TestUtil.nextInt(random, 1, 3);
- //final int NUM_INDEX_THREADS = 1;
- //final int NUM_SEARCH_THREADS = 1;
- if (VERBOSE) {
- System.out.println("TEST: " + NUM_INDEX_THREADS + " index threads; " + NUM_SEARCH_THREADS + " search threads");
+ @Override
+ protected void doAfterIndexingThreadDone() {
+ Long gen = lastGens.get();
+ if (gen != null) {
+ addMaxGen(gen);
}
+ }
- final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
+ private long maxGen = -1;
- final AtomicBoolean failed = new AtomicBoolean();
- final AtomicInteger addCount = new AtomicInteger();
- final AtomicInteger delCount = new AtomicInteger();
- final AtomicInteger packCount = new AtomicInteger();
- final List<Long> lastGens = new ArrayList<Long>();
+ private synchronized void addMaxGen(long gen) {
+ maxGen = Math.max(gen, maxGen);
+ }
- final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
- final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
+ @Override
+ protected void doSearching(ExecutorService es, long stopTime) throws Exception {
+ runSearchThreads(stopTime);
+ }
- final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
- Thread[] threads = new Thread[NUM_INDEX_THREADS];
- for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
- threads[thread] = new Thread() {
- @Override
- public void run() {
- // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
- final List<String> toDeleteIDs = new ArrayList<String>();
- final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
+ @Override
+ protected IndexSearcher getCurrentSearcher() throws Exception {
+ return nrt.get(random.nextBoolean());
+ }
- long gen = 0;
- while(System.currentTimeMillis() < stopTime && !failed.get()) {
-
- //System.out.println(Thread.currentThread().getName() + ": cycle");
- try {
- // Occassional longish pause if running
- // nightly
- if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": now long sleep");
- }
- Thread.sleep(_TestUtil.nextInt(random, 50, 500));
- }
-
- // Rate limit ingest rate:
- Thread.sleep(_TestUtil.nextInt(random, 1, 10));
- if (VERBOSE) {
- System.out.println(Thread.currentThread() + ": done sleep");
- }
-
- Document doc = docs.nextDoc();
- if (doc == null) {
- break;
- }
- final String addedField;
- if (random.nextBoolean()) {
- addedField = "extra" + random.nextInt(10);
- doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
- } else {
- addedField = null;
- }
- if (random.nextBoolean()) {
-
- if (random.nextBoolean()) {
- // Add a pack of adjacent sub-docs
- final String packID;
- final SubDocs delSubDocs;
- if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
- delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
- assert !delSubDocs.deleted;
- toDeleteSubDocs.remove(delSubDocs);
- // reuse prior packID
- packID = delSubDocs.packID;
- } else {
- delSubDocs = null;
- // make new packID
- packID = packCount.getAndIncrement() + "";
- }
-
- final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
- final List<String> docIDs = new ArrayList<String>();
- final SubDocs subDocs = new SubDocs(packID, docIDs);
- final List<Document> docsList = new ArrayList<Document>();
-
- allSubDocs.add(subDocs);
- doc.add(packIDField);
- docsList.add(cloneDoc(doc));
- docIDs.add(doc.get("docid"));
-
- final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
- while(docsList.size() < maxDocCount) {
- doc = docs.nextDoc();
- if (doc == null) {
- break;
- }
- docsList.add(cloneDoc(doc));
- docIDs.add(doc.get("docid"));
- }
- addCount.addAndGet(docsList.size());
-
- if (delSubDocs != null) {
- delSubDocs.deleted = true;
- delIDs.addAll(delSubDocs.subIDs);
- delCount.addAndGet(delSubDocs.subIDs.size());
- if (VERBOSE) {
- System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
- }
- gen = nrt.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
- /*
- // non-atomic:
- nrt.deleteDocuments(new Term("packID", delSubDocs.packID));
- for(Document subDoc : docsList) {
- nrt.addDocument(subDoc);
- }
- */
- } else {
- if (VERBOSE) {
- System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
- }
- gen = nrt.addDocuments(docsList);
-
- /*
- // non-atomic:
- for(Document subDoc : docsList) {
- nrt.addDocument(subDoc);
- }
- */
- }
- doc.removeField("packID");
-
- if (random.nextInt(5) == 2) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
- }
- toDeleteSubDocs.add(subDocs);
- }
-
- // randomly verify the add/update "took":
- if (random.nextInt(20) == 2) {
- final boolean applyDeletes = delSubDocs != null;
- final IndexSearcher s = nrt.get(gen, applyDeletes);
- try {
- assertEquals(docsList.size(), s.search(new TermQuery(new Term("packID", packID)), 10).totalHits);
- } finally {
- nrt.release(s);
- }
- }
-
- } else {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": add doc docid:" + doc.get("docid"));
- }
-
- gen = nrt.addDocument(doc);
- addCount.getAndIncrement();
-
- // randomly verify the add "took":
- if (random.nextInt(20) == 2) {
- //System.out.println(Thread.currentThread().getName() + ": verify");
- final IndexSearcher s = nrt.get(gen, false);
- //System.out.println(Thread.currentThread().getName() + ": got s=" + s);
- try {
- assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits);
- } finally {
- nrt.release(s);
- }
- //System.out.println(Thread.currentThread().getName() + ": done verify");
- }
-
- if (random.nextInt(5) == 3) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
- }
- toDeleteIDs.add(doc.get("docid"));
- }
- }
- } else {
- // we use update but it never replaces a
- // prior doc
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
- }
- gen = nrt.updateDocument(new Term("docid", doc.get("docid")), doc);
- addCount.getAndIncrement();
-
- // randomly verify the add "took":
- if (random.nextInt(20) == 2) {
- final IndexSearcher s = nrt.get(gen, true);
- try {
- assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits);
- } finally {
- nrt.release(s);
- }
- }
-
- if (random.nextInt(5) == 3) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
- }
- toDeleteIDs.add(doc.get("docid"));
- }
- }
-
- if (random.nextInt(30) == 17) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
- }
- for(String id : toDeleteIDs) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
- }
- gen = nrt.deleteDocuments(new Term("docid", id));
-
- // randomly verify the delete "took":
- if (random.nextInt(20) == 7) {
- final IndexSearcher s = nrt.get(gen, true);
- try {
- assertEquals(0, s.search(new TermQuery(new Term("docid", id)), 10).totalHits);
- } finally {
- nrt.release(s);
- }
- }
- }
-
- final int count = delCount.addAndGet(toDeleteIDs.size());
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
- }
- delIDs.addAll(toDeleteIDs);
- toDeleteIDs.clear();
-
- for(SubDocs subDocs : toDeleteSubDocs) {
- assertTrue(!subDocs.deleted);
- gen = nrt.deleteDocuments(new Term("packID", subDocs.packID));
- subDocs.deleted = true;
- if (VERBOSE) {
- System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
- }
- delIDs.addAll(subDocs.subIDs);
- delCount.addAndGet(subDocs.subIDs.size());
-
- // randomly verify the delete "took":
- if (random.nextInt(20) == 7) {
- final IndexSearcher s = nrt.get(gen, true);
- try {
- assertEquals(0, s.search(new TermQuery(new Term("packID", subDocs.packID)), 1).totalHits);
- } finally {
- nrt.release(s);
- }
- }
- }
- toDeleteSubDocs.clear();
- }
- if (addedField != null) {
- doc.removeField(addedField);
- }
- } catch (Throwable t) {
- System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc");
- t.printStackTrace();
- failed.set(true);
- throw new RuntimeException(t);
- }
- }
-
- lastGens.add(gen);
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": indexing done");
- }
- }
- };
- threads[thread].setDaemon(true);
- threads[thread].start();
- }
-
- if (VERBOSE) {
- System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
- }
-
- // let index build up a bit
- Thread.sleep(100);
-
- // silly starting guess:
- final AtomicInteger totTermCount = new AtomicInteger(100);
-
- // run search threads
- final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
- final AtomicInteger totHits = new AtomicInteger();
-
- if (VERBOSE) {
- System.out.println("TEST: start search threads");
- }
-
- for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
- searchThreads[thread] = new Thread() {
- @Override
- public void run() {
- while(System.currentTimeMillis() < stopTime && !failed.get()) {
- final IndexSearcher s = nrt.get(random.nextBoolean());
- try {
- try {
- smokeTestSearcher(s);
- if (s.getIndexReader().numDocs() > 0) {
-
- TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
- int seenTermCount = 0;
- int shift;
- int trigger;
- if (totTermCount.get() < 10) {
- shift = 0;
- trigger = 1;
- } else {
- trigger = totTermCount.get()/10;
- shift = random.nextInt(trigger);
- }
-
- while(System.currentTimeMillis() < stopTime) {
- Term term = termEnum.term();
- if (term == null) {
- if (seenTermCount == 0) {
- break;
- }
- totTermCount.set(seenTermCount);
- seenTermCount = 0;
- if (totTermCount.get() < 10) {
- shift = 0;
- trigger = 1;
- } else {
- trigger = totTermCount.get()/10;
- //System.out.println("trigger " + trigger);
- shift = random.nextInt(trigger);
- }
- termEnum = s.getIndexReader().terms(new Term("body", ""));
- continue;
- }
- seenTermCount++;
- // search 10 terms
- if (trigger == 0) {
- trigger = 1;
- }
- if ((seenTermCount + shift) % trigger == 0) {
- //if (VERBOSE) {
- //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
- //}
- totHits.addAndGet(runQuery(s, new TermQuery(term)));
- }
- termEnum.next();
- }
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": search done");
- }
- }
- } finally {
- nrt.release(s);
- }
- } catch (Throwable t) {
- System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc");
- failed.set(true);
- t.printStackTrace(System.out);
- throw new RuntimeException(t);
- }
- }
- }
- };
- searchThreads[thread].setDaemon(true);
- searchThreads[thread].start();
- }
-
- if (VERBOSE) {
- System.out.println("TEST: now join");
- }
- for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
- threads[thread].join();
- }
- for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
- searchThreads[thread].join();
- }
-
- if (VERBOSE) {
- System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
- System.out.println("TEST: search totHits=" + totHits);
- }
-
- long maxGen = 0;
- for(long gen : lastGens) {
- maxGen = Math.max(maxGen, gen);
- }
-
- final IndexSearcher s = nrt.get(maxGen, true);
-
- boolean doFail = false;
- for(String id : delIDs) {
- final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
- if (hits.totalHits != 0) {
- System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
- doFail = true;
- }
- }
-
- // Make sure each group of sub-docs are still in docID order:
- for(SubDocs subDocs : allSubDocs) {
- if (!subDocs.deleted) {
- // We sort by relevance but the scores should be identical so sort falls back to by docID:
- TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
- assertEquals(subDocs.subIDs.size(), hits.totalHits);
- int lastDocID = -1;
- int startDocID = -1;
- for(ScoreDoc scoreDoc : hits.scoreDocs) {
- final int docID = scoreDoc.doc;
- if (lastDocID != -1) {
- assertEquals(1+lastDocID, docID);
- } else {
- startDocID = docID;
- }
- lastDocID = docID;
- final Document doc = s.doc(docID);
- assertEquals(subDocs.packID, doc.get("packID"));
- }
-
- lastDocID = startDocID - 1;
- for(String subID : subDocs.subIDs) {
- hits = s.search(new TermQuery(new Term("docid", subID)), 1);
- assertEquals(1, hits.totalHits);
- final int docID = hits.scoreDocs[0].doc;
- if (lastDocID != -1) {
- assertEquals(1+lastDocID, docID);
- }
- lastDocID = docID;
- }
- } else {
- for(String subID : subDocs.subIDs) {
- assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
- }
- }
- }
-
- final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
- for(int id=0;id<endID;id++) {
- String stringID = ""+id;
- if (!delIDs.contains(stringID)) {
- final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
- if (hits.totalHits != 1) {
- System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
- doFail = true;
- }
- }
- }
- assertFalse(doFail);
-
- assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
+ @Override
+ protected void releaseSearcher(IndexSearcher s) throws Exception {
nrt.release(s);
+ }
- if (es != null) {
- es.shutdown();
- es.awaitTermination(1, TimeUnit.SECONDS);
- }
-
- writer.commit();
- assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
-
+ @Override
+ protected void doClose() throws Exception {
+ assertTrue(warmCalled);
if (VERBOSE) {
System.out.println("TEST: now close NRTManager");
}
nrtThread.close();
nrt.close();
- assertFalse(writer.anyNonBulkMerges);
- writer.close(false);
- _TestUtil.checkIndex(dir);
- dir.close();
- _TestUtil.rmDir(tempDir);
- docs.close();
-
- if (VERBOSE) {
- System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
- }
- }
-
- private int runQuery(IndexSearcher s, Query q) throws Exception {
- s.search(q, 10);
- return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
- }
-
- private void smokeTestSearcher(IndexSearcher s) throws Exception {
- runQuery(s, new TermQuery(new Term("body", "united")));
- runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
- PhraseQuery pq = new PhraseQuery();
- pq.add(new Term("body", "united"));
- pq.add(new Term("body", "states"));
- runQuery(s, pq);
}
}
diff --git a/lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java b/lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java
new file mode 100644
index 0000000..e293314
--- /dev/null
+++ b/lucene/contrib/misc/src/test/org/apache/lucene/search/TestSearcherManager.java
@@ -0,0 +1,113 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
+import org.apache.lucene.util._TestUtil;
+
+public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
+
+ boolean warmCalled;
+
+ public void testSearcherManager() throws Exception {
+ runTest("TestSearcherManager");
+ }
+
+ @Override
+ protected IndexSearcher getFinalSearcher() throws Exception {
+ writer.commit();
+ mgr.maybeReopen();
+ return mgr.get();
+ }
+
+ private SearcherManager mgr;
+
+ @Override
+ protected void doAfterWriter(ExecutorService es) throws Exception {
+ // SearcherManager needs to see empty commit:
+ writer.commit();
+ mgr = new SearcherManager(dir,
+ new SearcherWarmer() {
+ @Override
+ public void warm(IndexSearcher s) throws IOException {
+ TestSearcherManager.this.warmCalled = true;
+ s.search(new TermQuery(new Term("body", "united")), 10);
+ }
+ }, es);
+ }
+
+ @Override
+ protected void doSearching(ExecutorService es, final long stopTime) throws Exception {
+
+ Thread reopenThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ while(System.currentTimeMillis() < stopTime) {
+ Thread.sleep(_TestUtil.nextInt(random, 1, 100));
+ writer.commit();
+ Thread.sleep(_TestUtil.nextInt(random, 1, 5));
+ mgr.maybeReopen();
+ }
+ } catch (Throwable t) {
+ System.out.println("TEST: reopen thread hit exc");
+ t.printStackTrace(System.out);
+ failed.set(true);
+ throw new RuntimeException(t);
+ }
+ }
+ };
+ reopenThread.setDaemon(true);
+ reopenThread.start();
+
+ runSearchThreads(stopTime);
+
+ reopenThread.join();
+ }
+
+ @Override
+ protected IndexSearcher getCurrentSearcher() throws Exception {
+ if (random.nextInt(10) == 7) {
+ // NOTE: not best practice to call maybeReopen
+ // synchronous to your search threads, but still we
+ // test as apps will presumably do this for
+ // simplicity:
+ mgr.maybeReopen();
+ }
+
+ return mgr.get();
+ }
+
+ @Override
+ protected void releaseSearcher(IndexSearcher s) throws Exception {
+ mgr.release(s);
+ }
+
+ @Override
+ protected void doClose() throws Exception {
+ assertTrue(warmCalled);
+ if (VERBOSE) {
+ System.out.println("TEST: now close SearcherManager");
+ }
+ mgr.close();
+ }
+}
diff --git a/lucene/src/test-framework/org/apache/lucene/index/ThreadedIndexingAndSearchingTestCase.java b/lucene/src/test-framework/org/apache/lucene/index/ThreadedIndexingAndSearchingTestCase.java
new file mode 100644
index 0000000..df09adf
--- /dev/null
+++ b/lucene/src/test-framework/org/apache/lucene/index/ThreadedIndexingAndSearchingTestCase.java
@@ -0,0 +1,635 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.PhraseQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util._TestUtil;
+
+// TODO
+// - mix in optimize, addIndexes
+// - randomoly mix in non-congruent docs
+
+/** Utility class that spawns multiple indexing and
+ * searching threads. */
+public abstract class ThreadedIndexingAndSearchingTestCase extends LuceneTestCase {
+
+ protected final AtomicBoolean failed = new AtomicBoolean();
+ protected final AtomicInteger addCount = new AtomicInteger();
+ protected final AtomicInteger delCount = new AtomicInteger();
+ protected final AtomicInteger packCount = new AtomicInteger();
+
+ protected Directory dir;
+ protected IndexWriter writer;
+
+ private static class SubDocs {
+ public final String packID;
+ public final List<String> subIDs;
+ public boolean deleted;
+
+ public SubDocs(String packID, List<String> subIDs) {
+ this.packID = packID;
+ this.subIDs = subIDs;
+ }
+ }
+
+ // Called per-search
+ protected abstract IndexSearcher getCurrentSearcher() throws Exception;
+
+ protected abstract IndexSearcher getFinalSearcher() throws Exception;
+
+ protected void releaseSearcher(IndexSearcher s) throws Exception {
+ }
+
+ // Called once to run searching
+ protected abstract void doSearching(ExecutorService es, long stopTime) throws Exception;
+
+ protected Directory getDirectory(Directory in) {
+ return in;
+ }
+
+ protected void updateDocuments(Term id, Collection<Document> docs) throws Exception {
+ writer.updateDocuments(id, docs);
+ }
+
+ protected void addDocuments(Term id, Collection<Document> docs) throws Exception {
+ writer.addDocuments(docs);
+ }
+
+ protected void addDocument(Term id, Document doc) throws Exception {
+ writer.addDocument(doc);
+ }
+
+ protected void updateDocument(Term term, Document doc) throws Exception {
+ writer.updateDocument(term, doc);
+ }
+
+ protected void deleteDocuments(Term term) throws Exception {
+ writer.deleteDocuments(term);
+ }
+
+ protected void doAfterIndexingThreadDone() {
+ }
+
+ private Thread[] launchIndexingThreads(final LineFileDocs docs,
+ int numThreads,
+ final long stopTime,
+ final Set<String> delIDs,
+ final Set<String> delPackIDs,
+ final List<SubDocs> allSubDocs)
+ throws Exception {
+ final Thread[] threads = new Thread[numThreads];
+ for(int thread=0;thread<numThreads;thread++) {
+ threads[thread] = new Thread() {
+ @Override
+ public void run() {
+ // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
+ final List<String> toDeleteIDs = new ArrayList<String>();
+ final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
+ while(System.currentTimeMillis() < stopTime && !failed.get()) {
+ try {
+
+ // Occasional longish pause if running
+ // nightly
+ if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": now long sleep");
+ }
+ Thread.sleep(_TestUtil.nextInt(random, 50, 500));
+ }
+
+ // Rate limit ingest rate:
+ if (random.nextInt(7) == 5) {
+ Thread.sleep(_TestUtil.nextInt(random, 1, 10));
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": done sleep");
+ }
+ }
+
+ Document doc = docs.nextDoc();
+ if (doc == null) {
+ break;
+ }
+
+ // Maybe add randomly named field
+ final String addedField;
+ if (random.nextBoolean()) {
+ addedField = "extra" + random.nextInt(40);
+ doc.add(newField(addedField, "a random field", Field.Store.YES, Field.Index.ANALYZED));
+ } else {
+ addedField = null;
+ }
+
+ if (random.nextBoolean()) {
+
+ if (random.nextBoolean()) {
+ // Add/update doc block:
+ final String packID;
+ final SubDocs delSubDocs;
+ if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
+ delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
+ assert !delSubDocs.deleted;
+ toDeleteSubDocs.remove(delSubDocs);
+ // Update doc block, replacing prior packID
+ packID = delSubDocs.packID;
+ } else {
+ delSubDocs = null;
+ // Add doc block, using new packID
+ packID = packCount.getAndIncrement() + "";
+ }
+
+ final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
+ final List<String> docIDs = new ArrayList<String>();
+ final SubDocs subDocs = new SubDocs(packID, docIDs);
+ final List<Document> docsList = new ArrayList<Document>();
+
+ allSubDocs.add(subDocs);
+ doc.add(packIDField);
+ docsList.add(_TestUtil.cloneDocument(doc));
+ docIDs.add(doc.get("docid"));
+
+ final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
+ while(docsList.size() < maxDocCount) {
+ doc = docs.nextDoc();
+ if (doc == null) {
+ break;
+ }
+ docsList.add(_TestUtil.cloneDocument(doc));
+ docIDs.add(doc.get("docid"));
+ }
+ addCount.addAndGet(docsList.size());
+
+ final Term packIDTerm = new Term("packID", packID);
+
+ if (delSubDocs != null) {
+ delSubDocs.deleted = true;
+ delIDs.addAll(delSubDocs.subIDs);
+ delCount.addAndGet(delSubDocs.subIDs.size());
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
+ }
+ updateDocuments(packIDTerm, docsList);
+ } else {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
+ }
+ addDocuments(packIDTerm, docsList);
+ }
+ doc.removeField("packID");
+
+ if (random.nextInt(5) == 2) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
+ }
+ toDeleteSubDocs.add(subDocs);
+ }
+
+ } else {
+ // Add single doc
+ final String docid = doc.get("docid");
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": add doc docid:" + docid);
+ }
+ addDocument(new Term("docid", docid), doc);
+ addCount.getAndIncrement();
+
+ if (random.nextInt(5) == 3) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ }
+ toDeleteIDs.add(docid);
+ }
+ }
+ } else {
+
+ // Update single doc, but we never re-use
+ // and ID so the delete will never
+ // actually happen:
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
+ }
+ final String docid = doc.get("docid");
+ updateDocument(new Term("docid", docid), doc);
+ addCount.getAndIncrement();
+
+ if (random.nextInt(5) == 3) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ }
+ toDeleteIDs.add(docid);
+ }
+ }
+
+ if (random.nextInt(30) == 17) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
+ }
+ for(String id : toDeleteIDs) {
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
+ }
+ deleteDocuments(new Term("docid", id));
+ }
+ final int count = delCount.addAndGet(toDeleteIDs.size());
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
+ }
+ delIDs.addAll(toDeleteIDs);
+ toDeleteIDs.clear();
+
+ for(SubDocs subDocs : toDeleteSubDocs) {
+ assert !subDocs.deleted;
+ delPackIDs.add(subDocs.packID);
+ deleteDocuments(new Term("packID", subDocs.packID));
+ subDocs.deleted = true;
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
+ }
+ delIDs.addAll(subDocs.subIDs);
+ delCount.addAndGet(subDocs.subIDs.size());
+ }
+ toDeleteSubDocs.clear();
+ }
+ if (addedField != null) {
+ doc.removeField(addedField);
+ }
+ } catch (Throwable t) {
+ System.out.println(Thread.currentThread().getName() + ": hit exc");
+ t.printStackTrace();
+ failed.set(true);
+ throw new RuntimeException(t);
+ }
+ }
+ if (VERBOSE) {
+ System.out.println(Thread.currentThread().getName() + ": indexing done");
+ }
+
+ doAfterIndexingThreadDone();
+ }
+ };
+ threads[thread].setDaemon(true);
+ threads[thread].start();
+ }
+
+ return threads;
+ }
+
+ protected void runSearchThreads(final long stopTimeMS) throws Exception {
+ final int numThreads = _TestUtil.nextInt(random, 1, 5);
+ final Thread[] searchThreads = new Thread[numThreads];
+ final AtomicInteger totHits = new AtomicInteger();
+
+ // silly starting guess:
+ final AtomicInteger totTermCount = new AtomicInteger(100);
+
+ // TODO: we should enrich this to do more interesting searches
+ for(int thread=0;thread<searchThreads.length;thread++) {
+ searchThreads[thread] = new Thread() {
+ @Override
+ public void run() {
+ while (System.currentTimeMillis() < stopTimeMS) {
+ try {
+ final IndexSearcher s = getCurrentSearcher();
+ try {
+ if (s.getIndexReader().numDocs() > 0) {
+ smokeTestSearcher(s);
+ TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
+ int seenTermCount = 0;
+ int shift;
+ int trigger;
+ if (totTermCount.get() < 10) {
+ shift = 0;
+ trigger = 1;
+ } else {
+ trigger = totTermCount.get()/10;
+ shift = random.nextInt(trigger);
+ }
+ while(System.currentTimeMillis() < stopTimeMS) {
+ Term term = termEnum.term();
+ if (term == null) {
+ if (seenTermCount == 0) {
+ break;
+ }
+ totTermCount.set(seenTermCount);
+ seenTermCount = 0;
+ if (totTermCount.get() < 10) {
+ shift = 0;
+ trigger = 1;
+ } else {
+ trigger = totTermCount.get()/10;
+ //System.out.println("trigger " + trigger);
+ shift = random.nextInt(trigger);
+ }
+ termEnum = s.getIndexReader().terms(new Term("body", ""));
+ continue;
+ }
+ seenTermCount++;
+ // search 10 terms
+ if (trigger == 0) {
+ trigger = 1;
+ }
+ if ((seenTermCount + shift) % trigger == 0) {
+ //if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
+ //}
+ totHits.addAndGet(runQuery(s, new TermQuery(term)));
+ }
+ }
+ //if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + ": search done");
+ //}
+ }
+ } finally {
+ releaseSearcher(s);
+ }
+ } catch (Throwable t) {
+ System.out.println(Thread.currentThread().getName() + ": hit exc");
+ failed.set(true);
+ t.printStackTrace(System.out);
+ throw new RuntimeException(t);
+ }
+ }
+ }
+ };
+ searchThreads[thread].setDaemon(true);
+ searchThreads[thread].start();
+ }
+
+ for(int thread=0;thread<searchThreads.length;thread++) {
+ searchThreads[thread].join();
+ }
+
+ if (VERBOSE) {
+ System.out.println("TEST: DONE search: totHits=" + totHits);
+ }
+ }
+
+ protected void doAfterWriter(ExecutorService es) throws Exception {
+ }
+
+ protected void doClose() throws Exception {
+ }
+
+ public void runTest(String testName) throws Exception {
+
+ failed.set(false);
+ addCount.set(0);
+ delCount.set(0);
+ packCount.set(0);
+
+ final long t0 = System.currentTimeMillis();
+
+ final LineFileDocs docs = new LineFileDocs(random);
+ final File tempDir = _TestUtil.getTempDir(testName);
+ dir = newFSDirectory(tempDir);
+ ((MockDirectoryWrapper) dir).setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
+ final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
+
+ if (LuceneTestCase.TEST_NIGHTLY) {
+ // newIWConfig makes smallish max seg size, which
+ // results in tons and tons of segments for this test
+ // when run nightly:
+ MergePolicy mp = conf.getMergePolicy();
+ if (mp instanceof TieredMergePolicy) {
+ ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
+ } else if (mp instanceof LogByteSizeMergePolicy) {
+ ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
+ } else if (mp instanceof LogMergePolicy) {
+ ((LogMergePolicy) mp).setMaxMergeDocs(100000);
+ }
+ }
+
+ conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
+ @Override
+ public void warm(IndexReader reader) throws IOException {
+ if (VERBOSE) {
+ System.out.println("TEST: now warm merged reader=" + reader);
+ }
+ final int maxDoc = reader.maxDoc();
+ int sum = 0;
+ final int inc = Math.max(1, maxDoc/50);
+ for(int docID=0;docID<maxDoc;docID += inc) {
+ if (!reader.isDeleted(docID)) {
+ final Document doc = reader.document(docID);
+ sum += doc.getFields().size();
+ }
+ }
+
+ IndexSearcher searcher = newSearcher(reader);
+ sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
+ searcher.close();
+
+ if (VERBOSE) {
+ System.out.println("TEST: warm visited " + sum + " fields");
+ }
+ }
+ });
+
+ writer = new IndexWriter(dir, conf);
+ if (VERBOSE) {
+ writer.setInfoStream(System.out);
+ }
+ _TestUtil.reduceOpenFiles(writer);
+
+ final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory(testName));
+
+ doAfterWriter(es);
+
+ final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 2, 4);
+
+ final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
+
+ final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
+ final Set<String> delPackIDs = Collections.synchronizedSet(new HashSet<String>());
+ final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
+
+ final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
+
+ final Thread[] indexThreads = launchIndexingThreads(docs, NUM_INDEX_THREADS, stopTime, delIDs, delPackIDs, allSubDocs);
+
+ if (VERBOSE) {
+ System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
+ }
+
+ // Let index build up a bit
+ Thread.sleep(100);
+
+ doSearching(es, stopTime);
+
+ if (VERBOSE) {
+ System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
+ }
+
+ for(int thread=0;thread<indexThreads.length;thread++) {
+ indexThreads[thread].join();
+ }
+
+ if (VERBOSE) {
+ System.out.println("TEST: done join indexing threads [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
+ }
+
+ final IndexSearcher s = getFinalSearcher();
+ if (VERBOSE) {
+ System.out.println("TEST: finalSearcher=" + s);
+ }
+ boolean doFail = false;
+
+ // Verify: make sure delIDs are in fact deleted:
+ for(String id : delIDs) {
+ final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
+ if (hits.totalHits != 0) {
+ System.out.println("doc id=" + id + " is supposed to be deleted, but got " + hits.totalHits + " hits; first docID=" + hits.scoreDocs[0].doc);
+ doFail = true;
+ }
+ }
+
+ // Verify: make sure delPackIDs are in fact deleted:
+ for(String id : delPackIDs) {
+ final TopDocs hits = s.search(new TermQuery(new Term("packID", id)), 1);
+ if (hits.totalHits != 0) {
+ System.out.println("packID=" + id + " is supposed to be deleted, but got " + hits.totalHits + " matches");
+ doFail = true;
+ }
+ }
+
+ // Verify: make sure each group of sub-docs are still in docID order:
+ for(SubDocs subDocs : allSubDocs) {
+ TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
+ if (!subDocs.deleted) {
+ // We sort by relevance but the scores should be identical so sort falls back to by docID:
+ if (hits.totalHits != subDocs.subIDs.size()) {
+ System.out.println("packID=" + subDocs.packID + ": expected " + subDocs.subIDs.size() + " hits but got " + hits.totalHits);
+ doFail = true;
+ } else {
+ int lastDocID = -1;
+ int startDocID = -1;
+ for(ScoreDoc scoreDoc : hits.scoreDocs) {
+ final int docID = scoreDoc.doc;
+ if (lastDocID != -1) {
+ assertEquals(1+lastDocID, docID);
+ } else {
+ startDocID = docID;
+ }
+ lastDocID = docID;
+ final Document doc = s.doc(docID);
+ assertEquals(subDocs.packID, doc.get("packID"));
+ }
+
+ lastDocID = startDocID - 1;
+ for(String subID : subDocs.subIDs) {
+ hits = s.search(new TermQuery(new Term("docid", subID)), 1);
+ assertEquals(1, hits.totalHits);
+ final int docID = hits.scoreDocs[0].doc;
+ if (lastDocID != -1) {
+ assertEquals(1+lastDocID, docID);
+ }
+ lastDocID = docID;
+ }
+ }
+ } else {
+ // Pack was deleted -- make sure its docs are
+ // deleted. We can't verify packID is deleted
+ // because we can re-use packID for update:
+ for(String subID : subDocs.subIDs) {
+ assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
+ }
+ }
+ }
+
+ // Verify: make sure all not-deleted docs are in fact
+ // not deleted:
+ final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
+ docs.close();
+
+ for(int id=0;id<endID;id++) {
+ String stringID = ""+id;
+ if (!delIDs.contains(stringID)) {
+ final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
+ if (hits.totalHits != 1) {
+ System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
+ doFail = true;
+ }
+ }
+ }
+ assertFalse(doFail);
+
+ assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
+ releaseSearcher(s);
+
+ if (es != null) {
+ es.shutdown();
+ es.awaitTermination(1, TimeUnit.SECONDS);
+ }
+
+ writer.commit();
+ assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
+
+ assertFalse(writer.anyNonBulkMerges);
+ doClose();
+ writer.close(false);
+ _TestUtil.checkIndex(dir);
+ dir.close();
+ _TestUtil.rmDir(tempDir);
+
+ if (VERBOSE) {
+ System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
+ }
+ }
+
+ private int runQuery(IndexSearcher s, Query q) throws Exception {
+ s.search(q, 10);
+ return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
+ }
+
+ protected void smokeTestSearcher(IndexSearcher s) throws Exception {
+ runQuery(s, new TermQuery(new Term("body", "united")));
+ runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
+ PhraseQuery pq = new PhraseQuery();
+ pq.add(new Term("body", "united"));
+ pq.add(new Term("body", "states"));
+ runQuery(s, pq);
+ }
+}
diff --git a/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
index 6138622..264ec22 100644
--- a/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -1938,4 +1938,29 @@
w.close();
d.close();
}
+
+ public void testNRTReaderVersion() throws Exception {
+ Directory d = new MockDirectoryWrapper(random, new RAMDirectory());
+ IndexWriter w = new IndexWriter(d, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+ Document doc = new Document();
+ doc.add(newField("id", "0", Field.Store.YES, Field.Index.ANALYZED));
+ w.addDocument(doc);
+ IndexReader r = w.getReader();
+ long version = r.getVersion();
+ r.close();
+
+ w.addDocument(doc);
+ r = w.getReader();
+ long version2 = r.getVersion();
+ r.close();
+ assert(version2 > version);
+
+ w.deleteDocuments(new Term("id", "0"));
+ r = w.getReader();
+ w.close();
+ long version3 = r.getVersion();
+ r.close();
+ assert(version3 > version2);
+ d.close();
+ }
}
diff --git a/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java b/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
index 5f489b7..975156f 100644
--- a/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
+++ b/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
@@ -17,301 +17,26 @@
* limitations under the License.
*/
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.lucene.analysis.MockAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.PhraseQuery;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.util.LineFileDocs;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util._TestUtil;
-import org.junit.Test;
// TODO
// - mix in optimize, addIndexes
// - randomoly mix in non-congruent docs
-public class TestNRTThreads extends LuceneTestCase {
+public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase {
+
+ @Override
+ protected void doSearching(ExecutorService es, long stopTime) throws Exception {
- private static class SubDocs {
- public final String packID;
- public final List<String> subIDs;
- public boolean deleted;
-
- public SubDocs(String packID, List<String> subIDs) {
- this.packID = packID;
- this.subIDs = subIDs;
- }
- }
-
- @Test
- public void testNRTThreads() throws Exception {
-
- final long t0 = System.currentTimeMillis();
-
- final LineFileDocs docs = new LineFileDocs(random);
- final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
- final MockDirectoryWrapper dir = newFSDirectory(tempDir);
- dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
- final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
- conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
- @Override
- public void warm(IndexReader reader) throws IOException {
- if (VERBOSE) {
- System.out.println("TEST: now warm merged reader=" + reader);
- }
- final int maxDoc = reader.maxDoc();
- int sum = 0;
- final int inc = Math.max(1, maxDoc/50);
- for(int docID=0;docID<maxDoc;docID += inc) {
- if (reader.isDeleted(docID)) {
- final Document doc = reader.document(docID);
- sum += doc.getFields().size();
- }
- }
-
- IndexSearcher searcher = newSearcher(reader);
- sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
- searcher.close();
-
- if (VERBOSE) {
- System.out.println("TEST: warm visited " + sum + " fields");
- }
- }
- });
-
- final IndexWriter writer = new IndexWriter(dir, conf);
- if (VERBOSE) {
- writer.setInfoStream(System.out);
- }
- _TestUtil.reduceOpenFiles(writer);
-
- final int NUM_INDEX_THREADS = 2;
- final int NUM_SEARCH_THREADS = 3;
-
- final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
-
- final AtomicBoolean failed = new AtomicBoolean();
- final AtomicInteger addCount = new AtomicInteger();
- final AtomicInteger delCount = new AtomicInteger();
- final AtomicInteger packCount = new AtomicInteger();
-
- final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
- final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
-
- final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
- Thread[] threads = new Thread[NUM_INDEX_THREADS];
- for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
- threads[thread] = new Thread() {
- @Override
- public void run() {
- // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
- final List<String> toDeleteIDs = new ArrayList<String>();
- final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
- while(System.currentTimeMillis() < stopTime && !failed.get()) {
- try {
- Document doc = docs.nextDoc();
- if (doc == null) {
- break;
- }
- final String addedField;
- if (random.nextBoolean()) {
- addedField = "extra" + random.nextInt(10);
- doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
- } else {
- addedField = null;
- }
- if (random.nextBoolean()) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
- }
-
- if (random.nextBoolean()) {
- // Add a pack of adjacent sub-docs
- final String packID;
- final SubDocs delSubDocs;
- if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
- delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
- assert !delSubDocs.deleted;
- toDeleteSubDocs.remove(delSubDocs);
- // reuse prior packID
- packID = delSubDocs.packID;
- } else {
- delSubDocs = null;
- // make new packID
- packID = packCount.getAndIncrement() + "";
- }
-
- final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
- final List<String> docIDs = new ArrayList<String>();
- final SubDocs subDocs = new SubDocs(packID, docIDs);
- final List<Document> docsList = new ArrayList<Document>();
-
- allSubDocs.add(subDocs);
- doc.add(packIDField);
- docsList.add(_TestUtil.cloneDocument(doc));
- docIDs.add(doc.get("docid"));
-
- final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
- while(docsList.size() < maxDocCount) {
- doc = docs.nextDoc();
- if (doc == null) {
- break;
- }
- docsList.add(_TestUtil.cloneDocument(doc));
- docIDs.add(doc.get("docid"));
- }
- addCount.addAndGet(docsList.size());
-
- if (delSubDocs != null) {
- delSubDocs.deleted = true;
- delIDs.addAll(delSubDocs.subIDs);
- delCount.addAndGet(delSubDocs.subIDs.size());
- if (VERBOSE) {
- System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
- }
- writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
- /*
- // non-atomic:
- writer.deleteDocuments(new Term("packID", delSubDocs.packID));
- for(Document subDoc : docsList) {
- writer.addDocument(subDoc);
- }
- */
- } else {
- if (VERBOSE) {
- System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
- }
- writer.addDocuments(docsList);
-
- /*
- // non-atomic:
- for(Document subDoc : docsList) {
- writer.addDocument(subDoc);
- }
- */
- }
- doc.removeField("packID");
-
- if (random.nextInt(5) == 2) {
- if (VERBOSE) {
- //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
- }
- toDeleteSubDocs.add(subDocs);
- }
-
- } else {
- writer.addDocument(doc);
- addCount.getAndIncrement();
-
- if (random.nextInt(5) == 3) {
- if (VERBOSE) {
- //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
- }
- toDeleteIDs.add(doc.get("docid"));
- }
- }
- } else {
- // we use update but it never replaces a
- // prior doc
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
- }
- writer.updateDocument(new Term("docid", doc.get("docid")), doc);
- addCount.getAndIncrement();
-
- if (random.nextInt(5) == 3) {
- if (VERBOSE) {
- //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
- }
- toDeleteIDs.add(doc.get("docid"));
- }
- }
-
- if (random.nextInt(30) == 17) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
- }
- for(String id : toDeleteIDs) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
- }
- writer.deleteDocuments(new Term("docid", id));
- }
- final int count = delCount.addAndGet(toDeleteIDs.size());
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
- }
- delIDs.addAll(toDeleteIDs);
- toDeleteIDs.clear();
-
- for(SubDocs subDocs : toDeleteSubDocs) {
- assert !subDocs.deleted;
- writer.deleteDocuments(new Term("packID", subDocs.packID));
- subDocs.deleted = true;
- if (VERBOSE) {
- System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
- }
- delIDs.addAll(subDocs.subIDs);
- delCount.addAndGet(subDocs.subIDs.size());
- }
- toDeleteSubDocs.clear();
- }
- if (addedField != null) {
- doc.removeField(addedField);
- }
- } catch (Throwable t) {
- System.out.println(Thread.currentThread().getName() + ": hit exc");
- t.printStackTrace();
- failed.set(true);
- throw new RuntimeException(t);
- }
- }
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": indexing done");
- }
- }
- };
- threads[thread].setDaemon(true);
- threads[thread].start();
- }
-
- if (VERBOSE) {
- System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
- }
-
- // let index build up a bit
- Thread.sleep(100);
+ boolean anyOpenDelFiles = false;
IndexReader r = IndexReader.open(writer, true);
- boolean any = false;
- // silly starting guess:
- final AtomicInteger totTermCount = new AtomicInteger(100);
-
- final ExecutorService es = Executors.newCachedThreadPool();
-
- while(System.currentTimeMillis() < stopTime && !failed.get()) {
+ while (System.currentTimeMillis() < stopTime && !failed.get()) {
if (random.nextBoolean()) {
if (VERBOSE) {
System.out.println("TEST: now reopen r=" + r);
@@ -327,11 +52,11 @@
}
r.close();
writer.commit();
- final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
+ final Set<String> openDeletedFiles = ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
if (openDeletedFiles.size() > 0) {
System.out.println("OBD files: " + openDeletedFiles);
}
- any |= openDeletedFiles.size() > 0;
+ anyOpenDelFiles |= openDeletedFiles.size() > 0;
//assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
if (VERBOSE) {
System.out.println("TEST: now open");
@@ -344,204 +69,52 @@
//System.out.println("numDocs=" + r.numDocs() + "
//openDelFileCount=" + dir.openDeleteFileCount());
- smokeTestReader(r);
-
if (r.numDocs() > 0) {
-
- final IndexSearcher s = new IndexSearcher(r, es);
-
- // run search threads
- final long searchStopTime = System.currentTimeMillis() + 500;
- final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
- final AtomicInteger totHits = new AtomicInteger();
- for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
- searchThreads[thread] = new Thread() {
- @Override
- public void run() {
- try {
- TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
- int seenTermCount = 0;
- int shift;
- int trigger;
- if (totTermCount.get() < 10) {
- shift = 0;
- trigger = 1;
- } else {
- trigger = totTermCount.get()/10;
- shift = random.nextInt(trigger);
- }
- while(System.currentTimeMillis() < searchStopTime) {
- Term term = termEnum.term();
- if (term == null) {
- if (seenTermCount < 10) {
- break;
- }
- totTermCount.set(seenTermCount);
- seenTermCount = 0;
- trigger = totTermCount.get()/10;
- //System.out.println("trigger " + trigger);
- shift = random.nextInt(trigger);
- termEnum = s.getIndexReader().terms(new Term("body", ""));
- continue;
- }
- seenTermCount++;
- // search 10 terms
- if (trigger == 0) {
- trigger = 1;
- }
- if ((seenTermCount + shift) % trigger == 0) {
- //if (VERBOSE) {
- //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
- //}
- totHits.addAndGet(runQuery(s, new TermQuery(term)));
- }
- termEnum.next();
- }
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": search done");
- }
- } catch (Throwable t) {
- System.out.println(Thread.currentThread().getName() + ": hit exc");
- failed.set(true);
- t.printStackTrace(System.out);
- throw new RuntimeException(t);
- }
- }
- };
- searchThreads[thread].setDaemon(true);
- searchThreads[thread].start();
- }
-
- for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
- searchThreads[thread].join();
- }
-
- if (VERBOSE) {
- System.out.println("TEST: DONE search: totHits=" + totHits);
- }
- } else {
- Thread.sleep(100);
+ fixedSearcher = new IndexSearcher(r, es);
+ smokeTestSearcher(fixedSearcher);
+ runSearchThreads(System.currentTimeMillis() + 500);
}
}
-
- es.shutdown();
- es.awaitTermination(1, TimeUnit.SECONDS);
-
- if (VERBOSE) {
- System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
- }
+ r.close();
//System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
- r.close();
- final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
+ final Set<String> openDeletedFiles = ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
if (openDeletedFiles.size() > 0) {
System.out.println("OBD files: " + openDeletedFiles);
}
- any |= openDeletedFiles.size() > 0;
+ anyOpenDelFiles |= openDeletedFiles.size() > 0;
- assertFalse("saw non-zero open-but-deleted count", any);
- if (VERBOSE) {
- System.out.println("TEST: now join");
- }
- for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
- threads[thread].join();
- }
- if (VERBOSE) {
- System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
- }
+ assertFalse("saw non-zero open-but-deleted count", anyOpenDelFiles);
+ }
- final IndexReader r2 = writer.getReader();
- final IndexSearcher s = newSearcher(r2);
- boolean doFail = false;
- for(String id : delIDs) {
- final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
- if (hits.totalHits != 0) {
- System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
- doFail = true;
- }
- }
+ private IndexSearcher fixedSearcher;
- // Make sure each group of sub-docs are still in docID order:
- for(SubDocs subDocs : allSubDocs) {
- if (!subDocs.deleted) {
- // We sort by relevance but the scores should be identical so sort falls back to by docID:
- TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
- assertEquals(subDocs.subIDs.size(), hits.totalHits);
- int lastDocID = -1;
- int startDocID = -1;
- for(ScoreDoc scoreDoc : hits.scoreDocs) {
- final int docID = scoreDoc.doc;
- if (lastDocID != -1) {
- assertEquals(1+lastDocID, docID);
- } else {
- startDocID = docID;
- }
- lastDocID = docID;
- final Document doc = s.doc(docID);
- assertEquals(subDocs.packID, doc.get("packID"));
- }
+ protected IndexSearcher getCurrentSearcher() throws Exception {
+ return fixedSearcher;
+ }
- lastDocID = startDocID - 1;
- for(String subID : subDocs.subIDs) {
- hits = s.search(new TermQuery(new Term("docid", subID)), 1);
- assertEquals(1, hits.totalHits);
- final int docID = hits.scoreDocs[0].doc;
- if (lastDocID != -1) {
- assertEquals(1+lastDocID, docID);
- }
- lastDocID = docID;
- }
- } else {
- for(String subID : subDocs.subIDs) {
- assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
- }
- }
- }
-
- final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
- for(int id=0;id<endID;id++) {
- String stringID = ""+id;
- if (!delIDs.contains(stringID)) {
- final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
- if (hits.totalHits != 1) {
- System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
- doFail = true;
- }
- }
- }
- assertFalse(doFail);
-
- assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), r2.numDocs());
- r2.close();
-
- writer.commit();
- assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
-
- assertFalse(writer.anyNonBulkMerges);
- writer.close(false);
- _TestUtil.checkIndex(dir);
- s.close();
- dir.close();
- _TestUtil.rmDir(tempDir);
- docs.close();
- if (VERBOSE) {
- System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
+ @Override
+ protected void releaseSearcher(IndexSearcher s) throws Exception {
+ if (s != fixedSearcher) {
+ // Final searcher:
+ s.getIndexReader().close();
+ s.close();
}
}
- private int runQuery(IndexSearcher s, Query q) throws Exception {
- s.search(q, 10);
- return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
+ @Override
+ protected IndexSearcher getFinalSearcher() throws Exception {
+ final IndexReader r2;
+ if (random.nextBoolean()) {
+ r2 = writer.getReader();
+ } else {
+ writer.commit();
+ r2 = IndexReader.open(dir);
+ }
+ return newSearcher(r2);
}
- private void smokeTestReader(IndexReader r) throws Exception {
- IndexSearcher s = newSearcher(r);
- runQuery(s, new TermQuery(new Term("body", "united")));
- runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
- PhraseQuery pq = new PhraseQuery();
- pq.add(new Term("body", "united"));
- pq.add(new Term("body", "states"));
- runQuery(s, pq);
- s.close();
+ public void testNRTThreads() throws Exception {
+ runTest("TestNRTThreads");
}
}