| package org.apache.lucene.index; |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.util.concurrent.CountDownLatch; |
| |
| import org.apache.lucene.analysis.MockAnalyzer; |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.document.Field; |
| import org.apache.lucene.store.AlreadyClosedException; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.MockDirectoryWrapper; |
| import org.apache.lucene.util.LuceneTestCase; |
| import org.apache.lucene.util.ThreadInterruptedException; |
| |
| /** |
| * MultiThreaded IndexWriter tests |
| */ |
| public class TestIndexWriterWithThreads extends LuceneTestCase { |
| |
| // Used by test cases below |
| private class IndexerThread extends Thread { |
| |
| boolean diskFull; |
| Throwable error; |
| AlreadyClosedException ace; |
| IndexWriter writer; |
| boolean noErrors; |
| volatile int addCount; |
| |
| public IndexerThread(IndexWriter writer, boolean noErrors) { |
| this.writer = writer; |
| this.noErrors = noErrors; |
| } |
| |
| @Override |
| public void run() { |
| |
| final Document doc = new Document(); |
| doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS)); |
| |
| int idUpto = 0; |
| int fullCount = 0; |
| final long stopTime = System.currentTimeMillis() + 200; |
| |
| do { |
| try { |
| writer.updateDocument(new Term("id", ""+(idUpto++)), doc); |
| addCount++; |
| } catch (IOException ioe) { |
| if (VERBOSE) { |
| System.out.println("TEST: expected exc:"); |
| ioe.printStackTrace(System.out); |
| } |
| //System.out.println(Thread.currentThread().getName() + ": hit exc"); |
| //ioe.printStackTrace(System.out); |
| if (ioe.getMessage().startsWith("fake disk full at") || |
| ioe.getMessage().equals("now failing on purpose")) { |
| diskFull = true; |
| try { |
| Thread.sleep(1); |
| } catch (InterruptedException ie) { |
| throw new ThreadInterruptedException(ie); |
| } |
| if (fullCount++ >= 5) |
| break; |
| } else { |
| if (noErrors) { |
| System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:"); |
| ioe.printStackTrace(System.out); |
| error = ioe; |
| } |
| break; |
| } |
| } catch (Throwable t) { |
| //t.printStackTrace(System.out); |
| if (noErrors) { |
| System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:"); |
| t.printStackTrace(System.out); |
| error = t; |
| } |
| break; |
| } |
| } while(System.currentTimeMillis() < stopTime); |
| } |
| } |
| |
| // LUCENE-1130: make sure immediate disk full on creating |
| // an IndexWriter (hit during DW.ThreadState.init()), with |
| // multiple threads, is OK: |
| public void testImmediateDiskFullWithThreads() throws Exception { |
| |
| int NUM_THREADS = 3; |
| |
| for(int iter=0;iter<10;iter++) { |
| if (VERBOSE) { |
| System.out.println("\nTEST: iter=" + iter); |
| } |
| MockDirectoryWrapper dir = newDirectory(); |
| IndexWriter writer = new IndexWriter( |
| dir, |
| newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)). |
| setMaxBufferedDocs(2). |
| setMergeScheduler(new ConcurrentMergeScheduler()). |
| setMergePolicy(newLogMergePolicy(4)) |
| ); |
| ((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions(); |
| dir.setMaxSizeInBytes(4*1024+20*iter); |
| writer.setInfoStream(VERBOSE ? System.out : null); |
| |
| IndexerThread[] threads = new IndexerThread[NUM_THREADS]; |
| |
| for(int i=0;i<NUM_THREADS;i++) |
| threads[i] = new IndexerThread(writer, true); |
| |
| for(int i=0;i<NUM_THREADS;i++) |
| threads[i].start(); |
| |
| for(int i=0;i<NUM_THREADS;i++) { |
| // Without fix for LUCENE-1130: one of the |
| // threads will hang |
| threads[i].join(); |
| assertTrue("hit unexpected Throwable", threads[i].error == null); |
| } |
| |
| // Make sure once disk space is avail again, we can |
| // cleanly close: |
| dir.setMaxSizeInBytes(0); |
| writer.close(false); |
| dir.close(); |
| } |
| } |
| |
| |
| // LUCENE-1130: make sure we can close() even while |
| // threads are trying to add documents. Strictly |
| // speaking, this isn't valid us of Lucene's APIs, but we |
| // still want to be robust to this case: |
| public void testCloseWithThreads() throws Exception { |
| int NUM_THREADS = 3; |
| |
| for(int iter=0;iter<7;iter++) { |
| Directory dir = newDirectory(); |
| IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)) |
| .setMaxBufferedDocs(10).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy(4)); |
| // We expect AlreadyClosedException |
| ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions(); |
| IndexWriter writer = new IndexWriter(dir, conf); |
| |
| IndexerThread[] threads = new IndexerThread[NUM_THREADS]; |
| |
| for(int i=0;i<NUM_THREADS;i++) |
| threads[i] = new IndexerThread(writer, false); |
| |
| for(int i=0;i<NUM_THREADS;i++) |
| threads[i].start(); |
| |
| boolean done = false; |
| while(!done) { |
| Thread.sleep(100); |
| for(int i=0;i<NUM_THREADS;i++) |
| // only stop when at least one thread has added a doc |
| if (threads[i].addCount > 0) { |
| done = true; |
| break; |
| } else if (!threads[i].isAlive()) { |
| fail("thread failed before indexing a single document"); |
| } |
| } |
| |
| writer.close(false); |
| |
| // Make sure threads that are adding docs are not hung: |
| for(int i=0;i<NUM_THREADS;i++) { |
| // Without fix for LUCENE-1130: one of the |
| // threads will hang |
| threads[i].join(); |
| if (threads[i].isAlive()) |
| fail("thread seems to be hung"); |
| } |
| |
| // Quick test to make sure index is not corrupt: |
| IndexReader reader = IndexReader.open(dir, true); |
| TermDocs tdocs = reader.termDocs(new Term("field", "aaa")); |
| int count = 0; |
| while(tdocs.next()) { |
| count++; |
| } |
| assertTrue(count > 0); |
| reader.close(); |
| |
| dir.close(); |
| } |
| } |
| |
| // Runs test, with multiple threads, using the specific |
| // failure to trigger an IOException |
| public void _testMultipleThreadsFailure(MockDirectoryWrapper.Failure failure) throws Exception { |
| |
| int NUM_THREADS = 3; |
| |
| for(int iter=0;iter<2;iter++) { |
| if (VERBOSE) { |
| System.out.println("TEST: iter=" + iter); |
| } |
| MockDirectoryWrapper dir = newDirectory(); |
| IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, |
| new MockAnalyzer(random)).setMaxBufferedDocs(2) |
| .setMergeScheduler(new ConcurrentMergeScheduler()) |
| .setMergePolicy(newLogMergePolicy(4)); |
| // We expect disk full exceptions in the merge threads |
| ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions(); |
| IndexWriter writer = new IndexWriter(dir, conf); |
| writer.setInfoStream(VERBOSE ? System.out : null); |
| |
| IndexerThread[] threads = new IndexerThread[NUM_THREADS]; |
| |
| for(int i=0;i<NUM_THREADS;i++) |
| threads[i] = new IndexerThread(writer, true); |
| |
| for(int i=0;i<NUM_THREADS;i++) |
| threads[i].start(); |
| |
| Thread.sleep(10); |
| |
| dir.failOn(failure); |
| failure.setDoFail(); |
| |
| for(int i=0;i<NUM_THREADS;i++) { |
| threads[i].join(); |
| assertTrue("hit unexpected Throwable", threads[i].error == null); |
| } |
| |
| boolean success = false; |
| try { |
| writer.close(false); |
| success = true; |
| } catch (IOException ioe) { |
| failure.clearDoFail(); |
| writer.close(false); |
| } |
| |
| if (success) { |
| IndexReader reader = IndexReader.open(dir, true); |
| for(int j=0;j<reader.maxDoc();j++) { |
| if (!reader.isDeleted(j)) { |
| reader.document(j); |
| reader.getTermFreqVectors(j); |
| } |
| } |
| reader.close(); |
| } |
| |
| dir.close(); |
| } |
| } |
| |
| // Runs test, with one thread, using the specific failure |
| // to trigger an IOException |
| public void _testSingleThreadFailure(MockDirectoryWrapper.Failure failure) throws IOException { |
| MockDirectoryWrapper dir = newDirectory(); |
| |
| IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random)) |
| .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler())); |
| final Document doc = new Document(); |
| doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS)); |
| |
| for(int i=0;i<6;i++) |
| writer.addDocument(doc); |
| |
| dir.failOn(failure); |
| failure.setDoFail(); |
| try { |
| writer.addDocument(doc); |
| writer.addDocument(doc); |
| writer.commit(); |
| fail("did not hit exception"); |
| } catch (IOException ioe) { |
| } |
| failure.clearDoFail(); |
| writer.addDocument(doc); |
| writer.close(false); |
| dir.close(); |
| } |
| |
| // Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort |
| private static class FailOnlyOnAbortOrFlush extends MockDirectoryWrapper.Failure { |
| private boolean onlyOnce; |
| public FailOnlyOnAbortOrFlush(boolean onlyOnce) { |
| this.onlyOnce = onlyOnce; |
| } |
| @Override |
| public void eval(MockDirectoryWrapper dir) throws IOException { |
| if (doFail) { |
| StackTraceElement[] trace = new Exception().getStackTrace(); |
| boolean sawAbortOrFlushDoc = false; |
| boolean sawClose = false; |
| for (int i = 0; i < trace.length; i++) { |
| if ("abort".equals(trace[i].getMethodName()) || |
| "flushDocument".equals(trace[i].getMethodName())) { |
| sawAbortOrFlushDoc = true; |
| } |
| if ("close".equals(trace[i].getMethodName())) { |
| sawClose = true; |
| } |
| } |
| if (sawAbortOrFlushDoc && !sawClose) { |
| if (onlyOnce) |
| doFail = false; |
| //System.out.println(Thread.currentThread().getName() + ": now fail"); |
| //new Throwable().printStackTrace(System.out); |
| throw new IOException("now failing on purpose"); |
| } |
| } |
| } |
| } |
| |
| |
| |
| // LUCENE-1130: make sure initial IOException, and then 2nd |
| // IOException during rollback(), is OK: |
| public void testIOExceptionDuringAbort() throws IOException { |
| _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(false)); |
| } |
| |
| // LUCENE-1130: make sure initial IOException, and then 2nd |
| // IOException during rollback(), is OK: |
| public void testIOExceptionDuringAbortOnlyOnce() throws IOException { |
| _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(true)); |
| } |
| |
| // LUCENE-1130: make sure initial IOException, and then 2nd |
| // IOException during rollback(), with multiple threads, is OK: |
| public void testIOExceptionDuringAbortWithThreads() throws Exception { |
| _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(false)); |
| } |
| |
| // LUCENE-1130: make sure initial IOException, and then 2nd |
| // IOException during rollback(), with multiple threads, is OK: |
| public void testIOExceptionDuringAbortWithThreadsOnlyOnce() throws Exception { |
| _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(true)); |
| } |
| |
| // Throws IOException during DocumentsWriter.writeSegment |
| private static class FailOnlyInWriteSegment extends MockDirectoryWrapper.Failure { |
| private boolean onlyOnce; |
| public FailOnlyInWriteSegment(boolean onlyOnce) { |
| this.onlyOnce = onlyOnce; |
| } |
| @Override |
| public void eval(MockDirectoryWrapper dir) throws IOException { |
| if (doFail) { |
| StackTraceElement[] trace = new Exception().getStackTrace(); |
| for (int i = 0; i < trace.length; i++) { |
| if ("flush".equals(trace[i].getMethodName()) && "org.apache.lucene.index.DocFieldProcessor".equals(trace[i].getClassName())) { |
| if (onlyOnce) |
| doFail = false; |
| throw new IOException("now failing on purpose"); |
| } |
| } |
| } |
| } |
| } |
| |
| // LUCENE-1130: test IOException in writeSegment |
| public void testIOExceptionDuringWriteSegment() throws IOException { |
| _testSingleThreadFailure(new FailOnlyInWriteSegment(false)); |
| } |
| |
| // LUCENE-1130: test IOException in writeSegment |
| public void testIOExceptionDuringWriteSegmentOnlyOnce() throws IOException { |
| _testSingleThreadFailure(new FailOnlyInWriteSegment(true)); |
| } |
| |
| // LUCENE-1130: test IOException in writeSegment, with threads |
| public void testIOExceptionDuringWriteSegmentWithThreads() throws Exception { |
| _testMultipleThreadsFailure(new FailOnlyInWriteSegment(false)); |
| } |
| |
| // LUCENE-1130: test IOException in writeSegment, with threads |
| public void testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce() throws Exception { |
| _testMultipleThreadsFailure(new FailOnlyInWriteSegment(true)); |
| } |
| |
| // LUCENE-3365: Test adding two documents with the same field from two different IndexWriters |
| // that we attempt to open at the same time. As long as the first IndexWriter completes |
| // and closes before the second IndexWriter time's out trying to get the Lock, |
| // we should see both documents |
| public void testOpenTwoIndexWritersOnDifferentThreads() throws IOException, InterruptedException { |
| final MockDirectoryWrapper dir = newDirectory(); |
| CountDownLatch oneIWConstructed = new CountDownLatch(1); |
| DelayedIndexAndCloseRunnable thread1 = new DelayedIndexAndCloseRunnable( |
| dir, oneIWConstructed); |
| DelayedIndexAndCloseRunnable thread2 = new DelayedIndexAndCloseRunnable( |
| dir, oneIWConstructed); |
| |
| thread1.start(); |
| thread2.start(); |
| oneIWConstructed.await(); |
| |
| thread1.startIndexing(); |
| thread2.startIndexing(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| assertFalse("Failed due to: " + thread1.failure, thread1.failed); |
| assertFalse("Failed due to: " + thread2.failure, thread2.failed); |
| // now verify that we have two documents in the index |
| IndexReader reader = IndexReader.open(dir, true); |
| assertEquals("IndexReader should have one document per thread running", 2, |
| reader.numDocs()); |
| |
| reader.close(); |
| dir.close(); |
| } |
| |
| static class DelayedIndexAndCloseRunnable extends Thread { |
| private final Directory dir; |
| boolean failed = false; |
| Throwable failure = null; |
| private final CountDownLatch startIndexing = new CountDownLatch(1); |
| private CountDownLatch iwConstructed; |
| |
| public DelayedIndexAndCloseRunnable(Directory dir, |
| CountDownLatch iwConstructed) { |
| this.dir = dir; |
| this.iwConstructed = iwConstructed; |
| } |
| |
| public void startIndexing() { |
| this.startIndexing.countDown(); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| Document doc = new Document(); |
| Field field = newField("field", "testData", Field.Store.YES, |
| Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS); |
| doc.add(field); |
| IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( |
| TEST_VERSION_CURRENT, new MockAnalyzer(random))); |
| iwConstructed.countDown(); |
| startIndexing.await(); |
| writer.addDocument(doc); |
| writer.close(); |
| } catch (Throwable e) { |
| failed = true; |
| failure = e; |
| failure.printStackTrace(System.out); |
| return; |
| } |
| } |
| } |
| } |