blob: c273cdac3212cb30e9edc6c2dda1b34bdc2664f2 [file] [log] [blame]
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index b61d152..9d444bb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -19,7 +19,6 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -213,18 +212,18 @@ final class DocumentsWriter {
infoStream.message("DW", "abort");
}
- final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
- while (threadsIterator.hasNext()) {
- final ThreadState perThread = threadsIterator.next();
+ final int limit = perThreadPool.getActiveThreadState();
+ for (int i = 0; i < limit; i++) {
+ final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
try {
if (perThread.isActive()) { // we might be closed
try {
- perThread.perThread.abort();
+ perThread.dwpt.abort();
} catch (IOException ex) {
// continue
} finally {
- perThread.perThread.checkAndResetHasAborted();
+ perThread.dwpt.checkAndResetHasAborted();
flushControl.doOnAbort(perThread);
}
} else {
@@ -338,7 +337,7 @@ final class DocumentsWriter {
assert false: "perThread is not active but we are still open";
}
- final DocumentsWriterPerThread dwpt = perThread.perThread;
+ final DocumentsWriterPerThread dwpt = perThread.dwpt;
try {
final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
numDocsInRAM.addAndGet(docCount);
@@ -372,7 +371,7 @@ final class DocumentsWriter {
assert false: "perThread is not active but we are still open";
}
- final DocumentsWriterPerThread dwpt = perThread.perThread;
+ final DocumentsWriterPerThread dwpt = perThread.dwpt;
try {
dwpt.updateDocument(doc, analyzer, delTerm);
numDocsInRAM.incrementAndGet();
@@ -587,22 +586,4 @@ final class DocumentsWriter {
}
}
-
-
-
-
- // use by IW during close to assert all DWPT are inactive after final flush
- boolean assertNoActiveDWPT() {
- Iterator<ThreadState> activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator();
- while(activePerThreadsIterator.hasNext()) {
- ThreadState next = activePerThreadsIterator.next();
- next.lock();
- try {
- assert !next.isActive();
- } finally {
- next.unlock();
- }
- }
- return true;
- }
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index d4c8be3..18b4769 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -18,8 +18,8 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
import java.util.ArrayList;
+import java.util.IdentityHashMap;
import java.util.List;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
@@ -51,6 +51,8 @@ public final class DocumentsWriterFlushControl {
private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
// only for safety reasons if a DWPT is close to the RAM limit
private final Queue<BlockedFlush> blockedFlushes = new LinkedList<BlockedFlush>();
+ private final IdentityHashMap<DocumentsWriterPerThread, Long> flushingWriters = new IdentityHashMap<DocumentsWriterPerThread, Long>();
+
double maxConfiguredRamBuffer = 0;
long peakActiveBytes = 0;// only with assert
@@ -61,7 +63,6 @@ public final class DocumentsWriterFlushControl {
private final DocumentsWriterPerThreadPool perThreadPool;
private final FlushPolicy flushPolicy;
private boolean closed = false;
- private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>();
private final DocumentsWriter documentsWriter;
private final IndexWriterConfig config;
@@ -122,7 +123,7 @@ public final class DocumentsWriterFlushControl {
}
private void commitPerThreadBytes(ThreadState perThread) {
- final long delta = perThread.perThread.bytesUsed()
+ final long delta = perThread.dwpt.bytesUsed()
- perThread.bytesUsed;
perThread.bytesUsed += delta;
/*
@@ -212,7 +213,7 @@ public final class DocumentsWriterFlushControl {
*/
public synchronized void setFlushPending(ThreadState perThread) {
assert !perThread.flushPending;
- if (perThread.perThread.getNumDocsInRAM() > 0) {
+ if (perThread.dwpt.getNumDocsInRAM() > 0) {
perThread.flushPending = true; // write access synced
final long bytes = perThread.bytesUsed;
flushBytes += bytes;
@@ -295,18 +296,21 @@ public final class DocumentsWriterFlushControl {
}
DocumentsWriterPerThread nextPendingFlush() {
+ int numPending;
+ boolean fullFlush;
synchronized (this) {
final DocumentsWriterPerThread poll;
if ((poll = flushQueue.poll()) != null) {
stallControl.updateStalled(this);
return poll;
}
+ fullFlush = this.fullFlush;
+ numPending = this.numPending;
}
if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
- final Iterator<ThreadState> allActiveThreads = perThreadPool
- .getActivePerThreadsIterator();
- while (allActiveThreads.hasNext() && numPending > 0) {
- ThreadState next = allActiveThreads.next();
+ final int limit = perThreadPool.getActiveThreadState();
+ for (int i = 0; i < limit && numPending > 0; i++) {
+ final ThreadState next = perThreadPool.getThreadState(i);
if (next.flushPending) {
final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
if (dwpt != null) {
@@ -327,9 +331,29 @@ public final class DocumentsWriterFlushControl {
/**
* Returns an iterator that provides access to all currently active {@link ThreadState}s
*/
- public Iterator<ThreadState> allActiveThreads() {
- return perThreadPool.getActivePerThreadsIterator();
+ public Iterator<ThreadState> allActiveThreadStates() {
+ return getPerThreadsIterator(perThreadPool.getActiveThreadState());
}
+
+ private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
+ return new Iterator<ThreadState>() {
+ int i = 0;
+
+ public boolean hasNext() {
+ return i < upto;
+ }
+
+ public ThreadState next() {
+ return perThreadPool.getThreadState(i++);
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("remove() not supported.");
+ }
+ };
+ }
+
+
synchronized void doOnDelete() {
// pass null this is a global delete no update
@@ -369,7 +393,7 @@ public final class DocumentsWriterFlushControl {
boolean success = false;
try {
if (perThread.isActive()
- && perThread.perThread.deleteQueue != documentsWriter.deleteQueue) {
+ && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
// another DWPT:
@@ -397,23 +421,23 @@ public final class DocumentsWriterFlushControl {
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
documentsWriter.deleteQueue = newQueue;
}
- final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
- while (allActiveThreads.hasNext()) {
- final ThreadState next = allActiveThreads.next();
+ final int limit = perThreadPool.getActiveThreadState();
+ for (int i = 0; i < limit; i++) {
+ final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
if (!next.isActive()) {
continue;
}
- assert next.perThread.deleteQueue == flushingQueue
- || next.perThread.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
+ assert next.dwpt.deleteQueue == flushingQueue
+ || next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
+ flushingQueue
+ " currentqueue: "
+ documentsWriter.deleteQueue
+ " perThread queue: "
- + next.perThread.deleteQueue
- + " numDocsInRam: " + next.perThread.getNumDocsInRAM();
- if (next.perThread.deleteQueue != flushingQueue) {
+ + next.dwpt.deleteQueue
+ + " numDocsInRam: " + next.dwpt.getNumDocsInRAM();
+ if (next.dwpt.deleteQueue != flushingQueue) {
// this one is already a new DWPT
continue;
}
@@ -437,12 +461,12 @@ public final class DocumentsWriterFlushControl {
}
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
- final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
- while (allActiveThreads.hasNext()) {
- final ThreadState next = allActiveThreads.next();
+ final int limit = perThreadPool.getActiveThreadState();
+ for (int i = 0; i < limit; i++) {
+ final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
- assert !next.isActive() || next.perThread.deleteQueue == queue;
+ assert !next.isActive() || next.dwpt.deleteQueue == queue;
} finally {
next.unlock();
}
@@ -454,9 +478,9 @@ public final class DocumentsWriterFlushControl {
void addFlushableState(ThreadState perThread) {
if (documentsWriter.infoStream.isEnabled("DWFC")) {
- documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread);
+ documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.dwpt);
}
- final DocumentsWriterPerThread dwpt = perThread.perThread;
+ final DocumentsWriterPerThread dwpt = perThread.dwpt;
assert perThread.isHeldByCurrentThread();
assert perThread.isActive();
assert fullFlush;
@@ -473,9 +497,9 @@ public final class DocumentsWriterFlushControl {
}
} else {
if (closed) {
- perThread.resetWriter(null); // make this state inactive
+ perThreadPool.deactivateThreadState(perThread); // make this state inactive
} else {
- dwpt.initialize();
+ perThreadPool.reinitThreadState(perThread);
}
}
}
@@ -597,4 +621,6 @@ public final class DocumentsWriterFlushControl {
boolean anyStalledThreads() {
return stallControl.anyStalledThreads();
}
+
+
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 8b959ca..9d5bf30 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -16,7 +16,6 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
@@ -38,11 +37,6 @@ import org.apache.lucene.util.SetOnce;
* </p>
*/
public abstract class DocumentsWriterPerThreadPool {
- /** The maximum number of simultaneous threads that may be
- * indexing documents at once in IndexWriter; if more
- * than this many threads arrive they will wait for
- * others to finish. */
- public final static int DEFAULT_MAX_THREAD_STATES = 8;
/**
* {@link ThreadState} references and guards a
@@ -57,17 +51,18 @@ public abstract class DocumentsWriterPerThreadPool {
*/
@SuppressWarnings("serial")
public final static class ThreadState extends ReentrantLock {
- // package private for FlushPolicy
- DocumentsWriterPerThread perThread;
+ DocumentsWriterPerThread dwpt;
+ // TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
volatile boolean flushPending = false;
+ // TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
long bytesUsed = 0;
// guarded by Reentrant lock
private boolean isActive = true;
- ThreadState(DocumentsWriterPerThread perThread) {
- this.perThread = perThread;
+ ThreadState(DocumentsWriterPerThread dpwt) {
+ this.dwpt = dpwt;
}
/**
@@ -76,12 +71,12 @@ public abstract class DocumentsWriterPerThreadPool {
* for indexing anymore.
* @see #isActive()
*/
- void resetWriter(DocumentsWriterPerThread perThread) {
+ private void resetWriter(DocumentsWriterPerThread dwpt) {
assert this.isHeldByCurrentThread();
- if (perThread == null) {
+ if (dwpt == null) {
isActive = false;
}
- this.perThread = perThread;
+ this.dwpt = dwpt;
this.bytesUsed = 0;
this.flushPending = false;
}
@@ -112,7 +107,7 @@ public abstract class DocumentsWriterPerThreadPool {
public DocumentsWriterPerThread getDocumentsWriterPerThread() {
assert this.isHeldByCurrentThread();
// public for FlushPolicy
- return perThread;
+ return dwpt;
}
/**
@@ -124,40 +119,37 @@ public abstract class DocumentsWriterPerThreadPool {
}
}
- private final ThreadState[] perThreads;
+ private final ThreadState[] threadStates;
private volatile int numThreadStatesActive;
- private FieldNumberBiMap globalFieldMap;
+ private final SetOnce<FieldNumberBiMap> globalFieldMap = new SetOnce<FieldNumberBiMap>();
private final SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
/**
- * Creates a new {@link DocumentsWriterPerThreadPool} with max.
- * {@link #DEFAULT_MAX_THREAD_STATES} thread states.
+ * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/
- public DocumentsWriterPerThreadPool() {
- this(DEFAULT_MAX_THREAD_STATES);
- }
-
- public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
- maxNumPerThreads = (maxNumPerThreads < 1) ? DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
- perThreads = new ThreadState[maxNumPerThreads];
+ public DocumentsWriterPerThreadPool(int maxNumThreadStates) {
+ if (maxNumThreadStates < 1) {
+ throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
+ }
+ threadStates = new ThreadState[maxNumThreadStates];
numThreadStatesActive = 0;
}
public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
- this.globalFieldMap = globalFieldMap;
- for (int i = 0; i < perThreads.length; i++) {
+ this.globalFieldMap.set(globalFieldMap);
+ for (int i = 0; i < threadStates.length; i++) {
final FieldInfos infos = new FieldInfos(globalFieldMap);
- perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
+ threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
}
}
-
+
/**
* Returns the max number of {@link ThreadState} instances available in this
* {@link DocumentsWriterPerThreadPool}
*/
public int getMaxThreadStates() {
- return perThreads.length;
+ return threadStates.length;
}
/**
@@ -178,16 +170,16 @@ public abstract class DocumentsWriterPerThreadPool {
* <code>null</code>
*/
public synchronized ThreadState newThreadState() {
- if (numThreadStatesActive < perThreads.length) {
- final ThreadState threadState = perThreads[numThreadStatesActive];
+ if (numThreadStatesActive < threadStates.length) {
+ final ThreadState threadState = threadStates[numThreadStatesActive];
threadState.lock(); // lock so nobody else will get this ThreadState
boolean unlock = true;
try {
if (threadState.isActive()) {
// unreleased thread states are deactivated during DW#close()
numThreadStatesActive++; // increment will publish the ThreadState
- assert threadState.perThread != null;
- threadState.perThread.initialize();
+ assert threadState.dwpt != null;
+ threadState.dwpt.initialize();
unlock = false;
return threadState;
}
@@ -205,12 +197,12 @@ public abstract class DocumentsWriterPerThreadPool {
}
private synchronized boolean assertUnreleasedThreadStatesInactive() {
- for (int i = numThreadStatesActive; i < perThreads.length; i++) {
- assert perThreads[i].tryLock() : "unreleased threadstate should not be locked";
+ for (int i = numThreadStatesActive; i < threadStates.length; i++) {
+ assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
try {
- assert !perThreads[i].isActive() : "expected unreleased thread state to be inactive";
+ assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
} finally {
- perThreads[i].unlock();
+ threadStates[i].unlock();
}
}
return true;
@@ -220,8 +212,8 @@ public abstract class DocumentsWriterPerThreadPool {
* Deactivate all unreleased threadstates
*/
protected synchronized void deactivateUnreleasedStates() {
- for (int i = numThreadStatesActive; i < perThreads.length; i++) {
- final ThreadState threadState = perThreads[i];
+ for (int i = numThreadStatesActive; i < threadStates.length; i++) {
+ final ThreadState threadState = threadStates[i];
threadState.lock();
try {
threadState.resetWriter(null);
@@ -233,9 +225,10 @@ public abstract class DocumentsWriterPerThreadPool {
protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
assert threadState.isHeldByCurrentThread();
- final DocumentsWriterPerThread dwpt = threadState.perThread;
+ assert globalFieldMap.get() != null;
+ final DocumentsWriterPerThread dwpt = threadState.dwpt;
if (!closed) {
- final FieldInfos infos = new FieldInfos(globalFieldMap);
+ final FieldInfos infos = new FieldInfos(globalFieldMap.get());
final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
newDwpt.initialize();
threadState.resetWriter(newDwpt);
@@ -251,45 +244,19 @@ public abstract class DocumentsWriterPerThreadPool {
public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
+
/**
- * Returns an iterator providing access to all {@link ThreadState}
- * instances.
- */
- // TODO: new Iterator per indexed doc is overkill...?
- public Iterator<ThreadState> getAllPerThreadsIterator() {
- return getPerThreadsIterator(this.perThreads.length);
- }
-
- /**
- * Returns an iterator providing access to all active {@link ThreadState}
- * instances.
- * <p>
- * Note: The returned iterator will only iterator
- * {@link ThreadState}s that are active at the point in time when this method
- * has been called.
+ * Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
+ * given ord.
*
+ * @param ord
+ * the ordinal of the {@link ThreadState}
+ * @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
+ * given ord.
*/
- // TODO: new Iterator per indexed doc is overkill...?
- public Iterator<ThreadState> getActivePerThreadsIterator() {
- return getPerThreadsIterator(numThreadStatesActive);
- }
-
- private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
- return new Iterator<ThreadState>() {
- int i = 0;
-
- public boolean hasNext() {
- return i < upto;
- }
-
- public ThreadState next() {
- return perThreads[i++];
- }
-
- public void remove() {
- throw new UnsupportedOperationException("remove() not supported.");
- }
- };
+ ThreadState getThreadState(int ord) {
+ assert ord < numThreadStatesActive;
+ return threadStates[ord];
}
/**
@@ -299,14 +266,59 @@ public abstract class DocumentsWriterPerThreadPool {
*/
protected ThreadState minContendedThreadState() {
ThreadState minThreadState = null;
- // TODO: new Iterator per indexed doc is overkill...?
- final Iterator<ThreadState> it = getActivePerThreadsIterator();
- while (it.hasNext()) {
- final ThreadState state = it.next();
+ final int limit = numThreadStatesActive;
+ for (int i = 0; i < limit; i++) {
+ final ThreadState state = threadStates[i];
if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
minThreadState = state;
}
}
return minThreadState;
}
+
+ /**
+ * Returns the number of currently deactivated {@link ThreadState} instances.
+ * A deactivated {@link ThreadState} should not be used for indexing anymore.
+ *
+ * @return the number of currently deactivated {@link ThreadState} instances.
+ */
+ int numDeactivatedThreadStates() {
+ int count = 0;
+ for (int i = 0; i < threadStates.length; i++) {
+ final ThreadState threadState = threadStates[i];
+ threadState.lock();
+ try {
+ if (!threadState.isActive) {
+ count++;
+ }
+ } finally {
+ threadState.unlock();
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Deactivates an active {@link ThreadState}. Inactive {@link ThreadState} can
+ * not be used for indexing anymore once they are deactivated. This method should only be used
+ * if the parent {@link DocumentsWriter} is closed or aborted.
+ *
+ * @param threadState the state to deactivate
+ */
+ void deactivateThreadState(ThreadState threadState) {
+ assert threadState.isActive();
+ threadState.resetWriter(null);
+ }
+
+ /**
+ * Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
+ * only be reinitialized if it is active without any pending documents.
+ *
+ * @param threadState the state to reinitialize
+ */
+ void reinitThreadState(ThreadState threadState) {
+ assert threadState.isActive;
+ assert threadState.dwpt.getNumDocsInRAM() == 0;
+ threadState.dwpt.initialize();
+ }
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
index 4c7ca1f..f7bf325 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
@@ -72,7 +72,7 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
@Override
public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
if (flushOnDocCount()
- && state.perThread.getNumDocsInRAM() >= indexWriterConfig
+ && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
.getMaxBufferedDocs()) {
// Flush this state by num docs
control.setFlushPending(state);
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
index 4f85eae..93ac3f0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
@@ -75,9 +75,7 @@ public abstract class FlushPolicy {
*/
public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) {
onInsert(control, state);
- if (!state.flushPending) {
- onDelete(control, state);
- }
+ onDelete(control, state);
}
/**
@@ -107,17 +105,17 @@ public abstract class FlushPolicy {
*/
protected ThreadState findLargestNonPendingWriter(
DocumentsWriterFlushControl control, ThreadState perThreadState) {
- assert perThreadState.perThread.getNumDocsInRAM() > 0;
+ assert perThreadState.dwpt.getNumDocsInRAM() > 0;
long maxRamSoFar = perThreadState.bytesUsed;
// the dwpt which needs to be flushed eventually
ThreadState maxRamUsingThreadState = perThreadState;
assert !perThreadState.flushPending : "DWPT should have flushed";
- Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreads();
+ Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates();
while (activePerThreadsIterator.hasNext()) {
ThreadState next = activePerThreadsIterator.next();
if (!next.flushPending) {
final long nextRam = next.bytesUsed;
- if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) {
+ if (nextRam > maxRamSoFar && next.dwpt.getNumDocsInRAM() > 0) {
maxRamSoFar = nextRam;
maxRamUsingThreadState = next;
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 6b3160c..b916b70 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1144,7 +1144,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized(this) {
closed = true;
}
- assert oldWriter.assertNoActiveDWPT();
+ assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates();
} catch (OutOfMemoryError oom) {
handleOOM(oom, "closeInternal");
} finally {
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index a4e33f2..6adaebe 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -94,6 +94,13 @@ public final class IndexWriterConfig implements Cloneable {
/** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
+
+ /** The maximum number of simultaneous threads that may be
+ * indexing documents at once in IndexWriter; if more
+ * than this many threads arrive they will wait for
+ * others to finish. Default value is 8. */
+ public final static int DEFAULT_MAX_THREAD_STATES = 8;
+
/**
* Sets the default (for any instance) maximum time to wait for a write lock
* (in milliseconds).
@@ -172,7 +179,7 @@ public final class IndexWriterConfig implements Cloneable {
}
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = DEFAULT_READER_POOLING;
- indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool();
+ indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES);
readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
@@ -554,8 +561,8 @@ public final class IndexWriterConfig implements Cloneable {
* IndexWriter to assign thread-states to incoming indexing threads. If no
* {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
* {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
- * thread-states set to {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see
- * {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}).
+ * thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see
+ * {@link #DEFAULT_MAX_THREAD_STATES}).
* </p>
* <p>
* NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
diff --git a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
index b5f0b6c..c07ece5 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
@@ -34,13 +34,8 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
/**
- * Creates a new {@link DocumentsWriterPerThreadPool} with max.
- * {@link #DEFAULT_MAX_THREAD_STATES} thread states.
+ * Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
*/
- public ThreadAffinityDocumentsWriterThreadPool() {
- this(DEFAULT_MAX_THREAD_STATES);
- }
-
public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
super(maxNumPerThreads);
assert getMaxThreadStates() >= 1;
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
index 5217de3..a85589d 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
@@ -281,10 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
}
protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
- Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
+ Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
long bytesUsed = 0;
while (allActiveThreads.hasNext()) {
- bytesUsed += allActiveThreads.next().perThread.bytesUsed();
+ bytesUsed += allActiveThreads.next().dwpt.bytesUsed();
}
assertEquals(bytesUsed, flushControl.activeBytes());
}
@@ -343,7 +343,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
if (state.flushPending) {
toFlush = state;
} else if (flushOnDeleteTerms()
- && state.perThread.pendingDeletes.numTermDeletes.get() >= indexWriterConfig
+ && state.dwpt.pendingDeletes.numTermDeletes.get() >= indexWriterConfig
.getMaxBufferedDeleteTerms()) {
toFlush = state;
} else {
@@ -376,7 +376,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
if (state.flushPending) {
toFlush = state;
} else if (flushOnDocCount()
- && state.perThread.getNumDocsInRAM() >= indexWriterConfig
+ && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
.getMaxBufferedDocs()) {
toFlush = state;
} else if (flushOnRAM()
@@ -397,7 +397,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
hasMarkedPending = true;
} else {
peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
- peakDocCountWithoutFlush = Math.max(state.perThread.getNumDocsInRAM(),
+ peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(),
peakDocCountWithoutFlush);
}
@@ -409,7 +409,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
static void findPending(DocumentsWriterFlushControl flushControl,
ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) {
- Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
+ Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
while (allActiveThreads.hasNext()) {
ThreadState next = allActiveThreads.next();
if (next.flushPending) {