blob: 74502673bef869551102a3667dd57c96bebb9bbd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
 * Tests {@link FlushByRamOrCountsPolicy}: flushing by RAM usage, flushing by buffered document
 * count, a randomized configuration, and the stall control that engages when flushing cannot
 * keep up with indexing.
 */
public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {

  /** Shared source of line documents, opened once for the whole test class. */
  private static LineFileDocs lineDocFile;

  @BeforeClass
  public static void beforeClass() throws Exception {
    lineDocFile = new LineFileDocs(random());
  }

  @AfterClass
  public static void afterClass() throws Exception {
    lineDocFile.close();
    lineDocFile = null;
  }

  public void testFlushByRam() throws IOException, InterruptedException {
    final double ramBuffer = (TEST_NIGHTLY ? 1 : 10) + atLeast(2)
        + random().nextDouble();
    runFlushByRam(1 + random().nextInt(TEST_NIGHTLY ? 5 : 1), ramBuffer, false);
  }

  public void testFlushByRamLargeBuffer() throws IOException, InterruptedException {
    // with a 256 mb ram buffer we should never stall
    runFlushByRam(1 + random().nextInt(TEST_NIGHTLY ? 5 : 1), 256.d, true);
  }

  /**
   * Indexes a fixed number of documents with several threads while auto-flushing only by RAM,
   * then verifies the writer's and the flush policy's bookkeeping.
   *
   * @param numThreads number of concurrent indexing threads
   * @param maxRamMB RAM buffer size in MB handed to {@link IndexWriterConfig#setRAMBufferSizeMB}
   * @param ensureNotStalled if true, additionally assert that the stall control never stalled
   */
  protected void runFlushByRam(int numThreads, double maxRamMB,
      boolean ensureNotStalled) throws IOException, InterruptedException {
    final int numDocumentsToIndex = 10 + atLeast(30);
    AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
    Directory dir = newDirectory();
    MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
    MockAnalyzer analyzer = new MockAnalyzer(random());
    analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));
    IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
        .setFlushPolicy(flushPolicy);
    iwc.setRAMBufferSizeMB(maxRamMB);
    iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
    IndexWriter writer = new IndexWriter(dir, iwc);
    // re-read the policy from the live config: that is the instance the writer actually uses
    flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
    assertFalse(flushPolicy.flushOnDocCount());
    assertTrue(flushPolicy.flushOnRAM());
    DocumentsWriter docsWriter = writer.getDocsWriter();
    assertNotNull(docsWriter);
    DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
    assertEquals(" bytes must be 0 after init", 0, writer.getFlushingBytes());

    IndexThread[] threads = new IndexThread[numThreads];
    for (int x = 0; x < threads.length; x++) {
      threads[x] = new IndexThread(numDocs, numThreads, writer, lineDocFile,
          false);
      threads[x].start();
    }
    for (int x = 0; x < threads.length; x++) {
      threads[x].join();
    }

    final long maxRAMBytes = (long) (iwc.getRAMBufferSizeMB() * 1024. * 1024.);
    assertEquals(" all flushes must be due numThreads=" + numThreads, 0,
        writer.getFlushingBytes());
    assertEquals(numDocumentsToIndex, writer.getDocStats().numDocs);
    assertEquals(numDocumentsToIndex, writer.getDocStats().maxDoc);
    assertTrue("peak bytes without flush exceeded watermark",
        flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
    assertActiveBytesAfter(flushControl);
    if (flushPolicy.hasMarkedPending) {
      // The mock marks a writer pending once activeBytes >= maxRAMBytes, so the peak may equal
      // the limit exactly; a strict '<' would fail spuriously in that case (matches testRandom).
      assertTrue("max: " + maxRAMBytes + " " + flushControl.getPeakActiveBytes(),
          maxRAMBytes <= flushControl.getPeakActiveBytes());
    }
    if (ensureNotStalled) {
      assertFalse(docsWriter.flushControl.stallControl.wasStalled());
    }
    writer.close();
    assertEquals(0, flushControl.activeBytes());
    dir.close();
  }

  public void testFlushDocCount() throws IOException, InterruptedException {
    int[] numThreads = new int[] { 2 + atLeast(1), 1 };
    MockAnalyzer analyzer = new MockAnalyzer(random());
    analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));
    for (int i = 0; i < numThreads.length; i++) {
      final int numDocumentsToIndex = 50 + atLeast(30);
      AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
      Directory dir = newDirectory();
      MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
      IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
          .setFlushPolicy(flushPolicy);
      iwc.setMaxBufferedDocs(2 + atLeast(10));
      iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
      IndexWriter writer = new IndexWriter(dir, iwc);
      flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
      assertTrue(flushPolicy.flushOnDocCount());
      assertFalse(flushPolicy.flushOnRAM());
      DocumentsWriter docsWriter = writer.getDocsWriter();
      assertNotNull(docsWriter);
      DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
      assertEquals(" bytes must be 0 after init", 0, writer.getFlushingBytes());

      IndexThread[] threads = new IndexThread[numThreads[i]];
      for (int x = 0; x < threads.length; x++) {
        threads[x] = new IndexThread(numDocs, numThreads[i], writer,
            lineDocFile, false);
        threads[x].start();
      }
      for (int x = 0; x < threads.length; x++) {
        threads[x].join();
      }

      assertEquals(" all flushes must be due numThreads=" + numThreads[i], 0,
          writer.getFlushingBytes());
      assertEquals(numDocumentsToIndex, writer.getDocStats().numDocs);
      assertEquals(numDocumentsToIndex, writer.getDocStats().maxDoc);
      // this run flushes by doc count, so the watermark here is maxBufferedDocs, not bytes
      assertTrue("peak doc count without flush exceeded watermark",
          flushPolicy.peakDocCountWithoutFlush <= iwc.getMaxBufferedDocs());
      assertActiveBytesAfter(flushControl);
      writer.close();
      assertEquals(0, flushControl.activeBytes());
      dir.close();
    }
  }

  public void testRandom() throws IOException, InterruptedException {
    final int numThreads = 1 + random().nextInt(8);
    final int numDocumentsToIndex = 50 + atLeast(70);
    AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
    Directory dir = newDirectory();
    MockAnalyzer analyzer = new MockAnalyzer(random());
    analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));
    IndexWriterConfig iwc = newIndexWriterConfig(analyzer);
    MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
    iwc.setFlushPolicy(flushPolicy);
    IndexWriter writer = new IndexWriter(dir, iwc);
    flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
    DocumentsWriter docsWriter = writer.getDocsWriter();
    assertNotNull(docsWriter);
    DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
    assertEquals(" bytes must be 0 after init", 0, writer.getFlushingBytes());

    IndexThread[] threads = new IndexThread[numThreads];
    for (int x = 0; x < threads.length; x++) {
      threads[x] = new IndexThread(numDocs, numThreads, writer, lineDocFile,
          true);
      threads[x].start();
    }
    for (int x = 0; x < threads.length; x++) {
      threads[x].join();
    }

    assertEquals(" all flushes must be due", 0, writer.getFlushingBytes());
    assertEquals(numDocumentsToIndex, writer.getDocStats().numDocs);
    assertEquals(numDocumentsToIndex, writer.getDocStats().maxDoc);
    // the RAM watermark checks only make sense when RAM is the sole flush trigger
    if (flushPolicy.flushOnRAM() && !flushPolicy.flushOnDocCount()) {
      final long maxRAMBytes = (long) (iwc.getRAMBufferSizeMB() * 1024. * 1024.);
      assertTrue("peak bytes without flush exceeded watermark",
          flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
      if (flushPolicy.hasMarkedPending) {
        assertTrue("max: " + maxRAMBytes + " " + flushControl.getPeakActiveBytes(),
            maxRAMBytes <= flushControl.getPeakActiveBytes());
      }
    }
    assertActiveBytesAfter(flushControl);
    writer.commit();
    assertEquals(0, flushControl.activeBytes());
    IndexReader r = DirectoryReader.open(dir);
    assertEquals(numDocumentsToIndex, r.numDocs());
    assertEquals(numDocumentsToIndex, r.maxDoc());
    if (!flushPolicy.flushOnRAM()) {
      assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled());
      assertFalse("never block if we don't flush on RAM", docsWriter.flushControl.stallControl.hasBlocked());
    }
    r.close();
    writer.close();
    dir.close();
  }

  public void testStallControl() throws InterruptedException, IOException {
    int[] numThreads = new int[] { 4 + random().nextInt(8), 1 };
    final int numDocumentsToIndex = 50 + random().nextInt(50);
    MockAnalyzer analyzer = new MockAnalyzer(random());
    analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));
    for (int i = 0; i < numThreads.length; i++) {
      AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
      MockDirectoryWrapper dir = newMockDirectory();
      // mock a very slow harddisk sometimes here so that flushing is very slow
      dir.setThrottling(MockDirectoryWrapper.Throttling.SOMETIMES);
      IndexWriterConfig iwc = newIndexWriterConfig(analyzer);
      iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
      FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
      iwc.setFlushPolicy(flushPolicy);
      // with such a small ram buffer we should be stalled quite quickly
      iwc.setRAMBufferSizeMB(0.25);
      IndexWriter writer = new IndexWriter(dir, iwc);

      IndexThread[] threads = new IndexThread[numThreads[i]];
      for (int x = 0; x < threads.length; x++) {
        threads[x] = new IndexThread(numDocs, numThreads[i], writer,
            lineDocFile, false);
        threads[x].start();
      }
      for (int x = 0; x < threads.length; x++) {
        threads[x].join();
      }

      DocumentsWriter docsWriter = writer.getDocsWriter();
      assertNotNull(docsWriter);
      DocumentsWriterFlushControl flushControl = docsWriter.flushControl;
      assertEquals(" all flushes must be due", 0, writer.getFlushingBytes());
      assertEquals(numDocumentsToIndex, writer.getDocStats().numDocs);
      assertEquals(numDocumentsToIndex, writer.getDocStats().maxDoc);
      if (numThreads[i] == 1) {
        assertFalse(
            "single thread must not block numThreads: " + numThreads[i],
            docsWriter.flushControl.stallControl.hasBlocked());
      }
      // only require a stall if net bytes actually exceeded twice the RAM buffer
      if (docsWriter.flushControl.getPeakNetBytes() > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
        assertTrue(docsWriter.flushControl.stallControl.wasStalled());
      }
      assertActiveBytesAfter(flushControl);
      writer.close();
      dir.close();
    }
  }

  /**
   * Asserts that {@code flushControl.activeBytes()} equals the sum of
   * {@code ramBytesUsed()} over all writers returned by {@code allActiveWriters()}.
   */
  protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
    Iterator<DocumentsWriterPerThread> allActiveWriter = flushControl.allActiveWriters();
    long bytesUsed = 0;
    while (allActiveWriter.hasNext()) {
      DocumentsWriterPerThread next = allActiveWriter.next();
      bytesUsed += next.ramBytesUsed();
    }
    assertEquals(bytesUsed, flushControl.activeBytes());
  }

  /**
   * Indexing thread that repeatedly claims a document from a shared counter and adds it to the
   * writer until the counter is exhausted, then commits. When {@code doRandomCommit} is set it
   * also commits {@code rarely()} after individual documents.
   */
  public static class IndexThread extends Thread {
    IndexWriter writer;
    LiveIndexWriterConfig iwc;
    LineFileDocs docs;
    private AtomicInteger pendingDocs;
    private final boolean doRandomCommit;

    /**
     * @param pendingDocs shared counter of documents still to index; decremented once per claim
     * @param numThreads not used by this implementation; kept for call-site compatibility
     * @param writer writer that receives the documents
     * @param docs source of documents
     * @param doRandomCommit whether to commit at random points while indexing
     */
    public IndexThread(AtomicInteger pendingDocs, int numThreads,
        IndexWriter writer, LineFileDocs docs, boolean doRandomCommit) {
      this.pendingDocs = pendingDocs;
      this.writer = writer;
      iwc = writer.getConfig();
      this.docs = docs;
      this.doRandomCommit = doRandomCommit;
    }

    @Override
    public void run() {
      try {
        // decrementAndGet() > -1 lets exactly pendingDocs' initial value of claims succeed
        while (pendingDocs.decrementAndGet() > -1) {
          Document doc = docs.nextDoc();
          writer.addDocument(doc);
          if (doRandomCommit && rarely()) {
            writer.commit();
          }
        }
        writer.commit();
      } catch (Throwable ex) {
        System.out.println("FAILED exc:");
        ex.printStackTrace(System.out);
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * {@link FlushByRamOrCountsPolicy} that independently predicts which writer the parent policy
   * should mark flush-pending, asserts the parent did exactly that, and records peak RAM / doc
   * counts observed on inserts that did not trigger a flush.
   */
  private static class MockDefaultFlushPolicy extends FlushByRamOrCountsPolicy {
    /** Max {@code activeBytes()} seen on an insert that marked nothing pending. */
    long peakBytesWithoutFlush = Long.MIN_VALUE;
    /** Max {@code getNumDocsInRAM()} seen on an insert that marked nothing pending. */
    long peakDocCountWithoutFlush = Long.MIN_VALUE;
    /** Set once any writer was observed flush-pending after a callback. */
    boolean hasMarkedPending = false;

    @Override
    public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
      final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
      final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
      findPending(control, pending, notPending);
      // only a writer that was already pending before the callback is expected to be pending
      final boolean flushCurrent = perThread.isFlushPending();
      super.onDelete(control, perThread);
      if (flushCurrent) {
        assertTrue(pending.remove(perThread));
        assertTrue(perThread.isFlushPending());
        hasMarkedPending = true;
      }
      // no other writer may have been marked pending by the delete
      for (DocumentsWriterPerThread dwpt : notPending) {
        assertFalse(dwpt.isFlushPending());
      }
    }

    @Override
    public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread dwpt) {
      final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
      final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
      findPending(control, pending, notPending);
      final boolean flushCurrent = dwpt.isFlushPending();
      long activeBytes = control.activeBytes();
      // predict which writer (if any) the parent policy should mark flush-pending
      final DocumentsWriterPerThread toFlush;
      if (dwpt.isFlushPending()) {
        toFlush = dwpt;
      } else if (flushOnDocCount()
          && dwpt.getNumDocsInRAM() >= indexWriterConfig
              .getMaxBufferedDocs()) {
        toFlush = dwpt;
      } else if (flushOnRAM()
          && activeBytes >= (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024. * 1024.)) {
        toFlush = findLargestNonPendingWriter(control, dwpt);
        assertFalse(toFlush.isFlushPending());
      } else {
        toFlush = null;
      }
      super.onInsert(control, dwpt);
      if (toFlush != null) {
        if (flushCurrent) {
          assertTrue(pending.remove(toFlush));
        } else {
          assertTrue(notPending.remove(toFlush));
        }
        assertTrue(toFlush.isFlushPending());
        hasMarkedPending = true;
      } else {
        peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
        peakDocCountWithoutFlush = Math.max(dwpt.getNumDocsInRAM(),
            peakDocCountWithoutFlush);
      }
      // every writer other than the predicted one must still be non-pending
      for (DocumentsWriterPerThread perThread : notPending) {
        assertFalse(perThread.isFlushPending());
      }
    }
  }

  /**
   * Partitions the writers returned by {@code flushControl.allActiveWriters()} into
   * {@code pending} and {@code notPending} according to {@code isFlushPending()}; both lists are
   * appended to.
   */
  static void findPending(DocumentsWriterFlushControl flushControl,
      ArrayList<DocumentsWriterPerThread> pending, ArrayList<DocumentsWriterPerThread> notPending) {
    Iterator<DocumentsWriterPerThread> allActiveThreads = flushControl.allActiveWriters();
    while (allActiveThreads.hasNext()) {
      DocumentsWriterPerThread next = allActiveThreads.next();
      if (next.isFlushPending()) {
        pending.add(next);
      } else {
        notPending.add(next);
      }
    }
  }
}