blob: d8cf98cc98fb43e9ce3a5eaa1d171293e241faf5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;
@SuppressCodecs({ "SimpleText", "Direct" })
public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearchingTestCase {
// SearcherManager that is not guaranteed to reflect deletes (assigned in doAfterWriter):
private SearcherManager nrtNoDeletes;
// SearcherManager that is guaranteed to reflect deletes (assigned in doAfterWriter):
private SearcherManager nrtDeletes;
// Writer whose returned generations are tracked; set to the inherited `writer` in doAfterWriter.
private IndexWriter genWriter;
// Reopen thread driving nrtDeletes; used to wait for a generation before verifying updates/deletes.
private ControlledRealTimeReopenThread<IndexSearcher> nrtDeletesThread;
// Reopen thread driving nrtNoDeletes; used to wait for a generation before verifying adds.
private ControlledRealTimeReopenThread<IndexSearcher> nrtNoDeletesThread;
// Per-thread record of the last generation returned by an indexing operation issued by that thread.
private final ThreadLocal<Long> lastGens = new ThreadLocal<>();
// Set to true by the SearcherFactory created in doAfterWriter; asserted in doClose.
private boolean warmCalled;
// Test entry point: hands control to runTest (not defined in this file; presumably inherited
// from ThreadedIndexingAndSearchingTestCase), passing this test's name.
public void testControlledRealTimeReopenThread() throws Exception {
runTest("TestControlledRealTimeReopenThread");
}
/**
 * Blocks until the deletes-aware reopen thread has caught up with {@code maxGen}, then
 * acquires a searcher from the deletes-aware manager. The caller owns the returned reference.
 */
@Override
protected IndexSearcher getFinalSearcher() throws Exception {
  if (VERBOSE) {
    System.out.println("TEST: finalSearcher maxGen=" + maxGen);
  }
  // Wait first so the searcher acquired below covers every recorded generation.
  nrtDeletesThread.waitForGeneration(maxGen);
  final IndexSearcher finalSearcher = nrtDeletes.acquire();
  return finalSearcher;
}
/**
 * Randomly decides whether to wrap the supplied directory in an {@link NRTCachingDirectory}
 * (constructed with 5.0 and 60.0) or to hand it back unchanged.
 */
@Override
protected Directory getDirectory(Directory in) {
  final boolean wrap = random().nextBoolean();
  if (!wrap) {
    return in;
  }
  if (VERBOSE) {
    System.out.println("TEST: wrap NRTCachingDir");
  }
  return new NRTCachingDirectory(in, 5.0, 60.0);
}
/**
 * Applies a block update through {@code genWriter} and remembers the returned generation for
 * the calling thread. Roughly one call in twenty also waits for that generation on the
 * deletes-aware reopen thread and checks that exactly {@code docs.size()} documents match
 * {@code id}.
 */
@Override
protected void updateDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
  final long gen = genWriter.updateDocuments(id, docs);

  // Randomly verify the update "took":
  final boolean verify = random().nextInt(20) == 2;
  if (verify) {
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocuments " + id + " gen=" + gen);
    }
    nrtDeletesThread.waitForGeneration(gen);
    assertTrue(gen <= nrtDeletesThread.getSearchingGen());

    final IndexSearcher searcher = nrtDeletes.acquire();
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + searcher);
    }
    try {
      final long hitCount = searcher.search(new TermQuery(id), 10).totalHits.value;
      assertEquals("generation: " + gen, docs.size(), hitCount);
    } finally {
      nrtDeletes.release(searcher);
    }
  }

  lastGens.set(gen);
}
/**
 * Adds a block of documents through {@code genWriter} and remembers the returned generation
 * for the calling thread. Roughly one call in twenty also waits for that generation on the
 * no-deletes reopen thread and checks that exactly {@code docs.size()} documents match
 * {@code id}.
 */
@Override
protected void addDocuments(Term id, List<? extends Iterable<? extends IndexableField>> docs) throws Exception {
  final long gen = genWriter.addDocuments(docs);

  // Randomly verify the add "took":
  final boolean verify = random().nextInt(20) == 2;
  if (verify) {
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocuments " + id + " gen=" + gen);
    }
    nrtNoDeletesThread.waitForGeneration(gen);
    assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());

    final IndexSearcher searcher = nrtNoDeletes.acquire();
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + searcher);
    }
    try {
      final long hitCount = searcher.search(new TermQuery(id), 10).totalHits.value;
      assertEquals("generation: " + gen, docs.size(), hitCount);
    } finally {
      nrtNoDeletes.release(searcher);
    }
  }

  lastGens.set(gen);
}
/**
 * Adds a single document through {@code genWriter} and remembers the returned generation for
 * the calling thread. Roughly one call in twenty also waits for that generation on the
 * no-deletes reopen thread and checks that exactly one document matches {@code id}.
 */
