/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;

@LuceneTestCase.SuppressCodecs({"SimpleText", "Direct"})
public class TestIndexWriterThreadsToSegments extends LuceneTestCase {

  // LUCENE-5644: for the first segment, two threads each indexed one doc (likely concurrently), but for the second segment, each thread
  // indexed its doc at a different time, so the two threads should have shared a single thread state / segment:
  public void testSegmentCountOnFlushBasic() throws Exception {
    Directory dir = newDirectory();
    final IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())));
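
    // Latch choreography: startingGun releases both threads to index their first doc concurrently; startDone lets the main thread
    // check the first flush; middleGun starts the second round, where finalGun ensures thread 1 indexes only after thread 0 has
    // finished, so the second-round docs should share a single thread state: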
    final CountDownLatch startingGun = new CountDownLatch(1);
    final CountDownLatch startDone = new CountDownLatch(2);
    final CountDownLatch middleGun = new CountDownLatch(1);
    final CountDownLatch finalGun = new CountDownLatch(1);
    Thread[] threads = new Thread[2];
    for(int i=0;i<threads.length;i++) {
      final int threadID = i;
      threads[i] = new Thread() {
        @Override
        public void run() {
          try {
            startingGun.await();
            Document doc = new Document();
            doc.add(newTextField("field", "here is some text", Field.Store.NO));
            w.addDocument(doc);
            startDone.countDown();
            middleGun.await();
            if (threadID == 0) {
              w.addDocument(doc);
            } else {
              finalGun.await();
              w.addDocument(doc);
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
      threads[i].start();
    }
    startingGun.countDown();
    startDone.await();

    IndexReader r = DirectoryReader.open(w);
    assertEquals(2, r.numDocs());
    int numSegments = r.leaves().size();
    // 1 segment if the threads ran sequentially, else 2:
    assertTrue(numSegments <= 2);
    r.close();

    middleGun.countDown();
    threads[0].join();

    finalGun.countDown();
    threads[1].join();

    r = DirectoryReader.open(w);
    assertEquals(4, r.numDocs());
    // Both threads should have shared a single thread state since they did not try to index concurrently:
    assertEquals(1+numSegments, r.leaves().size());
    r.close();

    w.close();
    dir.close();
  }

  /** Maximum number of simultaneous threads to use for each iteration. */
  private static final int MAX_THREADS_AT_ONCE = 10;
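
  /** Runs as the barrier action for the CyclicBarrier in testSegmentCountOnFlushRandom: each time all threads finish an
   *  iteration, it re-opens the NRT reader and verifies that no more new segments appeared than the number of threads that
   *  were allowed to index during that iteration. */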
  static class CheckSegmentCount implements Runnable, Closeable {
    private final IndexWriter w;
    private final AtomicInteger maxThreadCountPerIter;
    private final AtomicInteger indexingCount;
    private DirectoryReader r;

    public CheckSegmentCount(IndexWriter w, AtomicInteger maxThreadCountPerIter, AtomicInteger indexingCount) throws IOException {
      this.w = w;
      this.maxThreadCountPerIter = maxThreadCountPerIter;
      this.indexingCount = indexingCount;
      r = DirectoryReader.open(w);
      assertEquals(0, r.leaves().size());
      setNextIterThreadCount();
    }

    @Override
    public void run() {
      try {
        int oldSegmentCount = r.leaves().size();
        DirectoryReader r2 = DirectoryReader.openIfChanged(r);
        assertNotNull(r2);
        r.close();
        r = r2;
        int maxExpectedSegments = oldSegmentCount + maxThreadCountPerIter.get();
        if (VERBOSE) {
          System.out.println("TEST: iter done; now verify oldSegCount=" + oldSegmentCount + " newSegCount=" + r2.leaves().size() + " maxExpected=" + maxExpectedSegments);
        }
        // NOTE: it won't necessarily be ==, in case some threads were strangely scheduled and never conflicted with one another (should be uncommon...?):
        assertTrue(r.leaves().size() <= maxExpectedSegments);
        setNextIterThreadCount();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    private void setNextIterThreadCount() {
      indexingCount.set(0);
      maxThreadCountPerIter.set(TestUtil.nextInt(random(), 1, MAX_THREADS_AT_ONCE));
      if (VERBOSE) {
        System.out.println("TEST: iter set maxThreadCount=" + maxThreadCountPerIter.get());
      }
    }

    @Override
    public void close() throws IOException {
      r.close();
      r = null;
    }
  }

  // LUCENE-5644: index docs with multiple threads, but in between flushes we limit how many threads can index concurrently in the
  // next iteration, and then verify that no more segments were flushed than the number of threads:
  public void testSegmentCountOnFlushRandom() throws Exception {
    Directory dir = newFSDirectory(createTempDir());
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));

    // Never trigger flushes (so we only flush on getReader):
    iwc.setMaxBufferedDocs(100000000);
    iwc.setRAMBufferSizeMB(-1);

    // Never trigger merges (so we can simplistically count flushed segments):
    iwc.setMergePolicy(NoMergePolicy.INSTANCE);

    final IndexWriter w = new IndexWriter(dir, iwc);

    // How many threads are indexing in the current cycle:
    final AtomicInteger indexingCount = new AtomicInteger();

    // How many threads we will use on each cycle:
    final AtomicInteger maxThreadCount = new AtomicInteger();

    CheckSegmentCount checker = new CheckSegmentCount(w, maxThreadCount, indexingCount);

    // We spin up 10 threads up front, but then in between flushes we limit how many can run on each iteration:
    final int ITERS = TEST_NIGHTLY ? 300 : 10;

    Thread[] threads = new Thread[MAX_THREADS_AT_ONCE];

    // We use this to stop all threads once they've indexed their docs in the current iter, and pull a new NRT reader, and verify the
    // segment count:
    final CyclicBarrier barrier = new CyclicBarrier(MAX_THREADS_AT_ONCE, checker);

    for(int i=0;i<threads.length;i++) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          try {
            for(int iter=0;iter<ITERS;iter++) {
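              // Threads race to claim one of this iteration's indexing slots; an increment past the limit means we sit this cycle out: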
              if (indexingCount.incrementAndGet() <= maxThreadCount.get()) {
                if (VERBOSE) {
                  System.out.println("TEST: " + Thread.currentThread().getName() + ": do index");
                }

                // We get to index on this cycle:
                Document doc = new Document();
                doc.add(new TextField("field", "here is some text that is a bit longer than normal trivial text", Field.Store.NO));
                for(int j=0;j<200;j++) {
                  w.addDocument(doc);
                }
              } else {
                // We lose: no indexing for us on this cycle
                if (VERBOSE) {
                  System.out.println("TEST: " + Thread.currentThread().getName() + ": don't index");
                }
              }
              barrier.await();
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
      threads[i].start();
    }

    for(Thread t : threads) {
      t.join();
    }

    IOUtils.close(checker, w, dir);
  }
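
  // Close the writer while many threads are still adding documents: the indexing threads should either finish normally or hit
  // AlreadyClosedException, and close() itself may (but need not) throw IllegalStateException because docs are still in flight: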
  public void testManyThreadsClose() throws Exception {
    Directory dir = newDirectory();
    Random r = random();
    IndexWriterConfig iwc = newIndexWriterConfig(r, new MockAnalyzer(r));
    iwc.setCommitOnClose(false);
    final RandomIndexWriter w = new RandomIndexWriter(r, dir, iwc);
    w.setDoRandomForceMerge(false);
    Thread[] threads = new Thread[TestUtil.nextInt(random(), 4, 30)];
    final CountDownLatch startingGun = new CountDownLatch(1);
    for(int i=0;i<threads.length;i++) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          try {
            startingGun.await();
            Document doc = new Document();
            doc.add(new TextField("field", "here is some text that is a bit longer than normal trivial text", Field.Store.NO));
            for(int j=0;j<1000;j++) {
              w.addDocument(doc);
            }
          } catch (AlreadyClosedException ace) {
            // ok
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
      threads[i].start();
    }

    startingGun.countDown();
    Thread.sleep(100);
    try {
      w.close();
    } catch (IllegalStateException ise) {
      // OK but not required
    }
    for(Thread t : threads) {
      t.join();
    }
    w.close();
    dir.close();
  }
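
  /** Makes sure docs buffered by a thread state are not held in RAM forever: after the two indexing threads finish, we keep
   *  indexing with only the main thread and verify that every thread0/thread1 doc is eventually flushed. */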
  public void testDocsStuckInRAMForever() throws Exception {
    Directory dir = newDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
    iwc.setRAMBufferSizeMB(.2);
    Codec codec = TestUtil.getDefaultCodec();
    iwc.setCodec(codec);
    iwc.setMergePolicy(NoMergePolicy.INSTANCE);
    final IndexWriter w = new IndexWriter(dir, iwc);
    final CountDownLatch startingGun = new CountDownLatch(1);
    Thread[] threads = new Thread[2];
    for(int i=0;i<threads.length;i++) {
      final int threadID = i;
      threads[i] = new Thread() {
        @Override
        public void run() {
          try {
            startingGun.await();
            for(int j=0;j<1000;j++) {
              Document doc = new Document();
              doc.add(newStringField("field", "threadID" + threadID, Field.Store.NO));
              w.addDocument(doc);
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
      threads[i].start();
    }

    startingGun.countDown();
    for(Thread t : threads) {
      t.join();
    }

    Set<String> segSeen = new HashSet<>();
    int thread0Count = 0;
    int thread1Count = 0;

    // At this point the writer should have two thread states with docs; now we index with only one thread until we see all 1000
    // thread0 and thread1 docs flushed. If the writer incorrectly holds onto previously indexed docs forever then this will run forever:
    long counter = 0;
    long checkAt = 100;
    while (thread0Count < 1000 || thread1Count < 1000) {
      Document doc = new Document();
      doc.add(newStringField("field", "threadIDmain", Field.Store.NO));
      w.addDocument(doc);
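
      // Periodically list the directory and open each newly flushed segment directly from its .si file, counting how many
      // thread0/thread1 docs have made it out of RAM so far: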
      if (counter++ == checkAt) {
        for(String fileName : dir.listAll()) {
          if (fileName.endsWith(".si")) {
            String segName = IndexFileNames.parseSegmentName(fileName);
            if (segSeen.contains(segName) == false) {
              segSeen.add(segName);
              byte[] id = readSegmentInfoID(dir, fileName);
              SegmentInfo si = TestUtil.getDefaultCodec().segmentInfoFormat().read(dir, segName, id, IOContext.DEFAULT);
              si.setCodec(codec);
              SegmentCommitInfo sci = new SegmentCommitInfo(si, 0, 0, -1, -1, -1, StringHelper.randomId());
              SegmentReader sr = new SegmentReader(sci, Version.LATEST.major, IOContext.DEFAULT);
              try {
                thread0Count += sr.docFreq(new Term("field", "threadID0"));
                thread1Count += sr.docFreq(new Term("field", "threadID1"));
              } finally {
                sr.close();
              }
            }
          }
        }
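        // Check less and less often as the index grows, so the directory scans don't dominate the test: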
        checkAt = (long) (checkAt * 1.25);
        counter = 0;
      }
    }

    w.close();
    dir.close();
  }

  // TODO: remove this hack and fix this test to be better?
  // the whole thing relies on the default codec too...
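  // Reads the per-segment ID directly out of a .si file by skipping the codec-header fields that precede it: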
  byte[] readSegmentInfoID(Directory dir, String file) throws IOException {
    try (IndexInput in = dir.openInput(file, IOContext.DEFAULT)) {
      in.readInt();    // magic
      in.readString(); // codec name
      in.readInt();    // version
      byte[] id = new byte[StringHelper.ID_LENGTH];
      in.readBytes(id, 0, id.length);
      return id;
    }
  }
}