blob: cef4410b19ff77d4c499bb26bc5c2298491b1c80 [file] [log] [blame]
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* This class controls {@link DocumentsWriterPerThread} flushing during
* indexing. It tracks the memory consumption per
* {@link DocumentsWriterPerThread} and uses a configured {@link FlushPolicy} to
* decide if a {@link DocumentsWriterPerThread} must flush.
* <p>
* In addition to the {@link FlushPolicy} the flush control might set certain
* {@link DocumentsWriterPerThread} as flush pending iff a
* {@link DocumentsWriterPerThread} exceeds the
* {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
* space exhaustion.
*/
final class DocumentsWriterFlushControl {
private final long hardMaxBytesPerDWPT;
private long activeBytes = 0;
private long flushBytes = 0;
private volatile int numPending = 0;
private int numDocsSinceStalled = 0; // only with assert
final AtomicBoolean flushDeletes = new AtomicBoolean(false);
private boolean fullFlush = false;
private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
// only for safety reasons if a DWPT is close to the RAM limit
private final Queue<BlockedFlush> blockedFlushes = new LinkedList<BlockedFlush>();
private final IdentityHashMap<DocumentsWriterPerThread, Long> flushingWriters = new IdentityHashMap<DocumentsWriterPerThread, Long>();
double maxConfiguredRamBuffer = 0;
long peakActiveBytes = 0;// only with assert
long peakFlushBytes = 0;// only with assert
long peakNetBytes = 0;// only with assert
long peakDelta = 0; // only with assert
final DocumentsWriterStallControl stallControl;
private final DocumentsWriterPerThreadPool perThreadPool;
private final FlushPolicy flushPolicy;
private boolean closed = false;
private final DocumentsWriter documentsWriter;
private final LiveIndexWriterConfig config;
DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.config = config;
this.documentsWriter = documentsWriter;
}
public synchronized long activeBytes() {
return activeBytes;
}
public synchronized long flushBytes() {
return flushBytes;
}
public synchronized long netBytes() {
return flushBytes + activeBytes;
}
private long stallLimitBytes() {
final double maxRamMB = config.getRAMBufferSizeMB();
return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE;
}
private boolean assertMemory() {
final double maxRamMB = config.getRAMBufferSizeMB();
if (maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
// for this assert we must be tolerant to ram buffer changes!
maxConfiguredRamBuffer = Math.max(maxRamMB, maxConfiguredRamBuffer);
final long ram = flushBytes + activeBytes;
final long ramBufferBytes = (long) (maxConfiguredRamBuffer * 1024 * 1024);
// take peakDelta into account - worst case is that all flushing, pending and blocked DWPT had maxMem and the last doc had the peakDelta
// 2 * ramBufferBytes -> before we stall we need to cross the 2xRAM Buffer border this is still a valid limit
// (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully fluhsed
// all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta
// (numDocsSinceStalled * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta);
// the expected ram consumption is an upper bound at this point and not really the expected consumption
if (peakDelta < (ramBufferBytes >> 1)) {
/*
* if we are indexing with very low maxRamBuffer like 0.1MB memory can
* easily overflow if we check out some DWPT based on docCount and have
* several DWPT in flight indexing large documents (compared to the ram
* buffer). This means that those DWPT and their threads will not hit
* the stall control before asserting the memory which would in turn
* fail. To prevent this we only assert if the the largest document seen
* is smaller than the 1/2 of the maxRamBufferMB
*/
assert ram <= expected : "actual mem: " + ram + " byte, expected mem: " + expected
+ " byte, flush mem: " + flushBytes + ", active mem: " + activeBytes
+ ", pending DWPT: " + numPending + ", flushing DWPT: "
+ numFlushingDWPT() + ", blocked DWPT: " + numBlockedFlushes()
+ ", peakDelta mem: " + peakDelta + " byte";
}
}
return true;
}
private void commitPerThreadBytes(ThreadState perThread) {
final long delta = perThread.dwpt.bytesUsed()
- perThread.bytesUsed;
perThread.bytesUsed += delta;
/*
* We need to differentiate here if we are pending since setFlushPending
* moves the perThread memory to the flushBytes and we could be set to
* pending during a delete
*/
if (perThread.flushPending) {
flushBytes += delta;
} else {
activeBytes += delta;
}
assert updatePeaks(delta);
}
// only for asserts
private boolean updatePeaks(long delta) {
peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
peakNetBytes = Math.max(peakNetBytes, netBytes());
peakDelta = Math.max(peakDelta, delta);
return true;
}
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
boolean isUpdate) {
try {
commitPerThreadBytes(perThread);
if (!perThread.flushPending) {
if (isUpdate) {
flushPolicy.onUpdate(this, perThread);
} else {
flushPolicy.onInsert(this, perThread);
}
if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
// Safety check to prevent a single DWPT exceeding its RAM limit. This
// is super important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread);
}
}
final DocumentsWriterPerThread flushingDWPT;
if (fullFlush) {
if (perThread.flushPending) {
checkoutAndBlock(perThread);
flushingDWPT = nextPendingFlush();
} else {
flushingDWPT = null;
}
} else {
flushingDWPT = tryCheckoutForFlush(perThread);
}
return flushingDWPT;
} finally {
boolean stalled = updateStallState();
assert assertNumDocsSinceStalled(stalled) && assertMemory();
}
}
private boolean assertNumDocsSinceStalled(boolean stalled) {
/*
* updates the number of documents "finished" while we are in a stalled state.
* this is important for asserting memory upper bounds since it corresponds
* to the number of threads that are in-flight and crossed the stall control
* check before we actually stalled.
* see #assertMemory()
*/
if (stalled) {
numDocsSinceStalled++;
} else {
numDocsSinceStalled = 0;
}
return true;
}
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
assert flushingWriters.containsKey(dwpt);
try {
Long bytes = flushingWriters.remove(dwpt);
flushBytes -= bytes.longValue();
perThreadPool.recycle(dwpt);
assert assertMemory();
} finally {
try {
updateStallState();
} finally {
notifyAll();
}
}
}
private final boolean updateStallState() {
assert Thread.holdsLock(this);
final long limit = stallLimitBytes();
/*
* we block indexing threads if net byte grows due to slow flushes
* yet, for small ram buffers and large documents we can easily
* reach the limit without any ongoing flushes. we need to ensure
* that we don't stall/block if an ongoing or pending flush can
* not free up enough memory to release the stall lock.
*/
final boolean stall = ((activeBytes + flushBytes) > limit) &&
(activeBytes < limit) &&
!closed;
stallControl.updateStalled(stall);
return stall;
}
public synchronized void waitForFlush() {
assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush";
while (flushingWriters.size() != 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
}
/**
* Sets flush pending state on the given {@link ThreadState}. The
* {@link ThreadState} must have indexed at least on Document and must not be
* already pending.
*/
public synchronized void setFlushPending(ThreadState perThread) {
assert !perThread.flushPending;
if (perThread.dwpt.getNumDocsInRAM() > 0) {
perThread.flushPending = true; // write access synced
final long bytes = perThread.bytesUsed;
flushBytes += bytes;
activeBytes -= bytes;
numPending++; // write access synced
assert assertMemory();
} // don't assert on numDocs since we could hit an abort excp. while selecting that dwpt for flushing
}
synchronized void doOnAbort(ThreadState state) {
try {
if (state.flushPending) {
flushBytes -= state.bytesUsed;
} else {
activeBytes -= state.bytesUsed;
}
assert assertMemory();
// Take it out of the loop this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
} finally {
updateStallState();
}
}
synchronized DocumentsWriterPerThread tryCheckoutForFlush(
ThreadState perThread) {
return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
}
private void checkoutAndBlock(ThreadState perThread) {
perThread.lock();
try {
assert perThread.flushPending : "can not block non-pending threadstate";
assert fullFlush : "can not block if fullFlush == false";
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed;
dwpt = perThreadPool.replaceForFlush(perThread, closed);
numPending--;
blockedFlushes.add(new BlockedFlush(dwpt, bytes));
}finally {
perThread.unlock();
}
}
private DocumentsWriterPerThread internalTryCheckOutForFlush(
ThreadState perThread) {
assert Thread.holdsLock(this);
assert perThread.flushPending;
try {
// We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
if (perThread.isActive()) {
assert perThread.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed; // do that before
// replace!
dwpt = perThreadPool.replaceForFlush(perThread, closed);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes));
numPending--; // write access synced
return dwpt;
}
} finally {
perThread.unlock();
}
}
return null;
} finally {
updateStallState();
}
}
@Override
public String toString() {
return "DocumentsWriterFlushControl [activeBytes=" + activeBytes
+ ", flushBytes=" + flushBytes + "]";
}
DocumentsWriterPerThread nextPendingFlush() {
int numPending;
boolean fullFlush;
synchronized (this) {
final DocumentsWriterPerThread poll;
if ((poll = flushQueue.poll()) != null) {
updateStallState();
return poll;
}
fullFlush = this.fullFlush;
numPending = this.numPending;
}
if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
final int limit = perThreadPool.getActiveThreadState();
for (int i = 0; i < limit && numPending > 0; i++) {
final ThreadState next = perThreadPool.getThreadState(i);
if (next.flushPending) {
final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
if (dwpt != null) {
return dwpt;
}
}
}
}
return null;
}
synchronized void setClosed() {
// set by DW to signal that we should not release new DWPT after close
if (!closed) {
this.closed = true;
perThreadPool.deactivateUnreleasedStates();
}
}
/**
* Returns an iterator that provides access to all currently active {@link ThreadState}s
*/
public Iterator<ThreadState> allActiveThreadStates() {
return getPerThreadsIterator(perThreadPool.getActiveThreadState());
}
private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
return new Iterator<ThreadState>() {
int i = 0;
@Override
public boolean hasNext() {
return i < upto;
}
@Override
public ThreadState next() {
return perThreadPool.getThreadState(i++);
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove() not supported.");
}
};
}
synchronized void doOnDelete() {
// pass null this is a global delete no update
flushPolicy.onDelete(this, null);
}
/**
* Returns the number of delete terms in the global pool
*/
public int getNumGlobalTermDeletes() {
return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
}
public long getDeleteBytesUsed() {
return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
}
synchronized int numFlushingDWPT() {
return flushingWriters.size();
}
public boolean doApplyAllDeletes() {
return flushDeletes.getAndSet(false);
}
public void setApplyAllDeletes() {
flushDeletes.set(true);
}
int numActiveDWPT() {
return this.perThreadPool.getActiveThreadState();
}
ThreadState obtainAndLock() {
final ThreadState perThread = perThreadPool.getAndLock(Thread
.currentThread(), documentsWriter);
boolean success = false;
try {
if (perThread.isActive()
&& perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
// another DWPT:
addFlushableState(perThread);
}
success = true;
// simply return the ThreadState even in a flush all case sine we already hold the lock
return perThread;
} finally {
if (!success) { // make sure we unlock if this fails
perThread.unlock();
}
}
}
void markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue;
synchronized (this) {
assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
fullFlush = true;
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
documentsWriter.deleteQueue = newQueue;
}
final int limit = perThreadPool.getActiveThreadState();
for (int i = 0; i < limit; i++) {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
if (!next.isActive()) {
continue;
}
assert next.dwpt.deleteQueue == flushingQueue
|| next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
+ flushingQueue
+ " currentqueue: "
+ documentsWriter.deleteQueue
+ " perThread queue: "
+ next.dwpt.deleteQueue
+ " numDocsInRam: " + next.dwpt.getNumDocsInRAM();
if (next.dwpt.deleteQueue != flushingQueue) {
// this one is already a new DWPT
continue;
}
addFlushableState(next);
} finally {
next.unlock();
}
}
synchronized (this) {
/* make sure we move all DWPT that are where concurrently marked as
* pending and moved to blocked are moved over to the flushQueue. There is
* a chance that this happens since we marking DWPT for full flush without
* blocking indexing.*/
pruneBlockedQueue(flushingQueue);
assert assertBlockedFlushes(documentsWriter.deleteQueue);
flushQueue.addAll(fullFlushBuffer);
fullFlushBuffer.clear();
updateStallState();
}
assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
}
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
final int limit = perThreadPool.getActiveThreadState();
for (int i = 0; i < limit; i++) {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
assert !next.isActive() || next.dwpt.deleteQueue == queue;
} finally {
next.unlock();
}
}
return true;
}
private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
void addFlushableState(ThreadState perThread) {
if (documentsWriter.infoStream.isEnabled("DWFC")) {
documentsWriter.infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
}
final DocumentsWriterPerThread dwpt = perThread.dwpt;
assert perThread.isHeldByCurrentThread();
assert perThread.isActive();
assert fullFlush;
assert dwpt.deleteQueue != documentsWriter.deleteQueue;
if (dwpt.getNumDocsInRAM() > 0) {
synchronized(this) {
if (!perThread.flushPending) {
setFlushPending(perThread);
}
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert dwpt == flushingDWPT : "flushControl returned different DWPT";
fullFlushBuffer.add(flushingDWPT);
}
} else {
if (closed) {
perThreadPool.deactivateThreadState(perThread); // make this state inactive
} else {
perThreadPool.reinitThreadState(perThread);
}
}
}
/**
* Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue.
*/
private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) {
Iterator<BlockedFlush> iterator = blockedFlushes.iterator();
while (iterator.hasNext()) {
BlockedFlush blockedFlush = iterator.next();
if (blockedFlush.dwpt.deleteQueue == flushingQueue) {
iterator.remove();
assert !flushingWriters.containsKey(blockedFlush.dwpt) : "DWPT is already flushing";
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
// don't decr pending here - its already done when DWPT is blocked
flushQueue.add(blockedFlush.dwpt);
}
}
}
synchronized void finishFullFlush() {
assert fullFlush;
assert flushQueue.isEmpty();
assert flushingWriters.isEmpty();
try {
if (!blockedFlushes.isEmpty()) {
assert assertBlockedFlushes(documentsWriter.deleteQueue);
pruneBlockedQueue(documentsWriter.deleteQueue);
assert blockedFlushes.isEmpty();
}
} finally {
fullFlush = false;
updateStallState();
}
}
boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) {
for (BlockedFlush blockedFlush : blockedFlushes) {
assert blockedFlush.dwpt.deleteQueue == flushingQueue;
}
return true;
}
synchronized void abortFullFlushes() {
try {
abortPendingFlushes();
} finally {
fullFlush = false;
}
}
synchronized void abortPendingFlushes() {
try {
for (DocumentsWriterPerThread dwpt : flushQueue) {
try {
dwpt.abort();
} catch (Throwable ex) {
// ignore - keep on aborting the flush queue
} finally {
doAfterFlush(dwpt);
}
}
for (BlockedFlush blockedFlush : blockedFlushes) {
try {
flushingWriters
.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
blockedFlush.dwpt.abort();
} catch (Throwable ex) {
// ignore - keep on aborting the blocked queue
} finally {
doAfterFlush(blockedFlush.dwpt);
}
}
} finally {
flushQueue.clear();
blockedFlushes.clear();
updateStallState();
}
}
/**
* Returns <code>true</code> if a full flush is currently running
*/
synchronized boolean isFullFlush() {
return fullFlush;
}
/**
* Returns the number of flushes that are already checked out but not yet
* actively flushing
*/
synchronized int numQueuedFlushes() {
return flushQueue.size();
}
/**
* Returns the number of flushes that are checked out but not yet available
* for flushing. This only applies during a full flush if a DWPT needs
* flushing but must not be flushed until the full flush has finished.
*/
synchronized int numBlockedFlushes() {
return blockedFlushes.size();
}
private static class BlockedFlush {
final DocumentsWriterPerThread dwpt;
final long bytes;
BlockedFlush(DocumentsWriterPerThread dwpt, long bytes) {
super();
this.dwpt = dwpt;
this.bytes = bytes;
}
}
/**
* This method will block if too many DWPT are currently flushing and no
* checked out DWPT are available
*/
void waitIfStalled() {
if (documentsWriter.infoStream.isEnabled("DWFC")) {
documentsWriter.infoStream.message("DWFC",
"waitIfStalled: numFlushesPending: " + flushQueue.size()
+ " netBytes: " + netBytes() + " flushBytes: " + flushBytes()
+ " fullFlush: " + fullFlush);
}
stallControl.waitIfStalled();
}
/**
* Returns <code>true</code> iff stalled
*/
boolean anyStalledThreads() {
return stallControl.anyStalledThreads();
}
}