@Override
protected void addDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
  final long gen = genWriter.addDocument(doc);

  // Randomly verify the add "took":
  final boolean verify = random().nextInt(20) == 2;
  if (verify) {
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocument " + id + " gen=" + gen);
    }
    nrtNoDeletesThread.waitForGeneration(gen);
    assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());

    final IndexSearcher searcher = nrtNoDeletes.acquire();
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + searcher);
    }
    try {
      final long hitCount = searcher.search(new TermQuery(id), 10).totalHits.value;
      assertEquals("generation: " + gen, 1, hitCount);
    } finally {
      nrtNoDeletes.release(searcher);
    }
  }

  lastGens.set(gen);
}
/**
 * Updates a single document through {@code genWriter} and remembers the returned generation
 * for the calling thread. Roughly one call in twenty also waits for that generation on the
 * deletes-aware reopen thread and checks that exactly one document matches {@code id}.
 */
@Override
protected void updateDocument(Term id, Iterable<? extends IndexableField> doc) throws Exception {
  final long gen = genWriter.updateDocument(id, doc);

  // Randomly verify the update "took":
  final boolean verify = random().nextInt(20) == 2;
  if (verify) {
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocument " + id + " gen=" + gen);
    }
    nrtDeletesThread.waitForGeneration(gen);
    assertTrue(gen <= nrtDeletesThread.getSearchingGen());

    final IndexSearcher searcher = nrtDeletes.acquire();
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + searcher);
    }
    try {
      final long hitCount = searcher.search(new TermQuery(id), 10).totalHits.value;
      assertEquals("generation: " + gen, 1, hitCount);
    } finally {
      nrtDeletes.release(searcher);
    }
  }

  lastGens.set(gen);
}
/**
 * Deletes by term through {@code genWriter} and remembers the returned generation for the
 * calling thread. Roughly one call in twenty also waits for that generation on the
 * deletes-aware reopen thread and checks that no document matches {@code id} any more.
 */
@Override
protected void deleteDocuments(Term id) throws Exception {
  final long gen = genWriter.deleteDocuments(id);

  // Randomly verify the delete "took":
  final boolean verify = random().nextInt(20) == 7;
  if (verify) {
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: verify deleteDocuments " + id + " gen=" + gen);
    }
    nrtDeletesThread.waitForGeneration(gen);
    assertTrue(gen <= nrtDeletesThread.getSearchingGen());

    final IndexSearcher searcher = nrtDeletes.acquire();
    if (VERBOSE) {
      System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + searcher);
    }
    try {
      final long hitCount = searcher.search(new TermQuery(id), 10).totalHits.value;
      assertEquals(0, hitCount);
    } finally {
      nrtDeletes.release(searcher);
    }
  }

  lastGens.set(gen);
}
/**
 * Hook run once the writer exists: picks random reopen intervals, builds the two
 * SearcherManagers (no-deletes first, then deletes-aware) with a factory that runs a query on
 * each new searcher, and starts one daemon reopen thread per manager.
 *
 * @param es executor handed to every IndexSearcher the factory creates
 */
@Override
protected void doAfterWriter(final ExecutorService es) throws Exception {
  // The minimum is drawn before the maximum; keep this order so the random sequence is unchanged.
  final double minReopenSec = 0.01 + 0.05 * random().nextDouble();
  final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble());
  if (VERBOSE) {
    System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
  }

  genWriter = writer;

  final SearcherFactory warmingFactory = new SearcherFactory() {
    @Override
    public IndexSearcher newSearcher(IndexReader r, IndexReader previous) throws IOException {
      // Record that the factory was invoked; doClose asserts on this flag.
      TestControlledRealTimeReopenThread.this.warmCalled = true;
      final IndexSearcher warmed = new IndexSearcher(r, es);
      warmed.search(new TermQuery(new Term("body", "united")), 10);
      return warmed;
    }
  };

  nrtNoDeletes = new SearcherManager(writer, false, false, warmingFactory);
  nrtDeletes = new SearcherManager(writer, warmingFactory);

  // Both reopen threads run slightly above the current thread's priority, capped at the maximum.
  final int reopenPriority = Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY);

  nrtDeletesThread = new ControlledRealTimeReopenThread<>(genWriter, nrtDeletes, maxReopenSec, minReopenSec);
  nrtDeletesThread.setName("NRTDeletes Reopen Thread");
  nrtDeletesThread.setPriority(reopenPriority);
  nrtDeletesThread.setDaemon(true);
  nrtDeletesThread.start();

  nrtNoDeletesThread = new ControlledRealTimeReopenThread<>(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec);
  nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread");
  nrtNoDeletesThread.setPriority(reopenPriority);
  nrtNoDeletesThread.setDaemon(true);
  nrtNoDeletesThread.start();
}
/**
 * Folds the calling thread's last recorded generation, if it recorded one, into the shared
 * maximum.
 */
