| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.lucene.index; |
| |
| |
| import java.io.Closeable; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Queue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.lucene.store.AlreadyClosedException; |
| import org.apache.lucene.util.Accountable; |
| import org.apache.lucene.util.InfoStream; |
| import org.apache.lucene.util.ThreadInterruptedException; |
| |
| /** |
| * This class controls {@link DocumentsWriterPerThread} flushing during |
| * indexing. It tracks the memory consumption per |
| * {@link DocumentsWriterPerThread} and uses a configured {@link FlushPolicy} to |
| * decide if a {@link DocumentsWriterPerThread} must flush. |
| * <p> |
| * In addition to the {@link FlushPolicy} the flush control might set certain |
| * {@link DocumentsWriterPerThread} as flush pending iff a |
| * {@link DocumentsWriterPerThread} exceeds the |
| * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address |
| * space exhaustion. |
| */ |
| final class DocumentsWriterFlushControl implements Accountable, Closeable { |
| |
| private final long hardMaxBytesPerDWPT; |
| private long activeBytes = 0; |
| private volatile long flushBytes = 0; |
| private volatile int numPending = 0; |
| private int numDocsSinceStalled = 0; // only with assert |
| private final AtomicBoolean flushDeletes = new AtomicBoolean(false); |
| private boolean fullFlush = false; |
| private boolean fullFlushMarkDone = false; // only for assertion that we don't get stale DWPTs from the pool |
| // The flushQueue is used to concurrently distribute DWPTs that are ready to be flushed ie. when a full flush is in |
| // progress. This might be triggered by a commit or NRT refresh. The trigger will only walk all eligible DWPTs and |
| // mark them as flushable putting them in the flushQueue ready for other threads (ie. indexing threads) to help flushing |
| private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<>(); |
| // only for safety reasons if a DWPT is close to the RAM limit |
| private final Queue<DocumentsWriterPerThread> blockedFlushes = new LinkedList<>(); |
| // flushingWriters holds all currently flushing writers. There might be writers in this list that |
| // are also in the flushQueue which means that writers in the flushingWriters list are not necessarily |
| // already actively flushing. They are only in the state of flushing and might be picked up in the future by |
| // polling the flushQueue |
| private final List<DocumentsWriterPerThread> flushingWriters = new ArrayList<>(); |
| |
| private double maxConfiguredRamBuffer = 0; |
| private long peakActiveBytes = 0;// only with assert |
| private long peakFlushBytes = 0;// only with assert |
| private long peakNetBytes = 0;// only with assert |
| private long peakDelta = 0; // only with assert |
| private boolean flushByRAMWasDisabled; // only with assert |
| final DocumentsWriterStallControl stallControl = new DocumentsWriterStallControl(); |
| private final DocumentsWriterPerThreadPool perThreadPool; |
| private final FlushPolicy flushPolicy; |
| private boolean closed = false; |
| private final DocumentsWriter documentsWriter; |
| private final LiveIndexWriterConfig config; |
| private final InfoStream infoStream; |
| |
| DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) { |
| this.infoStream = config.getInfoStream(); |
| this.perThreadPool = documentsWriter.perThreadPool; |
| this.flushPolicy = config.getFlushPolicy(); |
| this.config = config; |
| this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; |
| this.documentsWriter = documentsWriter; |
| } |
| |
| public synchronized long activeBytes() { |
| return activeBytes; |
| } |
| |
| long getFlushingBytes() { |
| return flushBytes; |
| } |
| |
| synchronized long netBytes() { |
| return flushBytes + activeBytes; |
| } |
| |
| private long stallLimitBytes() { |
| final double maxRamMB = config.getRAMBufferSizeMB(); |
| return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE; |
| } |
| |
| private boolean assertMemory() { |
| final double maxRamMB = config.getRAMBufferSizeMB(); |
| // We can only assert if we have always been flushing by RAM usage; otherwise the assert will false trip if e.g. the |
| // flush-by-doc-count * doc size was large enough to use far more RAM than the sudden change to IWC's maxRAMBufferSizeMB: |
| if (maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH && flushByRAMWasDisabled == false) { |
| // for this assert we must be tolerant to ram buffer changes! |
| maxConfiguredRamBuffer = Math.max(maxRamMB, maxConfiguredRamBuffer); |
| final long ram = flushBytes + activeBytes; |
| final long ramBufferBytes = (long) (maxConfiguredRamBuffer * 1024 * 1024); |
| // take peakDelta into account - worst case is that all flushing, pending and blocked DWPT had maxMem and the last doc had the peakDelta |
| |
| // 2 * ramBufferBytes -> before we stall we need to cross the 2xRAM Buffer border this is still a valid limit |
| // (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully flushed |
| // all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta |
| // (numDocsSinceStalled * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document |
| final long expected = (2 * ramBufferBytes) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta); |
| // the expected ram consumption is an upper bound at this point and not really the expected consumption |
| if (peakDelta < (ramBufferBytes >> 1)) { |
| /* |
| * if we are indexing with very low maxRamBuffer like 0.1MB memory can |
| * easily overflow if we check out some DWPT based on docCount and have |
| * several DWPT in flight indexing large documents (compared to the ram |
| * buffer). This means that those DWPT and their threads will not hit |
| * the stall control before asserting the memory which would in turn |
| * fail. To prevent this we only assert if the the largest document seen |
| * is smaller than the 1/2 of the maxRamBufferMB |
| */ |
| assert ram <= expected : "actual mem: " + ram + " byte, expected mem: " + expected |
| + " byte, flush mem: " + flushBytes + ", active mem: " + activeBytes |
| + ", pending DWPT: " + numPending + ", flushing DWPT: " |
| + numFlushingDWPT() + ", blocked DWPT: " + numBlockedFlushes() |
| + ", peakDelta mem: " + peakDelta + " bytes, ramBufferBytes=" + ramBufferBytes |
| + ", maxConfiguredRamBuffer=" + maxConfiguredRamBuffer; |
| } |
| } else { |
| flushByRAMWasDisabled = true; |
| } |
| return true; |
| } |
| |
| private synchronized void commitPerThreadBytes(DocumentsWriterPerThread perThread) { |
| final long delta = perThread.commitLastBytesUsed(); |
| /* |
| * We need to differentiate here if we are pending since setFlushPending |
| * moves the perThread memory to the flushBytes and we could be set to |
| * pending during a delete |
| */ |
| if (perThread.isFlushPending()) { |
| flushBytes += delta; |
| } else { |
| activeBytes += delta; |
| } |
| assert updatePeaks(delta); |
| } |
| |
| // only for asserts |
| private boolean updatePeaks(long delta) { |
| peakActiveBytes = Math.max(peakActiveBytes, activeBytes); |
| peakFlushBytes = Math.max(peakFlushBytes, flushBytes); |
| peakNetBytes = Math.max(peakNetBytes, netBytes()); |
| peakDelta = Math.max(peakDelta, delta); |
| |
| return true; |
| } |
| |
| synchronized DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) { |
| try { |
| commitPerThreadBytes(perThread); |
| if (!perThread.isFlushPending()) { |
| if (isUpdate) { |
| flushPolicy.onUpdate(this, perThread); |
| } else { |
| flushPolicy.onInsert(this, perThread); |
| } |
| if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) { |
| // Safety check to prevent a single DWPT exceeding its RAM limit. This |
| // is super important since we can not address more than 2048 MB per DWPT |
| setFlushPending(perThread); |
| } |
| } |
| return checkout(perThread, false); |
| } finally { |
| boolean stalled = updateStallState(); |
| assert assertNumDocsSinceStalled(stalled) && assertMemory(); |
| } |
| } |
| |
| private DocumentsWriterPerThread checkout(DocumentsWriterPerThread perThread, boolean markPending) { |
| assert Thread.holdsLock(this); |
| if (fullFlush) { |
| if (perThread.isFlushPending()) { |
| checkoutAndBlock(perThread); |
| return nextPendingFlush(); |
| } |
| } else { |
| if (markPending) { |
| assert perThread.isFlushPending() == false; |
| setFlushPending(perThread); |
| } |
| |
| if (perThread.isFlushPending()) { |
| return checkOutForFlush(perThread); |
| } |
| } |
| return null; |
| } |
| |
| private boolean assertNumDocsSinceStalled(boolean stalled) { |
| /* |
| * updates the number of documents "finished" while we are in a stalled state. |
| * this is important for asserting memory upper bounds since it corresponds |
| * to the number of threads that are in-flight and crossed the stall control |
| * check before we actually stalled. |
| * see #assertMemory() |
| */ |
| if (stalled) { |
| numDocsSinceStalled++; |
| } else { |
| numDocsSinceStalled = 0; |
| } |
| return true; |
| } |
| |
| synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { |
| assert flushingWriters.contains(dwpt); |
| try { |
| flushingWriters.remove(dwpt); |
| flushBytes -= dwpt.getLastCommittedBytesUsed(); |
| assert assertMemory(); |
| } finally { |
| try { |
| updateStallState(); |
| } finally { |
| notifyAll(); |
| } |
| } |
| } |
| |
| private long stallStartNS; |
| |
| private boolean updateStallState() { |
| |
| assert Thread.holdsLock(this); |
| final long limit = stallLimitBytes(); |
| /* |
| * we block indexing threads if net byte grows due to slow flushes |
| * yet, for small ram buffers and large documents we can easily |
| * reach the limit without any ongoing flushes. we need to ensure |
| * that we don't stall/block if an ongoing or pending flush can |
| * not free up enough memory to release the stall lock. |
| */ |
| final boolean stall = (activeBytes + flushBytes) > limit && |
| activeBytes < limit && |
| !closed; |
| |
| if (infoStream.isEnabled("DWFC")) { |
| if (stall != stallControl.anyStalledThreads()) { |
| if (stall) { |
| infoStream.message("DW", String.format(Locale.ROOT, "now stalling flushes: netBytes: %.1f MB flushBytes: %.1f MB fullFlush: %b", |
| netBytes()/1024./1024., getFlushingBytes()/1024./1024., fullFlush)); |
| stallStartNS = System.nanoTime(); |
| } else { |
| infoStream.message("DW", String.format(Locale.ROOT, "done stalling flushes for %.1f msec: netBytes: %.1f MB flushBytes: %.1f MB fullFlush: %b", |
| (System.nanoTime()-stallStartNS)/1000000., netBytes()/1024./1024., getFlushingBytes()/1024./1024., fullFlush)); |
| } |
| } |
| } |
| |
| stallControl.updateStalled(stall); |
| return stall; |
| } |
| |
| public synchronized void waitForFlush() { |
| while (flushingWriters.size() != 0) { |
| try { |
| this.wait(); |
| } catch (InterruptedException e) { |
| throw new ThreadInterruptedException(e); |
| } |
| } |
| } |
| |
| /** |
| * Sets flush pending state on the given {@link DocumentsWriterPerThread}. The |
| * {@link DocumentsWriterPerThread} must have indexed at least on Document and must not be |
| * already pending. |
| */ |
| public synchronized void setFlushPending(DocumentsWriterPerThread perThread) { |
| assert !perThread.isFlushPending(); |
| if (perThread.getNumDocsInRAM() > 0) { |
| perThread.setFlushPending(); // write access synced |
| final long bytes = perThread.getLastCommittedBytesUsed(); |
| flushBytes += bytes; |
| activeBytes -= bytes; |
| numPending++; // write access synced |
| assert assertMemory(); |
| } // don't assert on numDocs since we could hit an abort excp. while selecting that dwpt for flushing |
| |
| } |
| |
| synchronized void doOnAbort(DocumentsWriterPerThread perThread) { |
| try { |
| assert perThreadPool.isRegistered(perThread); |
| assert perThread.isHeldByCurrentThread(); |
| if (perThread.isFlushPending()) { |
| flushBytes -= perThread.getLastCommittedBytesUsed(); |
| } else { |
| activeBytes -= perThread.getLastCommittedBytesUsed(); |
| } |
| assert assertMemory(); |
| // Take it out of the loop this DWPT is stale |
| } finally { |
| updateStallState(); |
| boolean checkedOut = perThreadPool.checkout(perThread); |
| assert checkedOut; |
| } |
| } |
| |
| private void checkoutAndBlock(DocumentsWriterPerThread perThread) { |
| assert perThreadPool.isRegistered(perThread); |
| assert perThread.isHeldByCurrentThread(); |
| assert perThread.isFlushPending() : "can not block non-pending threadstate"; |
| assert fullFlush : "can not block if fullFlush == false"; |
| numPending--; |
| blockedFlushes.add(perThread); |
| boolean checkedOut = perThreadPool.checkout(perThread); |
| assert checkedOut; |
| } |
| |
| private synchronized DocumentsWriterPerThread checkOutForFlush(DocumentsWriterPerThread perThread) { |
| assert Thread.holdsLock(this); |
| assert perThread.isFlushPending(); |
| assert perThread.isHeldByCurrentThread(); |
| assert perThreadPool.isRegistered(perThread); |
| try { |
| addFlushingDWPT(perThread); |
| numPending--; // write access synced |
| boolean checkedOut = perThreadPool.checkout(perThread); |
| assert checkedOut; |
| return perThread; |
| } finally { |
| updateStallState(); |
| } |
| } |
| |
| private void addFlushingDWPT(DocumentsWriterPerThread perThread) { |
| assert flushingWriters.contains(perThread) == false : "DWPT is already flushing"; |
| // Record the flushing DWPT to reduce flushBytes in doAfterFlush |
| flushingWriters.add(perThread); |
| } |
| |
| @Override |
| public String toString() { |
| return "DocumentsWriterFlushControl [activeBytes=" + activeBytes |
| + ", flushBytes=" + flushBytes + "]"; |
| } |
| |
| DocumentsWriterPerThread nextPendingFlush() { |
| int numPending; |
| boolean fullFlush; |
| synchronized (this) { |
| final DocumentsWriterPerThread poll; |
| if ((poll = flushQueue.poll()) != null) { |
| updateStallState(); |
| return poll; |
| } |
| fullFlush = this.fullFlush; |
| numPending = this.numPending; |
| } |
| if (numPending > 0 && fullFlush == false) { // don't check if we are doing a full flush |
| for (final DocumentsWriterPerThread next : perThreadPool) { |
| if (next.isFlushPending()) { |
| if (next.tryLock()) { |
| try { |
| if (perThreadPool.isRegistered(next)) { |
| return checkOutForFlush(next); |
| } |
| } finally { |
| next.unlock(); |
| } |
| } |
| } |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public synchronized void close() { |
| // set by DW to signal that we are closing. in this case we try to not stall any threads anymore etc. |
| closed = true; |
| } |
| |
| /** |
| * Returns an iterator that provides access to all currently active {@link DocumentsWriterPerThread}s |
| */ |
| public Iterator<DocumentsWriterPerThread> allActiveWriters() { |
| return perThreadPool.iterator(); |
| } |
| |
| synchronized void doOnDelete() { |
| // pass null this is a global delete no update |
| flushPolicy.onDelete(this, null); |
| } |
| |
| /** Returns heap bytes currently consumed by buffered deletes/updates that would be |
| * freed if we pushed all deletes. This does not include bytes consumed by |
| * already pushed delete/update packets. */ |
| public long getDeleteBytesUsed() { |
| return documentsWriter.deleteQueue.ramBytesUsed(); |
| } |
| |
| @Override |
| public long ramBytesUsed() { |
| // TODO: improve this to return more detailed info? |
| return getDeleteBytesUsed() + netBytes(); |
| } |
| |
| synchronized int numFlushingDWPT() { |
| return flushingWriters.size(); |
| } |
| |
| public boolean getAndResetApplyAllDeletes() { |
| return flushDeletes.getAndSet(false); |
| } |
| |
| public void setApplyAllDeletes() { |
| flushDeletes.set(true); |
| } |
| |
| DocumentsWriterPerThread obtainAndLock() { |
| while (closed == false) { |
| final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(); |
| if (perThread.deleteQueue == documentsWriter.deleteQueue) { |
| // simply return the DWPT even in a flush all case since we already hold the lock and the DWPT is not stale |
| // since it has the current delete queue associated with it. This means we have established a happens-before |
| // relationship and all docs indexed into this DWPT are guaranteed to not be flushed with the currently |
| // progress full flush. |
| return perThread; |
| } else { |
| try { |
| // we must first assert otherwise the full flush might make progress once we unlock the dwpt |
| assert fullFlush && fullFlushMarkDone == false : |
| "found a stale DWPT but full flush mark phase is already done fullFlush: " |
| + fullFlush + " markDone: " + fullFlushMarkDone; |
| } finally { |
| perThread.unlock(); |
| // There is a flush-all in process and this DWPT is |
| // now stale - try another one |
| } |
| } |
| } |
| throw new AlreadyClosedException("flush control is closed"); |
| } |
| |
| long markForFullFlush() { |
| final DocumentsWriterDeleteQueue flushingQueue; |
| long seqNo; |
| synchronized (this) { |
| assert fullFlush == false: "called DWFC#markForFullFlush() while full flush is still running"; |
| assert fullFlushMarkDone == false : "full flush collection marker is still set to true"; |
| fullFlush = true; |
| flushingQueue = documentsWriter.deleteQueue; |
| // Set a new delete queue - all subsequent DWPT will use this queue until |
| // we do another full flush |
| perThreadPool.lockNewWriters(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off |
| try { |
| // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine |
| // if we have some sequence numbers that were never assigned: |
| DocumentsWriterDeleteQueue newQueue = documentsWriter.deleteQueue.advanceQueue(perThreadPool.size()); |
| seqNo = documentsWriter.deleteQueue.getMaxSeqNo(); |
| documentsWriter.resetDeleteQueue(newQueue); |
| } finally { |
| perThreadPool.unlockNewWriters(); |
| } |
| } |
| final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<>(); |
| for (final DocumentsWriterPerThread next : perThreadPool.filterAndLock(dwpt -> dwpt.deleteQueue == flushingQueue)) { |
| try { |
| assert next.deleteQueue == flushingQueue |
| || next.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: " |
| + flushingQueue |
| + " currentqueue: " |
| + documentsWriter.deleteQueue |
| + " perThread queue: " |
| + next.deleteQueue |
| + " numDocsInRam: " + next.getNumDocsInRAM(); |
| |
| if (next.getNumDocsInRAM() > 0) { |
| final DocumentsWriterPerThread flushingDWPT; |
| synchronized(this) { |
| if (next.isFlushPending() == false) { |
| setFlushPending(next); |
| } |
| flushingDWPT = checkOutForFlush(next); |
| } |
| assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; |
| assert next == flushingDWPT : "flushControl returned different DWPT"; |
| fullFlushBuffer.add(flushingDWPT); |
| } else { |
| // it's possible that we get a DWPT with 0 docs if we flush concurrently to |
| // threads getting DWPTs from the pool. In this case we simply remove it from |
| // the pool and drop it on the floor. |
| boolean checkout = perThreadPool.checkout(next); |
| assert checkout; |
| } |
| } finally { |
| next.unlock(); |
| } |
| } |
| synchronized (this) { |
| /* make sure we move all DWPT that are where concurrently marked as |
| * pending and moved to blocked are moved over to the flushQueue. There is |
| * a chance that this happens since we marking DWPT for full flush without |
| * blocking indexing.*/ |
| pruneBlockedQueue(flushingQueue); |
| assert assertBlockedFlushes(documentsWriter.deleteQueue); |
| flushQueue.addAll(fullFlushBuffer); |
| updateStallState(); |
| fullFlushMarkDone = true; // at this point we must have collected all DWPTs that belong to the old delete queue |
| } |
| assert assertActiveDeleteQueue(documentsWriter.deleteQueue); |
| assert flushingQueue.getLastSequenceNumber() <= flushingQueue.getMaxSeqNo(); |
| return seqNo; |
| } |
| |
| private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) { |
| for (final DocumentsWriterPerThread next : perThreadPool) { |
| assert next.deleteQueue == queue : "numDocs: " + next.getNumDocsInRAM(); |
| } |
| return true; |
| } |
| |
| /** |
| * Prunes the blockedQueue by removing all DWPTs that are associated with the given flush queue. |
| */ |
| private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) { |
| assert Thread.holdsLock(this); |
| Iterator<DocumentsWriterPerThread> iterator = blockedFlushes.iterator(); |
| while (iterator.hasNext()) { |
| DocumentsWriterPerThread blockedFlush = iterator.next(); |
| if (blockedFlush.deleteQueue == flushingQueue) { |
| iterator.remove(); |
| addFlushingDWPT(blockedFlush); |
| // don't decr pending here - it's already done when DWPT is blocked |
| flushQueue.add(blockedFlush); |
| } |
| } |
| } |
| |
| synchronized void finishFullFlush() { |
| assert fullFlush; |
| assert flushQueue.isEmpty(); |
| assert flushingWriters.isEmpty(); |
| try { |
| if (!blockedFlushes.isEmpty()) { |
| assert assertBlockedFlushes(documentsWriter.deleteQueue); |
| pruneBlockedQueue(documentsWriter.deleteQueue); |
| assert blockedFlushes.isEmpty(); |
| } |
| } finally { |
| fullFlushMarkDone = fullFlush = false; |
| |
| updateStallState(); |
| } |
| } |
| |
| boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) { |
| for (DocumentsWriterPerThread blockedFlush : blockedFlushes) { |
| assert blockedFlush.deleteQueue == flushingQueue; |
| } |
| return true; |
| } |
| |
| synchronized void abortFullFlushes() { |
| try { |
| abortPendingFlushes(); |
| } finally { |
| fullFlushMarkDone = fullFlush = false; |
| } |
| } |
| |
| synchronized void abortPendingFlushes() { |
| try { |
| for (DocumentsWriterPerThread dwpt : flushQueue) { |
| try { |
| documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM()); |
| dwpt.abort(); |
| } catch (Exception ex) { |
| // that's fine we just abort everything here this is best effort |
| } finally { |
| doAfterFlush(dwpt); |
| } |
| } |
| for (DocumentsWriterPerThread blockedFlush : blockedFlushes) { |
| try { |
| addFlushingDWPT(blockedFlush); // add the blockedFlushes for correct accounting in doAfterFlush |
| documentsWriter.subtractFlushedNumDocs(blockedFlush.getNumDocsInRAM()); |
| blockedFlush.abort(); |
| } catch (Exception ex) { |
| // that's fine we just abort everything here this is best effort |
| } finally { |
| doAfterFlush(blockedFlush); |
| } |
| } |
| } finally { |
| flushQueue.clear(); |
| blockedFlushes.clear(); |
| updateStallState(); |
| } |
| } |
| |
| /** |
| * Returns <code>true</code> if a full flush is currently running |
| */ |
| synchronized boolean isFullFlush() { |
| return fullFlush; |
| } |
| |
| /** |
| * Returns the number of flushes that are already checked out but not yet |
| * actively flushing |
| */ |
| synchronized int numQueuedFlushes() { |
| return flushQueue.size(); |
| } |
| |
| /** |
| * Returns the number of flushes that are checked out but not yet available |
| * for flushing. This only applies during a full flush if a DWPT needs |
| * flushing but must not be flushed until the full flush has finished. |
| */ |
| synchronized int numBlockedFlushes() { |
| return blockedFlushes.size(); |
| } |
| |
| /** |
| * This method will block if too many DWPT are currently flushing and no |
| * checked out DWPT are available |
| */ |
| void waitIfStalled() { |
| stallControl.waitIfStalled(); |
| } |
| |
| /** |
| * Returns <code>true</code> iff stalled |
| */ |
| boolean anyStalledThreads() { |
| return stallControl.anyStalledThreads(); |
| } |
| |
| /** |
| * Returns the {@link IndexWriter} {@link InfoStream} |
| */ |
| public InfoStream getInfoStream() { |
| return infoStream; |
| } |
| |
| synchronized DocumentsWriterPerThread findLargestNonPendingWriter() { |
| DocumentsWriterPerThread maxRamUsingWriter = null; |
| long maxRamSoFar = 0; |
| int count = 0; |
| for (DocumentsWriterPerThread next : perThreadPool) { |
| if (next.isFlushPending() == false && next.getNumDocsInRAM() > 0) { |
| final long nextRam = next.getLastCommittedBytesUsed(); |
| if (infoStream.isEnabled("FP")) { |
| infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.getNumDocsInRAM()); |
| } |
| count++; |
| if (nextRam > maxRamSoFar) { |
| maxRamSoFar = nextRam; |
| maxRamUsingWriter = next; |
| } |
| } |
| } |
| if (infoStream.isEnabled("FP")) { |
| infoStream.message("FP", count + " in-use non-flushing threads states"); |
| } |
| return maxRamUsingWriter; |
| } |
| |
| /** |
| * Returns the largest non-pending flushable DWPT or <code>null</code> if there is none. |
| */ |
| final DocumentsWriterPerThread checkoutLargestNonPendingWriter() { |
| DocumentsWriterPerThread largestNonPendingWriter = findLargestNonPendingWriter(); |
| if (largestNonPendingWriter != null) { |
| // we only lock this very briefly to swap it's DWPT out - we don't go through the DWPTPool and it's free queue |
| largestNonPendingWriter.lock(); |
| try { |
| if (perThreadPool.isRegistered(largestNonPendingWriter)) { |
| synchronized (this) { |
| try { |
| return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false); |
| } finally { |
| updateStallState(); |
| } |
| } |
| } |
| } finally { |
| largestNonPendingWriter.unlock(); |
| } |
| } |
| return null; |
| } |
| |
| long getPeakActiveBytes() { |
| return peakActiveBytes; |
| } |
| |
| long getPeakNetBytes() { |
| return peakNetBytes; |
| } |
| } |