| package org.apache.lucene.index; |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.lucene.analysis.MockAnalyzer; |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.document.Field; |
| import org.apache.lucene.search.IndexSearcher; |
| import org.apache.lucene.search.PhraseQuery; |
| import org.apache.lucene.search.Query; |
| import org.apache.lucene.search.ScoreDoc; |
| import org.apache.lucene.search.Sort; |
| import org.apache.lucene.search.SortField; |
| import org.apache.lucene.search.TermQuery; |
| import org.apache.lucene.search.TopDocs; |
| import org.apache.lucene.store.MockDirectoryWrapper; |
| import org.apache.lucene.util.LineFileDocs; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util._TestUtil; |
| import org.junit.Test; |
| |
| // TODO |
| // - mix in optimize, addIndexes |
// - randomly mix in non-congruent docs
| |
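/*
 * Stress test for near-real-time (NRT) readers: a couple of indexing
 * threads add, update and delete documents (singly and in atomic
 * "packs" via addDocuments/updateDocuments) while the main thread
 * repeatedly reopens an NRT reader and runs search threads against it.
 * At the end the test verifies that deleted IDs are gone, surviving IDs
 * are still found, packed sub-docs stayed adjacent in docID order, and
 * the final doc count equals addCount - delCount.
 */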
| public class TestNRTThreads extends LuceneTestCase { |
| |
| private static class SubDocs { |
| public final String packID; |
| public final List<String> subIDs; |
| public boolean deleted; |
| |
| public SubDocs(String packID, List<String> subIDs) { |
| this.packID = packID; |
| this.subIDs = subIDs; |
| } |
| } |
| |
| @Test |
| public void testNRTThreads() throws Exception { |
| |
| final long t0 = System.currentTimeMillis(); |
| |
| final LineFileDocs docs = new LineFileDocs(random); |
| final File tempDir = _TestUtil.getTempDir("nrtopenfiles"); |
| final MockDirectoryWrapper dir = newFSDirectory(tempDir); |
| dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves. |
| final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); |
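// Install a merged-segment warmer so a newly merged segment is touched
// (stored fields plus a simple search) before it becomes visible to NRT
// readers: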
| conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() { |
| @Override |
| public void warm(IndexReader reader) throws IOException { |
| if (VERBOSE) { |
| System.out.println("TEST: now warm merged reader=" + reader); |
| } |
| final int maxDoc = reader.maxDoc(); |
| int sum = 0; |
| final int inc = Math.max(1, maxDoc/50); |
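// Visit stored fields for a sample of roughly 50 live docs, then run a
// term query below, so the new segment is actually read during warming: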
| for(int docID=0;docID<maxDoc;docID += inc) { |
if (!reader.isDeleted(docID)) {
| final Document doc = reader.document(docID); |
| sum += doc.getFields().size(); |
| } |
| } |
| |
| IndexSearcher searcher = newSearcher(reader); |
| sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits; |
| searcher.close(); |
| |
| if (VERBOSE) { |
| System.out.println("TEST: warm visited " + sum + " fields"); |
| } |
| } |
| }); |
| |
| final IndexWriter writer = new IndexWriter(dir, conf); |
| if (VERBOSE) { |
| writer.setInfoStream(System.out); |
| } |
| _TestUtil.reduceOpenFiles(writer); |
| |
| final int NUM_INDEX_THREADS = 2; |
| final int NUM_SEARCH_THREADS = 3; |
| |
| final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER; |
| |
| final AtomicBoolean failed = new AtomicBoolean(); |
| final AtomicInteger addCount = new AtomicInteger(); |
| final AtomicInteger delCount = new AtomicInteger(); |
| final AtomicInteger packCount = new AtomicInteger(); |
| |
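// IDs deleted so far, and every sub-doc pack ever indexed; both are
// consulted by the verification at the end of the test: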
| final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>()); |
| final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>()); |
| |
| final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000; |
| Thread[] threads = new Thread[NUM_INDEX_THREADS]; |
| for(int thread=0;thread<NUM_INDEX_THREADS;thread++) { |
| threads[thread] = new Thread() { |
| @Override |
| public void run() { |
// TODO: would be better if this were cross-thread, so that we make sure one thread deleting another's added docs works:
| final List<String> toDeleteIDs = new ArrayList<String>(); |
| final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>(); |
| while(System.currentTimeMillis() < stopTime && !failed.get()) { |
| try { |
| Document doc = docs.nextDoc(); |
| if (doc == null) { |
| break; |
| } |
| final String addedField; |
| if (random.nextBoolean()) { |
| addedField = "extra" + random.nextInt(10); |
| doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED)); |
| } else { |
| addedField = null; |
| } |
| if (random.nextBoolean()) { |
| if (VERBOSE) { |
| System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid")); |
| } |
| |
| if (random.nextBoolean()) { |
| // Add a pack of adjacent sub-docs |
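// addDocuments/updateDocuments index the block atomically, so the
// sub-docs get consecutive docIDs; the adjacency check at the end of
// the test relies on this.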
| final String packID; |
| final SubDocs delSubDocs; |
| if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) { |
| delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size())); |
| assert !delSubDocs.deleted; |
| toDeleteSubDocs.remove(delSubDocs); |
| // reuse prior packID |
| packID = delSubDocs.packID; |
| } else { |
| delSubDocs = null; |
| // make new packID |
| packID = packCount.getAndIncrement() + ""; |
| } |
| |
| final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED); |
| final List<String> docIDs = new ArrayList<String>(); |
| final SubDocs subDocs = new SubDocs(packID, docIDs); |
| final List<Document> docsList = new ArrayList<Document>(); |
| |
| allSubDocs.add(subDocs); |
| doc.add(packIDField); |
| docsList.add(_TestUtil.cloneDocument(doc)); |
| docIDs.add(doc.get("docid")); |
| |
| final int maxDocCount = _TestUtil.nextInt(random, 1, 10); |
| while(docsList.size() < maxDocCount) { |
| doc = docs.nextDoc(); |
| if (doc == null) { |
| break; |
| } |
| docsList.add(_TestUtil.cloneDocument(doc)); |
| docIDs.add(doc.get("docid")); |
| } |
| addCount.addAndGet(docsList.size()); |
| |
| if (delSubDocs != null) { |
| delSubDocs.deleted = true; |
| delIDs.addAll(delSubDocs.subIDs); |
| delCount.addAndGet(delSubDocs.subIDs.size()); |
| if (VERBOSE) { |
| System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs); |
| } |
| writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList); |
| /* |
| // non-atomic: |
| writer.deleteDocuments(new Term("packID", delSubDocs.packID)); |
| for(Document subDoc : docsList) { |
| writer.addDocument(subDoc); |
| } |
| */ |
| } else { |
| if (VERBOSE) { |
| System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs); |
| } |
| writer.addDocuments(docsList); |
| |
| /* |
| // non-atomic: |
| for(Document subDoc : docsList) { |
| writer.addDocument(subDoc); |
| } |
| */ |
| } |
| doc.removeField("packID"); |
| |
| if (random.nextInt(5) == 2) { |
| if (VERBOSE) { |
| //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID); |
| } |
| toDeleteSubDocs.add(subDocs); |
| } |
| |
| } else { |
| writer.addDocument(doc); |
| addCount.getAndIncrement(); |
| |
| if (random.nextInt(5) == 3) { |
| if (VERBOSE) { |
| //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); |
| } |
| toDeleteIDs.add(doc.get("docid")); |
| } |
| } |
| } else { |
// We use updateDocument, but the docid term never
// matches an existing doc, so this acts as an add:
| if (VERBOSE) { |
| System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid")); |
| } |
| writer.updateDocument(new Term("docid", doc.get("docid")), doc); |
| addCount.getAndIncrement(); |
| |
| if (random.nextInt(5) == 3) { |
| if (VERBOSE) { |
| //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid")); |
| } |
| toDeleteIDs.add(doc.get("docid")); |
| } |
| } |
| |
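// Occasionally apply the buffered deletes: individual docs by their
// docid term, whole packs by their packID term; record everything in
// delIDs for the final verification.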
| if (random.nextInt(30) == 17) { |
| if (VERBOSE) { |
| System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes"); |
| } |
| for(String id : toDeleteIDs) { |
| if (VERBOSE) { |
| System.out.println(Thread.currentThread().getName() + ": del term=id:" + id); |
| } |
| writer.deleteDocuments(new Term("docid", id)); |
| } |
| final int count = delCount.addAndGet(toDeleteIDs.size()); |
| if (VERBOSE) { |
| System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes"); |
| } |
| delIDs.addAll(toDeleteIDs); |
| toDeleteIDs.clear(); |
| |
| for(SubDocs subDocs : toDeleteSubDocs) { |
| assert !subDocs.deleted; |
| writer.deleteDocuments(new Term("packID", subDocs.packID)); |
| subDocs.deleted = true; |
| if (VERBOSE) { |
| System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID); |
| } |
| delIDs.addAll(subDocs.subIDs); |
| delCount.addAndGet(subDocs.subIDs.size()); |
| } |
| toDeleteSubDocs.clear(); |
| } |
| if (addedField != null) { |
| doc.removeField(addedField); |
| } |
| } catch (Throwable t) { |
| System.out.println(Thread.currentThread().getName() + ": hit exc"); |
| t.printStackTrace(); |
| failed.set(true); |
| throw new RuntimeException(t); |
| } |
| } |
| if (VERBOSE) { |
| System.out.println(Thread.currentThread().getName() + ": indexing done"); |
| } |
| } |
| }; |
| threads[thread].setDaemon(true); |
| threads[thread].start(); |
| } |
| |
| if (VERBOSE) { |
| System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]"); |
| } |
| |
| // let index build up a bit |
| Thread.sleep(100); |
| |
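// Open an NRT reader straight from the writer; the loop below randomly
// either reopens it, or closes it, commits, and opens a fresh one,
// checking in the latter case for files that are open but already
// deleted.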
| IndexReader r = IndexReader.open(writer, true); |
| boolean any = false; |
| |
| // silly starting guess: |
| final AtomicInteger totTermCount = new AtomicInteger(100); |
| |
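// Searchers below are created with this executor, so each search runs
// concurrently across the reader's segments.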
| final ExecutorService es = Executors.newCachedThreadPool(); |
| |
| while(System.currentTimeMillis() < stopTime && !failed.get()) { |
| if (random.nextBoolean()) { |
| if (VERBOSE) { |
| System.out.println("TEST: now reopen r=" + r); |
| } |
| final IndexReader r2 = r.reopen(); |
| if (r != r2) { |
| r.close(); |
| r = r2; |
| } |
| } else { |
| if (VERBOSE) { |
| System.out.println("TEST: now close reader=" + r); |
| } |
| r.close(); |
| writer.commit(); |
| final Set<String> openDeletedFiles = dir.getOpenDeletedFiles(); |
| if (openDeletedFiles.size() > 0) { |
System.out.println("open-but-deleted files: " + openDeletedFiles);
| } |
| any |= openDeletedFiles.size() > 0; |
| //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size()); |
| if (VERBOSE) { |
| System.out.println("TEST: now open"); |
| } |
| r = IndexReader.open(writer, true); |
| } |
| if (VERBOSE) { |
| System.out.println("TEST: got new reader=" + r); |
| } |
//System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
| |
| smokeTestReader(r); |
| |
| if (r.numDocs() > 0) { |
| |
| final IndexSearcher s = new IndexSearcher(r, es); |
| |
| // run search threads |
| final long searchStopTime = System.currentTimeMillis() + 500; |
| final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS]; |
| final AtomicInteger totHits = new AtomicInteger(); |
| for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) { |
| searchThreads[thread] = new Thread() { |
| @Override |
| public void run() { |
| try { |
| TermEnum termEnum = s.getIndexReader().terms(new Term("body", "")); |
| int seenTermCount = 0; |
| int shift; |
| int trigger; |
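// Sample roughly ten terms per pass over the enumeration: 'trigger' is
// the sampling stride (totTermCount/10) and 'shift' randomizes which
// terms within each stride get searched.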
| if (totTermCount.get() < 10) { |
| shift = 0; |
| trigger = 1; |
| } else { |
| trigger = totTermCount.get()/10; |
| shift = random.nextInt(trigger); |
| } |
| while(System.currentTimeMillis() < searchStopTime) { |
| Term term = termEnum.term(); |
| if (term == null) { |
| if (seenTermCount < 10) { |
| break; |
| } |
| totTermCount.set(seenTermCount); |
| seenTermCount = 0; |
| trigger = totTermCount.get()/10; |
| //System.out.println("trigger " + trigger); |
| shift = random.nextInt(trigger); |
| termEnum = s.getIndexReader().terms(new Term("body", "")); |
| continue; |
| } |
| seenTermCount++; |
| // search 10 terms |
| if (trigger == 0) { |
| trigger = 1; |
| } |
| if ((seenTermCount + shift) % trigger == 0) { |
| //if (VERBOSE) { |
| //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString()); |
| //} |
| totHits.addAndGet(runQuery(s, new TermQuery(term))); |
| } |
| termEnum.next(); |
| } |
| if (VERBOSE) { |
| System.out.println(Thread.currentThread().getName() + ": search done"); |
| } |
| } catch (Throwable t) { |
| System.out.println(Thread.currentThread().getName() + ": hit exc"); |
| failed.set(true); |
| t.printStackTrace(System.out); |
| throw new RuntimeException(t); |
| } |
| } |
| }; |
| searchThreads[thread].setDaemon(true); |
| searchThreads[thread].start(); |
| } |
| |
| for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) { |
| searchThreads[thread].join(); |
| } |
| |
| if (VERBOSE) { |
| System.out.println("TEST: DONE search: totHits=" + totHits); |
| } |
| } else { |
| Thread.sleep(100); |
| } |
| } |
| |
| es.shutdown(); |
| es.awaitTermination(1, TimeUnit.SECONDS); |
| |
| if (VERBOSE) { |
| System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]"); |
| } |
| |
| //System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount()); |
| r.close(); |
| final Set<String> openDeletedFiles = dir.getOpenDeletedFiles(); |
| if (openDeletedFiles.size() > 0) { |
System.out.println("open-but-deleted files: " + openDeletedFiles);
| } |
| any |= openDeletedFiles.size() > 0; |
| |
| assertFalse("saw non-zero open-but-deleted count", any); |
| if (VERBOSE) { |
| System.out.println("TEST: now join"); |
| } |
| for(int thread=0;thread<NUM_INDEX_THREADS;thread++) { |
| threads[thread].join(); |
| } |
| if (VERBOSE) { |
| System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount); |
| } |
| |
| final IndexReader r2 = writer.getReader(); |
| final IndexSearcher s = newSearcher(r2); |
| boolean doFail = false; |
| for(String id : delIDs) { |
| final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1); |
| if (hits.totalHits != 0) { |
| System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc); |
| doFail = true; |
| } |
| } |
| |
| // Make sure each group of sub-docs are still in docID order: |
| for(SubDocs subDocs : allSubDocs) { |
| if (!subDocs.deleted) { |
// The search sorts by relevance, but all scores should be identical, so ties fall back to docID order:
| TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20); |
| assertEquals(subDocs.subIDs.size(), hits.totalHits); |
| int lastDocID = -1; |
| int startDocID = -1; |
| for(ScoreDoc scoreDoc : hits.scoreDocs) { |
| final int docID = scoreDoc.doc; |
| if (lastDocID != -1) { |
| assertEquals(1+lastDocID, docID); |
| } else { |
| startDocID = docID; |
| } |
| lastDocID = docID; |
| final Document doc = s.doc(docID); |
| assertEquals(subDocs.packID, doc.get("packID")); |
| } |
| |
| lastDocID = startDocID - 1; |
| for(String subID : subDocs.subIDs) { |
| hits = s.search(new TermQuery(new Term("docid", subID)), 1); |
| assertEquals(1, hits.totalHits); |
| final int docID = hits.scoreDocs[0].doc; |
| if (lastDocID != -1) { |
| assertEquals(1+lastDocID, docID); |
| } |
| lastDocID = docID; |
| } |
| } else { |
| for(String subID : subDocs.subIDs) { |
| assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits); |
| } |
| } |
| } |
| |
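// LineFileDocs hands out increasing integer docids, so every id below
// the next one it would assign should either have been deleted or still
// be found exactly once: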
| final int endID = Integer.parseInt(docs.nextDoc().get("docid")); |
| for(int id=0;id<endID;id++) { |
| String stringID = ""+id; |
| if (!delIDs.contains(stringID)) { |
| final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1); |
| if (hits.totalHits != 1) { |
| System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits); |
| doFail = true; |
| } |
| } |
| } |
| assertFalse(doFail); |
| |
| assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), r2.numDocs()); |
| r2.close(); |
| |
| writer.commit(); |
| assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs()); |
| |
| assertFalse(writer.anyNonBulkMerges); |
| writer.close(false); |
| _TestUtil.checkIndex(dir); |
| s.close(); |
| dir.close(); |
| _TestUtil.rmDir(tempDir); |
| docs.close(); |
| if (VERBOSE) { |
| System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]"); |
| } |
| } |
| |
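// Runs the query twice, first with the default relevance sort and then
// sorted by the "title" field, returning the hit count of the sorted
// search.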
| private int runQuery(IndexSearcher s, Query q) throws Exception { |
| s.search(q, 10); |
| return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits; |
| } |
| |
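// Quick sanity check on a freshly (re)opened reader: a couple of term
// queries plus a phrase query.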
| private void smokeTestReader(IndexReader r) throws Exception { |
| IndexSearcher s = newSearcher(r); |
| runQuery(s, new TermQuery(new Term("body", "united"))); |
| runQuery(s, new TermQuery(new Term("titleTokenized", "states"))); |
| PhraseQuery pq = new PhraseQuery(); |
| pq.add(new Term("body", "united")); |
| pq.add(new Term("body", "states")); |
| runQuery(s, pq); |
| s.close(); |
| } |
| } |