@Override
protected void doAfterIndexingThreadDone() {
  final Long lastGen = lastGens.get();
  if (lastGen == null) {
    // This thread never recorded a generation.
    return;
  }
  addMaxGen(lastGen);
}
// Largest generation reported so far via addMaxGen; -1 until the first report.
private long maxGen = -1;

/** Raises {@code maxGen} to {@code gen} when {@code gen} is larger; otherwise leaves it as is. */
private synchronized void addMaxGen(long gen) {
  if (gen > maxGen) {
    maxGen = gen;
  }
}
// Searching hook: delegates to runSearchThreads with the given stop time. The executor
// argument is not used by this override.
@Override
protected void doSearching(ExecutorService es, long stopTime) throws Exception {
runSearchThreads(stopTime);
}
/**
 * Acquires a searcher from one of the two managers, chosen by a coin flip. The caller is
 * responsible for releasing the returned searcher.
 */
@Override
protected IndexSearcher getCurrentSearcher() throws Exception {
  // Test doesn't assert deletions until the end, so we
  // can randomize whether dels must be applied
  final SearcherManager chosen = random().nextBoolean() ? nrtDeletes : nrtNoDeletes;
  return chosen.acquire();
}
// Releases a searcher previously handed out by getCurrentSearcher. The release always goes
// through nrtNoDeletes, even though the searcher may have come from nrtDeletes; see the note
// inside the method for why the author considers that acceptable here.
@Override
protected void releaseSearcher(IndexSearcher s) throws Exception {
// NOTE: a bit iffy... technically you should release
// against the same SearcherManager you acquired from... but
// both impls just decRef the underlying reader so we
// can get away w/ cheating:
nrtNoDeletes.release(s);
}
// Shutdown hook: first checks that the SearcherFactory from doAfterWriter was invoked at
// least once (warmCalled), then closes each reopen thread followed by its manager.
@Override
protected void doClose() throws Exception {
assertTrue(warmCalled);
if (VERBOSE) {
System.out.println("TEST: now close SearcherManagers");
}
// Each reopen thread is closed before the SearcherManager it drives.
nrtDeletesThread.close();
nrtDeletes.close();
nrtNoDeletesThread.close();
nrtNoDeletes.close();
}
/*
* LUCENE-3528 - NRTManager hangs in certain situations
*
* Scenario: while the main thread is parked inside LatchedIndexWriter.updateDocument, a helper
* thread refreshes the manager, issues a delete and refreshes again. Afterwards a
* ControlledRealTimeReopenThread is started and a waiter thread must return from
* waitForGeneration(lastGen) within one second, otherwise the test fails.
*/
public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException {
IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
conf.setMergePolicy(NoMergePolicy.INSTANCE);
Directory d = newDirectory();
// latch: released by the helper thread to let the blocked update return.
// signal: released by the writer to tell the helper thread the update has been applied.
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch signal = new CountDownLatch(1);
LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
// Same (false, false) arguments as the nrtNoDeletes manager in doAfterWriter; no SearcherFactory.
final SearcherManager manager = new SearcherManager(writer, false, false, null);
Document doc = new Document();
doc.add(newTextField("test", "test", Field.Store.YES));
writer.addDocument(doc);
manager.maybeRefresh();
// Helper thread: runs its refresh/delete/refresh sequence only after the writer signals.
Thread t = new Thread() {
@Override
public void run() {
try {
signal.await();
manager.maybeRefresh();
writer.deleteDocuments(new TermQuery(new Term("foo", "barista")));
manager.maybeRefresh(); // kick off another reopen so we inc. the internal gen
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown(); // let the add below finish
}
}
};
t.start();
writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
// We now eagerly resolve deletes so the manager should see it after update:
assertTrue(manager.isSearcherCurrent());
IndexSearcher searcher = manager.acquire();
try {
// Both the initially added document and the one added via updateDocument are expected.
assertEquals(2, searcher.getIndexReader().numDocs());
} finally {
manager.release(searcher);
}
final ControlledRealTimeReopenThread<IndexSearcher> thread = new ControlledRealTimeReopenThread<>(writer, manager, 0.01, 0.01);
thread.start(); // start reopening
if (VERBOSE) {
System.out.println("waiting now for generation " + lastGen);
}
// Set by the waiter thread only if waitForGeneration returned normally.
final AtomicBoolean finished = new AtomicBoolean(false);
Thread waiter = new Thread() {
@Override
public void run() {
try {
thread.waitForGeneration(lastGen);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
finished.set(true);
}
};
waiter.start();
manager.maybeRefresh();
// Give the waiter at most one second; if it has not finished by then, treat it as deadlocked.
waiter.join(1000);
if (!finished.get()) {
waiter.interrupt();
fail("thread deadlocked on waitForGeneration");
}
thread.close();
thread.join();
writer.close();
IOUtils.close(manager, d);
}
/**
 * IndexWriter whose {@link #updateDocument} can be made to pause after the update has been
 * applied: when {@code waitAfterUpdate} is set it counts down {@code signal} and then blocks on
 * {@code latch} before returning.
 */
public static class LatchedIndexWriter extends IndexWriter {
  /** Awaited after the update when waiting is enabled. */
  private final CountDownLatch latch;
  /** Counted down after the update when waiting is enabled. */
  private final CountDownLatch signal;
  /** When true, updateDocument signals and then blocks on the latch before returning. */
  boolean waitAfterUpdate = false;

  public LatchedIndexWriter(Directory d, IndexWriterConfig conf,
      CountDownLatch latch, CountDownLatch signal)
      throws IOException {
    super(d, conf);
    this.latch = latch;
    this.signal = signal;
  }

  /**
   * Performs the update via the superclass and, if {@code waitAfterUpdate} is set, counts down
   * {@code signal} and waits on {@code latch} before handing back the superclass's result.
   *
   * @throws ThreadInterruptedException if the wait on the latch is interrupted
   */
  @Override
  public long updateDocument(Term term,
      Iterable<? extends IndexableField> doc)
      throws IOException {
    final long updateResult = super.updateDocument(term, doc);
    if (!waitAfterUpdate) {
      return updateResult;
    }
    signal.countDown();
    try {
      latch.await();
    } catch (InterruptedException e) {
      throw new ThreadInterruptedException(e);
    }
    return updateResult;
  }
}
/**
 * A SearcherFactory that returns a searcher over an unrelated reader must make the
 * SearcherManager constructor fail with IllegalStateException.
 */
public void testEvilSearcherFactory() throws Exception {
  final Directory dir = newDirectory();
  final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
  w.commit();

  // Reader opened independently of the writer; the factory below wraps this one instead of
  // the reader it is handed.
  final IndexReader unrelatedReader = DirectoryReader.open(dir);
  final SearcherFactory evilFactory = new SearcherFactory() {
    @Override
    public IndexSearcher newSearcher(IndexReader ignored, IndexReader previous) {
      return LuceneTestCase.newSearcher(unrelatedReader);
    }
  };

  expectThrows(IllegalStateException.class, () -> new SearcherManager(w.w, false, false, evilFactory));

  w.close();
  unrelatedReader.close();
  dir.close();
}
/**
 * Checks that a RefreshListener's afterRefresh is invoked with {@code didRefresh == true}
 * by maybeRefreshBlocking after a document has been added and committed, and not before.
 */
public void testListenerCalled() throws Exception {
  Directory dir = newDirectory();
  IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(null));
  SearcherManager sm = new SearcherManager(iw, new SearcherFactory());

  final AtomicBoolean afterRefreshCalled = new AtomicBoolean(false);
  final ReferenceManager.RefreshListener listener = new ReferenceManager.RefreshListener() {
    @Override
    public void beforeRefresh() {
    }

    @Override
    public void afterRefresh(boolean didRefresh) {
      if (!didRefresh) {
        return;
      }
      afterRefreshCalled.set(true);
    }
  };
  sm.addListener(listener);

  iw.addDocument(new Document());
  iw.commit();
  // Nothing has asked the manager to refresh yet, so the flag must still be clear.
  assertFalse(afterRefreshCalled.get());

  sm.maybeRefreshBlocking();
  assertTrue(afterRefreshCalled.get());

  sm.close();
  iw.close();
  dir.close();
}
// Relies on wall clock time, so it can easily false-fail when the machine is otherwise busy:
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/LUCENE-5737")
// LUCENE-5461
/**
 * Adds 500 documents while commit threads run concurrently (one is spawned every 50 documents).
 * After every add it waits for the returned generation on the reopen thread, checks that the
 * wait stayed below {@code maxStaleSecs}, and checks that the new document is searchable.
 */
