| diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java |
| index b61d152..9d444bb 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java |
| @@ -19,7 +19,6 @@ package org.apache.lucene.index; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| -import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| @@ -213,18 +212,18 @@ final class DocumentsWriter { |
| infoStream.message("DW", "abort"); |
| } |
| |
| - final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator(); |
| - while (threadsIterator.hasNext()) { |
| - final ThreadState perThread = threadsIterator.next(); |
| + final int limit = perThreadPool.getActiveThreadState(); |
| + for (int i = 0; i < limit; i++) { |
| + final ThreadState perThread = perThreadPool.getThreadState(i); |
| perThread.lock(); |
| try { |
| if (perThread.isActive()) { // we might be closed |
| try { |
| - perThread.perThread.abort(); |
| + perThread.dwpt.abort(); |
| } catch (IOException ex) { |
| // continue |
| } finally { |
| - perThread.perThread.checkAndResetHasAborted(); |
| + perThread.dwpt.checkAndResetHasAborted(); |
| flushControl.doOnAbort(perThread); |
| } |
| } else { |
| @@ -338,7 +337,7 @@ final class DocumentsWriter { |
| assert false: "perThread is not active but we are still open"; |
| } |
| |
| - final DocumentsWriterPerThread dwpt = perThread.perThread; |
| + final DocumentsWriterPerThread dwpt = perThread.dwpt; |
| try { |
| final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm); |
| numDocsInRAM.addAndGet(docCount); |
| @@ -372,7 +371,7 @@ final class DocumentsWriter { |
| assert false: "perThread is not active but we are still open"; |
| } |
| |
| - final DocumentsWriterPerThread dwpt = perThread.perThread; |
| + final DocumentsWriterPerThread dwpt = perThread.dwpt; |
| try { |
| dwpt.updateDocument(doc, analyzer, delTerm); |
| numDocsInRAM.incrementAndGet(); |
| @@ -587,22 +586,4 @@ final class DocumentsWriter { |
| } |
| |
| } |
| - |
| - |
| - |
| - |
| - // use by IW during close to assert all DWPT are inactive after final flush |
| - boolean assertNoActiveDWPT() { |
| - Iterator<ThreadState> activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator(); |
| - while(activePerThreadsIterator.hasNext()) { |
| - ThreadState next = activePerThreadsIterator.next(); |
| - next.lock(); |
| - try { |
| - assert !next.isActive(); |
| - } finally { |
| - next.unlock(); |
| - } |
| - } |
| - return true; |
| - } |
| } |
| diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java |
| index d4c8be3..18b4769 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java |
| @@ -18,8 +18,8 @@ package org.apache.lucene.index; |
| */ |
| import java.io.IOException; |
| import java.util.ArrayList; |
| +import java.util.IdentityHashMap; |
| import java.util.List; |
| -import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.Queue; |
| @@ -51,6 +51,8 @@ public final class DocumentsWriterFlushControl { |
| private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>(); |
| // only for safety reasons if a DWPT is close to the RAM limit |
| private final Queue<BlockedFlush> blockedFlushes = new LinkedList<BlockedFlush>(); |
| + private final IdentityHashMap<DocumentsWriterPerThread, Long> flushingWriters = new IdentityHashMap<DocumentsWriterPerThread, Long>(); |
| + |
| |
| double maxConfiguredRamBuffer = 0; |
| long peakActiveBytes = 0;// only with assert |
| @@ -61,7 +63,6 @@ public final class DocumentsWriterFlushControl { |
| private final DocumentsWriterPerThreadPool perThreadPool; |
| private final FlushPolicy flushPolicy; |
| private boolean closed = false; |
| - private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>(); |
| private final DocumentsWriter documentsWriter; |
| private final IndexWriterConfig config; |
| |
| @@ -122,7 +123,7 @@ public final class DocumentsWriterFlushControl { |
| } |
| |
| private void commitPerThreadBytes(ThreadState perThread) { |
| - final long delta = perThread.perThread.bytesUsed() |
| + final long delta = perThread.dwpt.bytesUsed() |
| - perThread.bytesUsed; |
| perThread.bytesUsed += delta; |
| /* |
| @@ -212,7 +213,7 @@ public final class DocumentsWriterFlushControl { |
| */ |
| public synchronized void setFlushPending(ThreadState perThread) { |
| assert !perThread.flushPending; |
| - if (perThread.perThread.getNumDocsInRAM() > 0) { |
| + if (perThread.dwpt.getNumDocsInRAM() > 0) { |
| perThread.flushPending = true; // write access synced |
| final long bytes = perThread.bytesUsed; |
| flushBytes += bytes; |
| @@ -295,18 +296,21 @@ public final class DocumentsWriterFlushControl { |
| } |
| |
| DocumentsWriterPerThread nextPendingFlush() { |
| + int numPending; |
| + boolean fullFlush; |
| synchronized (this) { |
| final DocumentsWriterPerThread poll; |
| if ((poll = flushQueue.poll()) != null) { |
| stallControl.updateStalled(this); |
| return poll; |
| } |
| + fullFlush = this.fullFlush; |
| + numPending = this.numPending; |
| } |
| if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush |
| - final Iterator<ThreadState> allActiveThreads = perThreadPool |
| - .getActivePerThreadsIterator(); |
| - while (allActiveThreads.hasNext() && numPending > 0) { |
| - ThreadState next = allActiveThreads.next(); |
| + final int limit = perThreadPool.getActiveThreadState(); |
| + for (int i = 0; i < limit && numPending > 0; i++) { |
| + final ThreadState next = perThreadPool.getThreadState(i); |
| if (next.flushPending) { |
| final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next); |
| if (dwpt != null) { |
| @@ -327,9 +331,29 @@ public final class DocumentsWriterFlushControl { |
| /** |
| * Returns an iterator that provides access to all currently active {@link ThreadState}s |
| */ |
| - public Iterator<ThreadState> allActiveThreads() { |
| - return perThreadPool.getActivePerThreadsIterator(); |
| + public Iterator<ThreadState> allActiveThreadStates() { |
| + return getPerThreadsIterator(perThreadPool.getActiveThreadState()); |
| } |
| + |
| + private Iterator<ThreadState> getPerThreadsIterator(final int upto) { |
| + return new Iterator<ThreadState>() { |
| + int i = 0; |
| + |
| + public boolean hasNext() { |
| + return i < upto; |
| + } |
| + |
| + public ThreadState next() { |
| + return perThreadPool.getThreadState(i++); |
| + } |
| + |
| + public void remove() { |
| + throw new UnsupportedOperationException("remove() not supported."); |
| + } |
| + }; |
| + } |
| + |
| + |
| |
| synchronized void doOnDelete() { |
| // pass null this is a global delete no update |
| @@ -369,7 +393,7 @@ public final class DocumentsWriterFlushControl { |
| boolean success = false; |
| try { |
| if (perThread.isActive() |
| - && perThread.perThread.deleteQueue != documentsWriter.deleteQueue) { |
| + && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { |
| // There is a flush-all in process and this DWPT is |
| // now stale -- enroll it for flush and try for |
| // another DWPT: |
| @@ -397,23 +421,23 @@ public final class DocumentsWriterFlushControl { |
| DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1); |
| documentsWriter.deleteQueue = newQueue; |
| } |
| - final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator(); |
| - while (allActiveThreads.hasNext()) { |
| - final ThreadState next = allActiveThreads.next(); |
| + final int limit = perThreadPool.getActiveThreadState(); |
| + for (int i = 0; i < limit; i++) { |
| + final ThreadState next = perThreadPool.getThreadState(i); |
| next.lock(); |
| try { |
| if (!next.isActive()) { |
| continue; |
| } |
| - assert next.perThread.deleteQueue == flushingQueue |
| - || next.perThread.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: " |
| + assert next.dwpt.deleteQueue == flushingQueue |
| + || next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: " |
| + flushingQueue |
| + " currentqueue: " |
| + documentsWriter.deleteQueue |
| + " perThread queue: " |
| - + next.perThread.deleteQueue |
| - + " numDocsInRam: " + next.perThread.getNumDocsInRAM(); |
| - if (next.perThread.deleteQueue != flushingQueue) { |
| + + next.dwpt.deleteQueue |
| + + " numDocsInRam: " + next.dwpt.getNumDocsInRAM(); |
| + if (next.dwpt.deleteQueue != flushingQueue) { |
| // this one is already a new DWPT |
| continue; |
| } |
| @@ -437,12 +461,12 @@ public final class DocumentsWriterFlushControl { |
| } |
| |
| private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) { |
| - final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator(); |
| - while (allActiveThreads.hasNext()) { |
| - final ThreadState next = allActiveThreads.next(); |
| + final int limit = perThreadPool.getActiveThreadState(); |
| + for (int i = 0; i < limit; i++) { |
| + final ThreadState next = perThreadPool.getThreadState(i); |
| next.lock(); |
| try { |
| - assert !next.isActive() || next.perThread.deleteQueue == queue; |
| + assert !next.isActive() || next.dwpt.deleteQueue == queue; |
| } finally { |
| next.unlock(); |
| } |
| @@ -454,9 +478,9 @@ public final class DocumentsWriterFlushControl { |
| |
| void addFlushableState(ThreadState perThread) { |
| if (documentsWriter.infoStream.isEnabled("DWFC")) { |
| - documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread); |
| + documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.dwpt); |
| } |
| - final DocumentsWriterPerThread dwpt = perThread.perThread; |
| + final DocumentsWriterPerThread dwpt = perThread.dwpt; |
| assert perThread.isHeldByCurrentThread(); |
| assert perThread.isActive(); |
| assert fullFlush; |
| @@ -473,9 +497,9 @@ public final class DocumentsWriterFlushControl { |
| } |
| } else { |
| if (closed) { |
| - perThread.resetWriter(null); // make this state inactive |
| + perThreadPool.deactivateThreadState(perThread); // make this state inactive |
| } else { |
| - dwpt.initialize(); |
| + perThreadPool.reinitThreadState(perThread); |
| } |
| } |
| } |
| @@ -597,4 +621,6 @@ public final class DocumentsWriterFlushControl { |
| boolean anyStalledThreads() { |
| return stallControl.anyStalledThreads(); |
| } |
| + |
| + |
| } |
| diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java |
| index 8b959ca..9d5bf30 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java |
| @@ -16,7 +16,6 @@ package org.apache.lucene.index; |
| * limitations under the License. |
| */ |
| |
| -import java.util.Iterator; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.lucene.index.FieldInfos.FieldNumberBiMap; |
| @@ -38,11 +37,6 @@ import org.apache.lucene.util.SetOnce; |
| * </p> |
| */ |
| public abstract class DocumentsWriterPerThreadPool { |
| - /** The maximum number of simultaneous threads that may be |
| - * indexing documents at once in IndexWriter; if more |
| - * than this many threads arrive they will wait for |
| - * others to finish. */ |
| - public final static int DEFAULT_MAX_THREAD_STATES = 8; |
| |
| /** |
| * {@link ThreadState} references and guards a |
| @@ -57,17 +51,18 @@ public abstract class DocumentsWriterPerThreadPool { |
| */ |
| @SuppressWarnings("serial") |
| public final static class ThreadState extends ReentrantLock { |
| - // package private for FlushPolicy |
| - DocumentsWriterPerThread perThread; |
| + DocumentsWriterPerThread dwpt; |
| + // TODO this should really be part of DocumentsWriterFlushControl |
| // write access guarded by DocumentsWriterFlushControl |
| volatile boolean flushPending = false; |
| + // TODO this should really be part of DocumentsWriterFlushControl |
| // write access guarded by DocumentsWriterFlushControl |
| long bytesUsed = 0; |
| // guarded by Reentrant lock |
| private boolean isActive = true; |
| |
| - ThreadState(DocumentsWriterPerThread perThread) { |
| - this.perThread = perThread; |
| + ThreadState(DocumentsWriterPerThread dpwt) { |
| + this.dwpt = dpwt; |
| } |
| |
| /** |
| @@ -76,12 +71,12 @@ public abstract class DocumentsWriterPerThreadPool { |
| * for indexing anymore. |
| * @see #isActive() |
| */ |
| - void resetWriter(DocumentsWriterPerThread perThread) { |
| + private void resetWriter(DocumentsWriterPerThread dwpt) { |
| assert this.isHeldByCurrentThread(); |
| - if (perThread == null) { |
| + if (dwpt == null) { |
| isActive = false; |
| } |
| - this.perThread = perThread; |
| + this.dwpt = dwpt; |
| this.bytesUsed = 0; |
| this.flushPending = false; |
| } |
| @@ -112,7 +107,7 @@ public abstract class DocumentsWriterPerThreadPool { |
| public DocumentsWriterPerThread getDocumentsWriterPerThread() { |
| assert this.isHeldByCurrentThread(); |
| // public for FlushPolicy |
| - return perThread; |
| + return dwpt; |
| } |
| |
| /** |
| @@ -124,40 +119,37 @@ public abstract class DocumentsWriterPerThreadPool { |
| } |
| } |
| |
| - private final ThreadState[] perThreads; |
| + private final ThreadState[] threadStates; |
| private volatile int numThreadStatesActive; |
| - private FieldNumberBiMap globalFieldMap; |
| + private final SetOnce<FieldNumberBiMap> globalFieldMap = new SetOnce<FieldNumberBiMap>(); |
| private final SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>(); |
| |
| /** |
| - * Creates a new {@link DocumentsWriterPerThreadPool} with max. |
| - * {@link #DEFAULT_MAX_THREAD_STATES} thread states. |
| + * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s. |
| */ |
| - public DocumentsWriterPerThreadPool() { |
| - this(DEFAULT_MAX_THREAD_STATES); |
| - } |
| - |
| - public DocumentsWriterPerThreadPool(int maxNumPerThreads) { |
| - maxNumPerThreads = (maxNumPerThreads < 1) ? DEFAULT_MAX_THREAD_STATES : maxNumPerThreads; |
| - perThreads = new ThreadState[maxNumPerThreads]; |
| + public DocumentsWriterPerThreadPool(int maxNumThreadStates) { |
| + if (maxNumThreadStates < 1) { |
| + throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates); |
| + } |
| + threadStates = new ThreadState[maxNumThreadStates]; |
| numThreadStatesActive = 0; |
| } |
| |
| public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) { |
| this.documentsWriter.set(documentsWriter); // thread pool is bound to DW |
| - this.globalFieldMap = globalFieldMap; |
| - for (int i = 0; i < perThreads.length; i++) { |
| + this.globalFieldMap.set(globalFieldMap); |
| + for (int i = 0; i < threadStates.length; i++) { |
| final FieldInfos infos = new FieldInfos(globalFieldMap); |
| - perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain)); |
| + threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain)); |
| } |
| } |
| - |
| + |
| /** |
| * Returns the max number of {@link ThreadState} instances available in this |
| * {@link DocumentsWriterPerThreadPool} |
| */ |
| public int getMaxThreadStates() { |
| - return perThreads.length; |
| + return threadStates.length; |
| } |
| |
| /** |
| @@ -178,16 +170,16 @@ public abstract class DocumentsWriterPerThreadPool { |
| * <code>null</code> |
| */ |
| public synchronized ThreadState newThreadState() { |
| - if (numThreadStatesActive < perThreads.length) { |
| - final ThreadState threadState = perThreads[numThreadStatesActive]; |
| + if (numThreadStatesActive < threadStates.length) { |
| + final ThreadState threadState = threadStates[numThreadStatesActive]; |
| threadState.lock(); // lock so nobody else will get this ThreadState |
| boolean unlock = true; |
| try { |
| if (threadState.isActive()) { |
| // unreleased thread states are deactivated during DW#close() |
| numThreadStatesActive++; // increment will publish the ThreadState |
| - assert threadState.perThread != null; |
| - threadState.perThread.initialize(); |
| + assert threadState.dwpt != null; |
| + threadState.dwpt.initialize(); |
| unlock = false; |
| return threadState; |
| } |
| @@ -205,12 +197,12 @@ public abstract class DocumentsWriterPerThreadPool { |
| } |
| |
| private synchronized boolean assertUnreleasedThreadStatesInactive() { |
| - for (int i = numThreadStatesActive; i < perThreads.length; i++) { |
| - assert perThreads[i].tryLock() : "unreleased threadstate should not be locked"; |
| + for (int i = numThreadStatesActive; i < threadStates.length; i++) { |
| + assert threadStates[i].tryLock() : "unreleased threadstate should not be locked"; |
| try { |
| - assert !perThreads[i].isActive() : "expected unreleased thread state to be inactive"; |
| + assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive"; |
| } finally { |
| - perThreads[i].unlock(); |
| + threadStates[i].unlock(); |
| } |
| } |
| return true; |
| @@ -220,8 +212,8 @@ public abstract class DocumentsWriterPerThreadPool { |
| * Deactivate all unreleased threadstates |
| */ |
| protected synchronized void deactivateUnreleasedStates() { |
| - for (int i = numThreadStatesActive; i < perThreads.length; i++) { |
| - final ThreadState threadState = perThreads[i]; |
| + for (int i = numThreadStatesActive; i < threadStates.length; i++) { |
| + final ThreadState threadState = threadStates[i]; |
| threadState.lock(); |
| try { |
| threadState.resetWriter(null); |
| @@ -233,9 +225,10 @@ public abstract class DocumentsWriterPerThreadPool { |
| |
| protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) { |
| assert threadState.isHeldByCurrentThread(); |
| - final DocumentsWriterPerThread dwpt = threadState.perThread; |
| + assert globalFieldMap.get() != null; |
| + final DocumentsWriterPerThread dwpt = threadState.dwpt; |
| if (!closed) { |
| - final FieldInfos infos = new FieldInfos(globalFieldMap); |
| + final FieldInfos infos = new FieldInfos(globalFieldMap.get()); |
| final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos); |
| newDwpt.initialize(); |
| threadState.resetWriter(newDwpt); |
| @@ -251,45 +244,19 @@ public abstract class DocumentsWriterPerThreadPool { |
| |
| public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter); |
| |
| + |
| /** |
| - * Returns an iterator providing access to all {@link ThreadState} |
| - * instances. |
| - */ |
| - // TODO: new Iterator per indexed doc is overkill...? |
| - public Iterator<ThreadState> getAllPerThreadsIterator() { |
| - return getPerThreadsIterator(this.perThreads.length); |
| - } |
| - |
| - /** |
| - * Returns an iterator providing access to all active {@link ThreadState} |
| - * instances. |
| - * <p> |
| - * Note: The returned iterator will only iterator |
| - * {@link ThreadState}s that are active at the point in time when this method |
| - * has been called. |
| + * Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the |
| + * given ord. |
| * |
| + * @param ord |
| + * the ordinal of the {@link ThreadState} |
| + * @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the |
| + * given ord. |
| */ |
| - // TODO: new Iterator per indexed doc is overkill...? |
| - public Iterator<ThreadState> getActivePerThreadsIterator() { |
| - return getPerThreadsIterator(numThreadStatesActive); |
| - } |
| - |
| - private Iterator<ThreadState> getPerThreadsIterator(final int upto) { |
| - return new Iterator<ThreadState>() { |
| - int i = 0; |
| - |
| - public boolean hasNext() { |
| - return i < upto; |
| - } |
| - |
| - public ThreadState next() { |
| - return perThreads[i++]; |
| - } |
| - |
| - public void remove() { |
| - throw new UnsupportedOperationException("remove() not supported."); |
| - } |
| - }; |
| + ThreadState getThreadState(int ord) { |
| + assert ord < numThreadStatesActive; |
| + return threadStates[ord]; |
| } |
| |
| /** |
| @@ -299,14 +266,59 @@ public abstract class DocumentsWriterPerThreadPool { |
| */ |
| protected ThreadState minContendedThreadState() { |
| ThreadState minThreadState = null; |
| - // TODO: new Iterator per indexed doc is overkill...? |
| - final Iterator<ThreadState> it = getActivePerThreadsIterator(); |
| - while (it.hasNext()) { |
| - final ThreadState state = it.next(); |
| + final int limit = numThreadStatesActive; |
| + for (int i = 0; i < limit; i++) { |
| + final ThreadState state = threadStates[i]; |
| if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) { |
| minThreadState = state; |
| } |
| } |
| return minThreadState; |
| } |
| + |
| + /** |
| + * Returns the number of currently deactivated {@link ThreadState} instances. |
| + * A deactivated {@link ThreadState} should not be used for indexing anymore. |
| + * |
| + * @return the number of currently deactivated {@link ThreadState} instances. |
| + */ |
| + int numDeactivatedThreadStates() { |
| + int count = 0; |
| + for (int i = 0; i < threadStates.length; i++) { |
| + final ThreadState threadState = threadStates[i]; |
| + threadState.lock(); |
| + try { |
| + if (!threadState.isActive) { |
| + count++; |
| + } |
| + } finally { |
| + threadState.unlock(); |
| + } |
| + } |
| + return count; |
| + } |
| + |
| + /** |
| + * Deactivates an active {@link ThreadState}. Inactive {@link ThreadState} can |
| + * not be used for indexing anymore once they are deactivated. This method should only be used |
| + * if the parent {@link DocumentsWriter} is closed or aborted. |
| + * |
| + * @param threadState the state to deactivate |
| + */ |
| + void deactivateThreadState(ThreadState threadState) { |
| + assert threadState.isActive(); |
| + threadState.resetWriter(null); |
| + } |
| + |
| + /** |
| + * Reinitialized an active {@link ThreadState}. A {@link ThreadState} should |
| + * only be reinitialized if it is active without any pending documents. |
| + * |
| + * @param threadState the state to reinitialize |
| + */ |
| + void reinitThreadState(ThreadState threadState) { |
| + assert threadState.isActive; |
| + assert threadState.dwpt.getNumDocsInRAM() == 0; |
| + threadState.dwpt.initialize(); |
| + } |
| } |
| diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java |
| index 4c7ca1f..f7bf325 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java |
| @@ -72,7 +72,7 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy { |
| @Override |
| public void onInsert(DocumentsWriterFlushControl control, ThreadState state) { |
| if (flushOnDocCount() |
| - && state.perThread.getNumDocsInRAM() >= indexWriterConfig |
| + && state.dwpt.getNumDocsInRAM() >= indexWriterConfig |
| .getMaxBufferedDocs()) { |
| // Flush this state by num docs |
| control.setFlushPending(state); |
| diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java |
| index 4f85eae..93ac3f0 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java |
| @@ -75,9 +75,7 @@ public abstract class FlushPolicy { |
| */ |
| public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) { |
| onInsert(control, state); |
| - if (!state.flushPending) { |
| - onDelete(control, state); |
| - } |
| + onDelete(control, state); |
| } |
| |
| /** |
| @@ -107,17 +105,17 @@ public abstract class FlushPolicy { |
| */ |
| protected ThreadState findLargestNonPendingWriter( |
| DocumentsWriterFlushControl control, ThreadState perThreadState) { |
| - assert perThreadState.perThread.getNumDocsInRAM() > 0; |
| + assert perThreadState.dwpt.getNumDocsInRAM() > 0; |
| long maxRamSoFar = perThreadState.bytesUsed; |
| // the dwpt which needs to be flushed eventually |
| ThreadState maxRamUsingThreadState = perThreadState; |
| assert !perThreadState.flushPending : "DWPT should have flushed"; |
| - Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreads(); |
| + Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates(); |
| while (activePerThreadsIterator.hasNext()) { |
| ThreadState next = activePerThreadsIterator.next(); |
| if (!next.flushPending) { |
| final long nextRam = next.bytesUsed; |
| - if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) { |
| + if (nextRam > maxRamSoFar && next.dwpt.getNumDocsInRAM() > 0) { |
| maxRamSoFar = nextRam; |
| maxRamUsingThreadState = next; |
| } |
| diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java |
| index 6b3160c..b916b70 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java |
| @@ -1144,7 +1144,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { |
| synchronized(this) { |
| closed = true; |
| } |
| - assert oldWriter.assertNoActiveDWPT(); |
| + assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates(); |
| } catch (OutOfMemoryError oom) { |
| handleOOM(oom, "closeInternal"); |
| } finally { |
| diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java |
| index a4e33f2..6adaebe 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java |
| @@ -94,6 +94,13 @@ public final class IndexWriterConfig implements Cloneable { |
| |
| /** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */ |
| public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945; |
| + |
| + /** The maximum number of simultaneous threads that may be |
| + * indexing documents at once in IndexWriter; if more |
| + * than this many threads arrive they will wait for |
| + * others to finish. Default value is 8. */ |
| + public final static int DEFAULT_MAX_THREAD_STATES = 8; |
| + |
| /** |
| * Sets the default (for any instance) maximum time to wait for a write lock |
| * (in milliseconds). |
| @@ -172,7 +179,7 @@ public final class IndexWriterConfig implements Cloneable { |
| } |
| flushPolicy = new FlushByRamOrCountsPolicy(); |
| readerPooling = DEFAULT_READER_POOLING; |
| - indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(); |
| + indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES); |
| readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR; |
| perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; |
| } |
| @@ -554,8 +561,8 @@ public final class IndexWriterConfig implements Cloneable { |
| * IndexWriter to assign thread-states to incoming indexing threads. If no |
| * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use |
| * {@link ThreadAffinityDocumentsWriterThreadPool} with max number of |
| - * thread-states set to {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see |
| - * {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}). |
| + * thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see |
| + * {@link #DEFAULT_MAX_THREAD_STATES}). |
| * </p> |
| * <p> |
| * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with |
| diff --git a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java |
| index b5f0b6c..c07ece5 100644 |
| --- a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java |
| +++ b/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java |
| @@ -34,13 +34,8 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT |
| private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>(); |
| |
| /** |
| - * Creates a new {@link DocumentsWriterPerThreadPool} with max. |
| - * {@link #DEFAULT_MAX_THREAD_STATES} thread states. |
| + * Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s. |
| */ |
| - public ThreadAffinityDocumentsWriterThreadPool() { |
| - this(DEFAULT_MAX_THREAD_STATES); |
| - } |
| - |
| public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) { |
| super(maxNumPerThreads); |
| assert getMaxThreadStates() >= 1; |
| diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java |
| index 5217de3..a85589d 100644 |
| --- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java |
| +++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java |
| @@ -281,10 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { |
| } |
| |
| protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) { |
| - Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads(); |
| + Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates(); |
| long bytesUsed = 0; |
| while (allActiveThreads.hasNext()) { |
| - bytesUsed += allActiveThreads.next().perThread.bytesUsed(); |
| + bytesUsed += allActiveThreads.next().dwpt.bytesUsed(); |
| } |
| assertEquals(bytesUsed, flushControl.activeBytes()); |
| } |
| @@ -343,7 +343,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { |
| if (state.flushPending) { |
| toFlush = state; |
| } else if (flushOnDeleteTerms() |
| - && state.perThread.pendingDeletes.numTermDeletes.get() >= indexWriterConfig |
| + && state.dwpt.pendingDeletes.numTermDeletes.get() >= indexWriterConfig |
| .getMaxBufferedDeleteTerms()) { |
| toFlush = state; |
| } else { |
| @@ -376,7 +376,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { |
| if (state.flushPending) { |
| toFlush = state; |
| } else if (flushOnDocCount() |
| - && state.perThread.getNumDocsInRAM() >= indexWriterConfig |
| + && state.dwpt.getNumDocsInRAM() >= indexWriterConfig |
| .getMaxBufferedDocs()) { |
| toFlush = state; |
| } else if (flushOnRAM() |
| @@ -397,7 +397,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { |
| hasMarkedPending = true; |
| } else { |
| peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush); |
| - peakDocCountWithoutFlush = Math.max(state.perThread.getNumDocsInRAM(), |
| + peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(), |
| peakDocCountWithoutFlush); |
| } |
| |
| @@ -409,7 +409,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { |
| |
| static void findPending(DocumentsWriterFlushControl flushControl, |
| ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) { |
| - Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads(); |
| + Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates(); |
| while (allActiveThreads.hasNext()) { |
| ThreadState next = allActiveThreads.next(); |
| if (next.flushPending) { |