blob: cfdb21867cb4b2ca25651cf47c8eec4ed23c38c2 [file] [log] [blame]
package org.apache.lucene.index;
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.util.IdentityHashMap;
import java.util.Map;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.ThreadInterruptedException;
* Controls the health status of a {@link DocumentsWriter} sessions. This class
* used to block incoming indexing threads if flushing significantly slower than
* indexing to ensure the {@link DocumentsWriter}s healthiness. If flushing is
* significantly slower than indexing the net memory used within an
* {@link IndexWriter} session can increase very quickly and easily exceed the
* JVM's available memory.
* <p>
* To prevent OOM Errors and ensure IndexWriter's stability this class blocks
* incoming threads from indexing once 2 x number of available
* {@link ThreadState}s in {@link DocumentsWriterPerThreadPool} is exceeded.
* Once flushing catches up and the number of flushing DWPT is equal or lower
* than the number of active {@link ThreadState}s threads are released and can
* continue indexing.
final class DocumentsWriterStallControl {
private volatile boolean stalled;
private int numWaiting; // only with assert
private boolean wasStalled; // only with assert
private final Map<Thread, Boolean> waiting = new IdentityHashMap<Thread, Boolean>(); // only with assert
* Update the stalled flag status. This method will set the stalled flag to
* <code>true</code> iff the number of flushing
* {@link DocumentsWriterPerThread} is greater than the number of active
* {@link DocumentsWriterPerThread}. Otherwise it will reset the
* {@link DocumentsWriterStallControl} to healthy and release all threads
* waiting on {@link #waitIfStalled()}
synchronized void updateStalled(boolean stalled) {
this.stalled = stalled;
if (stalled) {
wasStalled = true;
* Blocks if documents writing is currently in a stalled state.
void waitIfStalled() {
if (stalled) {
synchronized (this) {
if (stalled) { // react on the first wakeup call!
// don't loop here, higher level logic will re-stall!
try {
assert incWaiters();
assert decrWaiters();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
boolean anyStalledThreads() {
return stalled;
private boolean incWaiters() {
assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null;
return numWaiting > 0;
private boolean decrWaiters() {
assert waiting.remove(Thread.currentThread()) != null;
return numWaiting >= 0;
synchronized boolean hasBlocked() { // for tests
return numWaiting > 0;
boolean isHealthy() { // for tests
return !stalled; // volatile read!
synchronized boolean isThreadQueued(Thread t) { // for tests
return waiting.containsKey(t);
synchronized boolean wasStalled() { // for tests
return wasStalled;