public void testCRTReopen() throws Exception {
  // Upper bound handed to the reopen thread and used for the wait-time assertion.
  // Should be high enough that only a misbehaving reopen thread exceeds it.
  int maxStaleSecs = 20;

  // Build throw-away random content just to have something to store.
  String s = " abcdefghijklmnopqrstuvwxyz ";
  char[] chars = s.toCharArray();
  StringBuilder builder = new StringBuilder(2048);
  for (int i = 0; i < 2048; i++) {
    builder.append(chars[random().nextInt(chars.length)]);
  }
  String content = builder.toString();

  final SnapshotDeletionPolicy sdp = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
  final Directory dir = new NRTCachingDirectory(newFSDirectory(createTempDir("nrt")), 5, 128);
  IndexWriterConfig config = new IndexWriterConfig(new MockAnalyzer(random()));
  config.setCommitOnClose(true);
  config.setIndexDeletionPolicy(sdp);
  config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
  final IndexWriter iw = new IndexWriter(dir, config);
  SearcherManager sm = new SearcherManager(iw, new SearcherFactory());
  ControlledRealTimeReopenThread<IndexSearcher> controlledRealTimeReopenThread =
      new ControlledRealTimeReopenThread<>(iw, sm, maxStaleSecs, 0);
  controlledRealTimeReopenThread.setDaemon(true);
  controlledRealTimeReopenThread.start();

  List<Thread> commitThreads = new ArrayList<>();
  for (int i = 0; i < 500; i++) {
    if (i > 0 && i % 50 == 0) {
      // Commit concurrently with indexing, snapshot the commit and check its files exist.
      Thread commitThread = new Thread(new Runnable() {
          @Override
          public void run() {
            try {
              iw.commit();
              IndexCommit ic = sdp.snapshot();
              for (String name : ic.getFileNames()) {
                //distribute, and backup
                assertTrue(slowFileExists(dir, name));
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          }
        });
      commitThread.start();
      commitThreads.add(commitThread);
    }

    Document d = new Document();
    d.add(new TextField("count", i + "", Field.Store.NO));
    d.add(new TextField("content", content, Field.Store.YES));

    // nanoTime is monotonic, so a wall-clock adjustment cannot distort the measured wait.
    final long startNanos = System.nanoTime();
    final long gen = iw.addDocument(d);
    controlledRealTimeReopenThread.waitForGeneration(gen);
    final long waitMs = (System.nanoTime() - startNanos) / 1_000_000L;
    assertTrue("waited too long for generation " + waitMs,
               waitMs < (maxStaleSecs * 1000));

    // Release the searcher even if the search throws, so the reference is never leaked.
    final IndexSearcher searcher = sm.acquire();
    final TopDocs td;
    try {
      td = searcher.search(new TermQuery(new Term("count", i + "")), 10);
    } finally {
      sm.release(searcher);
    }
    assertEquals(1, td.totalHits.value);
  }

  for (Thread commitThread : commitThreads) {
    commitThread.join();
  }

  controlledRealTimeReopenThread.close();
  sm.close();
  iw.close();
  dir.close();
}
/**
 * Checks that a generation returned by {@code IndexWriter.deleteAll()} can be waited for on a
 * ControlledRealTimeReopenThread without hanging.
 */
public void testDeleteAll() throws Exception {
  Directory dir = newDirectory();
  IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
  SearcherManager mgr = new SearcherManager(w, new SearcherFactory());

  // Use a local rather than the nrtDeletesThread field: this test owns the thread outright.
  ControlledRealTimeReopenThread<IndexSearcher> reopenThread =
      new ControlledRealTimeReopenThread<>(w, mgr, 0.1, 0.01);
  reopenThread.setName("NRTDeletes Reopen Thread");
  reopenThread.setDaemon(true);
  reopenThread.start();

  w.addDocument(new Document());
  long gen2 = w.deleteAll();
  reopenThread.waitForGeneration(gen2);

  // Close the manager created above. The original passed the nrtDeletes field here, which
  // this test never assigns, so the local manager was never closed.
  IOUtils.close(reopenThread, mgr, w, dir);
}
}