blob: 3da9a76b6001ae3be1184a462e9dbcf44fd4ee6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
/**
* This class accepts multiple added documents and directly
* writes segment files.
*
* Each added document is passed to the indexing chain,
* which in turn processes the document into the different
* codec formats. Some formats write bytes to files
* immediately, e.g. stored fields and term vectors, while
* others are buffered by the indexing chain and written
* only on flush.
*
* Once we have used our allowed RAM buffer, or the number
* of added docs is large enough (in the case we are
* flushing by doc count instead of RAM usage), we create a
* real segment and flush it to the Directory.
*
* Threads:
*
* Multiple threads are allowed into addDocument at once.
* There is an initial synchronized call to
* {@link DocumentsWriterFlushControl#obtainAndLock()}
* which allocates a DWPT for this indexing thread. The same
* thread will not necessarily get the same DWPT over time.
* Then updateDocuments is called on that DWPT without
* synchronization (most of the "heavy lifting" is in this
* call). Once a DWPT fills up enough RAM or hold enough
* documents in memory the DWPT is checked out for flush
* and all changes are written to the directory. Each DWPT
* corresponds to one segment being written.
*
* When flush is called by IndexWriter we check out all DWPTs
* that are associated with the current {@link DocumentsWriterDeleteQueue}
* out of the {@link DocumentsWriterPerThreadPool} and write
* them to disk. The flush process can piggy-back on incoming
* indexing threads or even block them from adding documents
* if flushing can't keep up with new documents being added.
* Unless the stall control kicks in to block indexing threads
* flushes are happening concurrently to actual index requests.
*
*
* Exceptions:
*
* Because this class directly updates in-memory posting
* lists, and flushes stored fields and term vectors
* directly to files in the directory, there are certain
* limited times when an exception can corrupt this state.
* For example, a disk full while flushing stored fields
* leaves this file in a corrupt state. Or, an OOM
* exception while appending to the in-memory posting lists
* can corrupt that posting list. We call such exceptions
* "aborting exceptions". In these cases we must call
* abort() to discard all docs added since the last flush.
*
* All other exceptions ("non-aborting exceptions") can
* still partially update the index structures. These
* updates are consistent, but, they represent only a part
* of the document seen up until the exception was hit.
* When this happens, we immediately mark the document as
* deleted so that the document is always atomically ("all
* or none") added to the index.
*/
final class DocumentsWriter implements Closeable, Accountable {
private final AtomicLong pendingNumDocs;
private final FlushNotifications flushNotifications;
private volatile boolean closed;
private final InfoStream infoStream;
private final LiveIndexWriterConfig config;
private final AtomicInteger numDocsInRAM = new AtomicInteger(0);
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue;
private final DocumentsWriterFlushQueue ticketQueue = new DocumentsWriterFlushQueue();
/*
* we preserve changes during a full flush since IW might not checkout before
* we release all changes. NRT Readers otherwise suddenly return true from
* isCurrent while there are actually changes currently committed. See also
* #anyChanges() & #flushAllThreads
*/
private volatile boolean pendingChangesInCurrentFullFlush;
final DocumentsWriterPerThreadPool perThreadPool;
final DocumentsWriterFlushControl flushControl;
DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs, boolean enableTestPoints,
Supplier<String> segmentNameSupplier, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory,
FieldInfos.FieldNumbers globalFieldNumberMap) {
this.config = config;
this.infoStream = config.getInfoStream();
this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
this.perThreadPool = new DocumentsWriterPerThreadPool(() -> {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
return new DocumentsWriterPerThread(indexCreatedVersionMajor,
segmentNameSupplier.get(), directoryOrig,
directory, config, deleteQueue, infos,
pendingNumDocs, enableTestPoints);
});
this.pendingNumDocs = pendingNumDocs;
flushControl = new DocumentsWriterFlushControl(this, config);
this.flushNotifications = flushNotifications;
}
long deleteQueries(final Query... queries) throws IOException {
return applyDeleteOrUpdate(q -> q.addDelete(queries));
}
long deleteTerms(final Term... terms) throws IOException {
return applyDeleteOrUpdate(q -> q.addDelete(terms));
}
long updateDocValues(DocValuesUpdate... updates) throws IOException {
return applyDeleteOrUpdate(q -> q.addDocValuesUpdates(updates));
}
private synchronized long applyDeleteOrUpdate(ToLongFunction<DocumentsWriterDeleteQueue> function) throws IOException {
// This method is synchronized to make sure we don't replace the deleteQueue while applying this update / delete
// otherwise we might lose an update / delete if this happens concurrently to a full flush.
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
long seqNo = function.applyAsLong(deleteQueue);
flushControl.doOnDelete();
if (applyAllDeletes()) {
seqNo = -seqNo;
}
return seqNo;
}
/** If buffered deletes are using too much heap, resolve them and write disk and return true. */
private boolean applyAllDeletes() throws IOException {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
if (flushControl.isFullFlush() == false // never apply deletes during full flush this breaks happens before relationship
&& deleteQueue.isOpen() // if it's closed then it's already fully applied and we have a new delete queue
&& flushControl.getAndResetApplyAllDeletes()) {
if (ticketQueue.addDeletes(deleteQueue)) {
flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
return true;
}
}
return false;
}
void purgeFlushTickets(boolean forced, IOUtils.IOConsumer<DocumentsWriterFlushQueue.FlushTicket> consumer)
throws IOException {
if (forced) {
ticketQueue.forcePurge(consumer);
} else {
ticketQueue.tryPurge(consumer);
}
}
/** Returns how many docs are currently buffered in RAM. */
int getNumDocs() {
return numDocsInRAM.get();
}
private void ensureOpen() throws AlreadyClosedException {
if (closed) {
throw new AlreadyClosedException("this DocumentsWriter is closed");
}
}
/** Called if we hit an exception at a bad time (when
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
synchronized void abort() throws IOException {
boolean success = false;
try {
deleteQueue.clear();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "abort");
}
for (final DocumentsWriterPerThread perThread : perThreadPool.filterAndLock(x -> true)) {
try {
abortDocumentsWriterPerThread(perThread);
} finally {
perThread.unlock();
}
}
flushControl.abortPendingFlushes();
flushControl.waitForFlush();
assert perThreadPool.size() == 0
: "There are still active DWPT in the pool: " + perThreadPool.size();
success = true;
} finally {
if (success) {
assert flushControl.getFlushingBytes() == 0 : "flushingBytes has unexpected value 0 != " + flushControl.getFlushingBytes();
assert flushControl.netBytes() == 0 : "netBytes has unexpected value 0 != " + flushControl.netBytes();
}
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "done abort success=" + success);
}
}
}
final boolean flushOneDWPT() throws IOException {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFlushOneDWPT");
}
// first check if there is one pending
DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush();
if (documentsWriterPerThread == null) {
documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter();
}
if (documentsWriterPerThread != null) {
return doFlush(documentsWriterPerThread);
}
return false; // we didn't flush anything here
}
/** Locks all currently active DWPT and aborts them.
* The returned Closeable should be closed once the locks for the aborted
* DWPTs can be released. */
synchronized Closeable lockAndAbortAll() throws IOException {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "lockAndAbortAll");
}
// Make sure we move all pending tickets into the flush queue:
ticketQueue.forcePurge(ticket -> {
if (ticket.getFlushedSegment() != null) {
pendingNumDocs.addAndGet(-ticket.getFlushedSegment().segmentInfo.info.maxDoc());
}
});
List<DocumentsWriterPerThread> writers = new ArrayList<>();
AtomicBoolean released = new AtomicBoolean(false);
final Closeable release = () -> {
// we return this closure to unlock all writers once done
// or if hit an exception below in the try block.
// we can't assign this later otherwise the ref can't be final
if (released.compareAndSet(false, true)) { // only once
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAllAbortedThread");
}
perThreadPool.unlockNewWriters();
for (DocumentsWriterPerThread writer : writers) {
writer.unlock();
}
}
};
try {
deleteQueue.clear();
perThreadPool.lockNewWriters();
writers.addAll(perThreadPool.filterAndLock(x -> true));
for (final DocumentsWriterPerThread perThread : writers) {
assert perThread.isHeldByCurrentThread();
abortDocumentsWriterPerThread(perThread);
}
deleteQueue.clear();
// jump over any possible in flight ops:
deleteQueue.skipSequenceNumbers(perThreadPool.size() + 1);
flushControl.abortPendingFlushes();
flushControl.waitForFlush();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "finished lockAndAbortAll success=true");
}
return release;
} catch (Throwable t) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "finished lockAndAbortAll success=false");
}
try {
// if something happens here we unlock all states again
release.close();
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
}
/** Returns how many documents were aborted. */
private void abortDocumentsWriterPerThread(final DocumentsWriterPerThread perThread) throws IOException {
assert perThread.isHeldByCurrentThread();
try {
subtractFlushedNumDocs(perThread.getNumDocsInRAM());
perThread.abort();
} finally {
flushControl.doOnAbort(perThread);
}
}
/** returns the maximum sequence number for all previously completed operations */
long getMaxCompletedSequenceNumber() {
return deleteQueue.getMaxCompletedSeqNo();
}
boolean anyChanges() {
/*
* changes are either in a DWPT or in the deleteQueue.
* yet if we currently flush deletes and / or dwpt there
* could be a window where all changes are in the ticket queue
* before they are published to the IW. ie we need to check if the
* ticket queue has any tickets.
*/
boolean anyChanges = numDocsInRAM.get() != 0 || anyDeletions() || ticketQueue.hasTickets() || pendingChangesInCurrentFullFlush;
if (infoStream.isEnabled("DW") && anyChanges) {
infoStream.message("DW", "anyChanges? numDocsInRam=" + numDocsInRAM.get()
+ " deletes=" + anyDeletions() + " hasTickets:"
+ ticketQueue.hasTickets() + " pendingChangesInFullFlush: "
+ pendingChangesInCurrentFullFlush);
}
return anyChanges;
}
int getBufferedDeleteTermsSize() {
return deleteQueue.getBufferedUpdatesTermsSize();
}
//for testing
int getNumBufferedDeleteTerms() {
return deleteQueue.numGlobalTermDeletes();
}
boolean anyDeletions() {
return deleteQueue.anyChanges();
}
@Override
public void close() throws IOException {
closed = true;
IOUtils.close(flushControl, perThreadPool);
}
private boolean preUpdate() throws IOException {
ensureOpen();
boolean hasEvents = false;
while (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
// Help out flushing any queued DWPTs so we can un-stall:
// Try pick up pending threads here if possible
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
hasEvents |= doFlush(flushingDWPT);
}
flushControl.waitIfStalled(); // block if stalled
}
return hasEvents;
}
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException {
hasEvents |= applyAllDeletes();
if (flushingDWPT != null) {
hasEvents |= doFlush(flushingDWPT);
} else if (config.checkPendingFlushOnUpdate) {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
hasEvents |= doFlush(nextPendingFlush);
}
}
return hasEvents;
}
long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs,
final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
boolean hasEvents = preUpdate();
final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
long seqNo;
try {
// This must happen after we've pulled the DWPT because IW.close
// waits for all DWPT to be released:
ensureOpen();
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
seqNo = dwpt.updateDocuments(docs, delNode, flushNotifications);
} finally {
if (dwpt.isAborted()) {
flushControl.doOnAbort(dwpt);
}
// We don't know how many documents were actually
// counted as indexed, so we must subtract here to
// accumulate our separate counter:
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
}
final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);
} finally {
if (dwpt.isFlushPending() || dwpt.isAborted()) {
dwpt.unlock();
} else {
perThreadPool.marksAsFreeAndUnlock(dwpt);
}
assert dwpt.isHeldByCurrentThread() == false : "we didn't release the dwpt even on abort";
}
if (postUpdate(flushingDWPT, hasEvents)) {
seqNo = -seqNo;
}
return seqNo;
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean hasEvents = false;
while (flushingDWPT != null) {
assert flushingDWPT.hasFlushed() == false;
hasEvents = true;
boolean success = false;
DocumentsWriterFlushQueue.FlushTicket ticket = null;
try {
assert currentFullFlushDelQueue == null
|| flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
+ currentFullFlushDelQueue + "but was: " + flushingDWPT.deleteQueue
+ " " + flushControl.isFullFlush();
/*
* Since with DWPT the flush process is concurrent and several DWPT
* could flush at the same time we must maintain the order of the
* flushes before we can apply the flushed segment and the frozen global
* deletes it is buffering. The reason for this is that the global
* deletes mark a certain point in time where we took a DWPT out of
* rotation and freeze the global deletes.
*
* Example: A flush 'A' starts and freezes the global deletes, then
* flush 'B' starts and freezes all deletes occurred since 'A' has
* started. if 'B' finishes before 'A' we need to wait until 'A' is done
* otherwise the deletes frozen by 'B' are not applied to 'A' and we
* might miss to deletes documents in 'A'.
*/
try {
assert assertTicketQueueModification(flushingDWPT.deleteQueue);
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush(flushNotifications);
ticketQueue.addSegment(ticket, newSegment);
dwptSuccess = true;
} finally {
subtractFlushedNumDocs(flushingDocsInRam);
if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
Set<String> files = flushingDWPT.pendingFilesToDelete();
flushNotifications.deleteUnusedFiles(files);
hasEvents = true;
}
if (dwptSuccess == false) {
flushNotifications.flushFailed(flushingDWPT.getSegmentInfo());
hasEvents = true;
}
}
// flush was successful once we reached this point - new seg. has been assigned to the ticket!
success = true;
} finally {
if (!success && ticket != null) {
// In the case of a failure make sure we are making progress and
// apply all the deletes since the segment flush failed since the flush
// ticket could hold global deletes see FlushTicket#canPublish()
ticketQueue.markTicketFailed(ticket);
}
}
/*
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
if (ticketQueue.getTicketCount() >= perThreadPool.size()) {
// This means there is a backlog: the one
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
// we forcefully stall the producers.
flushNotifications.onTicketBacklog();
break;
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
}
flushingDWPT = flushControl.nextPendingFlush();
}
if (hasEvents) {
flushNotifications.afterSegmentsFlushed();
}
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
hasEvents = true;
if (applyAllDeletes() == false) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
flushControl.getDeleteBytesUsed()/(1024.*1024.),
ramBufferSizeMB));
}
flushNotifications.onDeletesApplied();
}
}
return hasEvents;
}
synchronized long getNextSequenceNumber() {
// this must be synced otherwise the delete queue might change concurrently
return deleteQueue.getNextSequenceNumber();
}
synchronized void resetDeleteQueue(DocumentsWriterDeleteQueue newQueue) {
assert deleteQueue.isAdvanced();
assert newQueue.isAdvanced() == false;
assert deleteQueue.getLastSequenceNumber() <= newQueue.getLastSequenceNumber();
assert deleteQueue.getMaxSeqNo() <= newQueue.getLastSequenceNumber()
: "maxSeqNo: " + deleteQueue.getMaxSeqNo() + " vs. " + newQueue.getLastSequenceNumber();
deleteQueue = newQueue;
}
interface FlushNotifications { // TODO maybe we find a better name for this?
/**
* Called when files were written to disk that are not used anymore. It's the implementation's responsibility
* to clean these files up
*/
void deleteUnusedFiles(Collection<String> files);
/**
* Called when a segment failed to flush.
*/
void flushFailed(SegmentInfo info);
/**
* Called after one or more segments were flushed to disk.
*/
void afterSegmentsFlushed() throws IOException;
/**
* Should be called if a flush or an indexing operation caused a tragic / unrecoverable event.
*/
void onTragicEvent(Throwable event, String message);
/**
* Called once deletes have been applied either after a flush or on a deletes call
*/
void onDeletesApplied();
/**
* Called once the DocumentsWriter ticket queue has a backlog. This means there is an inner thread
* that tries to publish flushed segments but can't keep up with the other threads flushing new segments.
* This likely requires other thread to forcefully purge the buffer to help publishing. This
* can't be done in-place since we might hold index writer locks when this is called. The caller must ensure
* that the purge happens without an index writer lock being held.
*
* @see DocumentsWriter#purgeFlushTickets(boolean, IOUtils.IOConsumer)
*/
void onTicketBacklog();
}
void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get();
while (numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed) == false) {
oldValue = numDocsInRAM.get();
}
assert numDocsInRAM.get() >= 0;
}
// for asserts
private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
// for asserts
private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
assert currentFullFlushDelQueue == null
|| currentFullFlushDelQueue.isOpen() == false : "Can not replace a full flush queue if the queue is not closed";
currentFullFlushDelQueue = session;
return true;
}
private boolean assertTicketQueueModification(DocumentsWriterDeleteQueue deleteQueue) {
// assign it then we don't need to sync on DW
DocumentsWriterDeleteQueue currentFullFlushDelQueue = this.currentFullFlushDelQueue;
assert currentFullFlushDelQueue == null || currentFullFlushDelQueue == deleteQueue:
"only modifications from the current flushing queue are permitted while doing a full flush";
return true;
}
/*
* FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
long flushAllThreads()
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFullFlush");
}
long seqNo;
synchronized (this) {
pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
* delete queue */
seqNo = flushControl.markForFullFlush(); // swaps this.deleteQueue synced on FlushControl
assert setFlushingDeleteQueue(flushingDeleteQueue);
}
assert currentFullFlushDelQueue != null;
assert currentFullFlushDelQueue != deleteQueue;
boolean anythingFlushed = false;
try {
DocumentsWriterPerThread flushingDWPT;
// Help out with flushing:
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
anythingFlushed |= doFlush(flushingDWPT);
}
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
if (anythingFlushed == false && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
assert assertTicketQueueModification(flushingDeleteQueue);
ticketQueue.addDeletes(flushingDeleteQueue);
}
// we can't assert that we don't have any tickets in teh queue since we might add a DocumentsWriterDeleteQueue
// concurrently if we have very small ram buffers this happens quite frequently
assert !flushingDeleteQueue.anyChanges();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
flushingDeleteQueue.close(); // all DWPT have been processed and this queue has been fully flushed to the ticket-queue
}
if (anythingFlushed) {
return -seqNo;
} else {
return seqNo;
}
}
void finishFullFlush(boolean success) throws IOException {
try {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + " finishFullFlush success=" + success);
}
assert setFlushingDeleteQueue(null);
if (success) {
// Release the flush lock
flushControl.finishFullFlush();
} else {
flushControl.abortFullFlushes();
}
} finally {
pendingChangesInCurrentFullFlush = false;
applyAllDeletes(); // make sure we do execute this since we block applying deletes during full flush
}
}
@Override
public long ramBytesUsed() {
return flushControl.ramBytesUsed();
}
/**
* Returns the number of bytes currently being flushed
*
* This is a subset of the value returned by {@link #ramBytesUsed()}
*/
long getFlushingBytes() {
return flushControl.getFlushingBytes();
}
}