/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.lucene.index;

import java.io.IOException;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;

final class DocumentsWriterPerThread implements Accountable {
/**
* The IndexingChain must define the {@link #getChain(int, SegmentInfo, Directory, FieldInfos.Builder, LiveIndexWriterConfig, Consumer)} method
* which returns the DocConsumer that the DocumentsWriter calls to process the
* documents.
*/
abstract static class IndexingChain {
abstract DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
Consumer<Throwable> abortingExceptionConsumer);
}
private Throwable abortingException;
private void onAbortingException(Throwable throwable) {
assert throwable != null : "aborting exception must not be null";
assert abortingException == null : "aborting exception has already been set";
abortingException = throwable;
}
final boolean isAborted() {
return aborted;
}
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@Override
DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
Consumer<Throwable> abortingExceptionConsumer) {
return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory, fieldInfos, indexWriterConfig, abortingExceptionConsumer);
}
};
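// Note (descriptive, not a contract): the IndexingChain indirection exists so the
// consumer that builds the in-memory segment can be swapped; this class always
// obtains its chain from LiveIndexWriterConfig#getIndexingChain(), which defaults
// to the DefaultIndexingChain constructed above.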
static final class FlushedSegment {
final SegmentCommitInfo segmentInfo;
final FieldInfos fieldInfos;
final FrozenBufferedUpdates segmentUpdates;
final FixedBitSet liveDocs;
final Sorter.DocMap sortMap;
final int delCount;
private FlushedSegment(InfoStream infoStream, SegmentCommitInfo segmentInfo, FieldInfos fieldInfos,
BufferedUpdates segmentUpdates, FixedBitSet liveDocs, int delCount, Sorter.DocMap sortMap) {
this.segmentInfo = segmentInfo;
this.fieldInfos = fieldInfos;
this.segmentUpdates = segmentUpdates != null && segmentUpdates.any() ? new FrozenBufferedUpdates(infoStream, segmentUpdates, segmentInfo) : null;
this.liveDocs = liveDocs;
this.delCount = delCount;
this.sortMap = sortMap;
}
}
/** Called if we hit an exception at a bad time (when
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
void abort() throws IOException {
aborted = true;
pendingNumDocs.addAndGet(-numDocsInRAM);
try {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "now abort");
}
try {
consumer.abort();
} finally {
pendingUpdates.clear();
}
} finally {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "done abort");
}
}
}
private static final boolean INFO_VERBOSE = false;
final Codec codec;
final TrackingDirectoryWrapper directory;
private final DocConsumer consumer;
// Updates for our still-in-RAM (to be flushed next) segment
private final BufferedUpdates pendingUpdates;
private final SegmentInfo segmentInfo; // Current segment we are working on
private boolean aborted = false; // True if we aborted
private final SetOnce<Boolean> flushPending = new SetOnce<>();
private volatile long lastCommittedBytesUsed;
private final SetOnce<Boolean> hasFlushed = new SetOnce<>();
private final FieldInfos.Builder fieldInfos;
private final InfoStream infoStream;
private int numDocsInRAM;
final DocumentsWriterDeleteQueue deleteQueue;
private final DeleteSlice deleteSlice;
private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
private final AtomicLong pendingNumDocs;
private final LiveIndexWriterConfig indexWriterConfig;
private final boolean enableTestPoints;
private final ReentrantLock lock = new ReentrantLock();
private int[] deleteDocIDs = new int[0];
private int numDeletedDocIds = 0;
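// deleteDocIDs/numDeletedDocIds buffer the docIDs of documents that hit a
// non-aborting exception; see deleteLastDocs(int) and the liveDocs handling in
// flush(), where these IDs are turned into cleared bits.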
DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory,
LiveIndexWriterConfig indexWriterConfig, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) {
this.directory = new TrackingDirectoryWrapper(directory);
this.fieldInfos = fieldInfos;
this.indexWriterConfig = indexWriterConfig;
this.infoStream = indexWriterConfig.getInfoStream();
this.codec = indexWriterConfig.getCodec();
this.pendingNumDocs = pendingNumDocs;
pendingUpdates = new BufferedUpdates(segmentName);
this.deleteQueue = Objects.requireNonNull(deleteQueue);
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
deleteSlice = deleteQueue.newSlice();
segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec,
Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort());
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
}
this.enableTestPoints = enableTestPoints;
consumer = indexWriterConfig.getIndexingChain().getChain(indexVersionCreated, segmentInfo, this.directory, fieldInfos, indexWriterConfig, this::onAbortingException);
}
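// Note: the SegmentInfo created in the constructor uses maxDoc == -1 as a
// placeholder; the real document count is only known at flush time, when
// flush() calls segmentInfo.setMaxDoc(numDocsInRAM).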
final void testPoint(String message) {
if (enableTestPoints) {
assert infoStream.isEnabled("TP"); // don't enable unless you need them.
infoStream.message("TP", message);
}
}
/** Anything that will add N docs to the index should reserve first to
* make sure it's allowed. */
private void reserveOneDoc() {
if (pendingNumDocs.incrementAndGet() > IndexWriter.getActualMaxDocs()) {
// Reserve failed: put the one doc back and throw exc:
pendingNumDocs.decrementAndGet();
throw new IllegalArgumentException("number of documents in the index cannot exceed " + IndexWriter.getActualMaxDocs());
}
}
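// Reservation sketch: pendingNumDocs is shared across all DWPTs of the writer,
// so each doc is reserved before it is processed; on overflow the single
// reserved slot is released again and the caller's whole update fails with the
// IllegalArgumentException above.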
long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
try {
testPoint("DocumentsWriterPerThread addDocuments start");
assert abortingException == null : "DWPT has hit aborting exception but is still indexing";
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + numDocsInRAM + " seg=" + segmentInfo.name);
}
final int docsInRamBefore = numDocsInRAM;
boolean allDocsIndexed = false;
try {
for (Iterable<? extends IndexableField> doc : docs) {
// Even on exception, the document is still added (but marked
// deleted), so we don't need to un-reserve at that point.
// Aborting exceptions will actually "lose" more than one
// document, so the counter will be "wrong" in that case, but
// it's very hard to fix (we can't easily distinguish aborting
// vs non-aborting exceptions):
reserveOneDoc();
consumer.processDocument(numDocsInRAM++, doc);
}
allDocsIndexed = true;
return finishDocuments(deleteNode, docsInRamBefore);
} finally {
if (!allDocsIndexed && !aborted) {
// the iterator threw an exception that is not aborting
// go and mark all docs from this block as deleted
deleteLastDocs(numDocsInRAM - docsInRamBefore);
}
}
} finally {
maybeAbort("updateDocuments", flushNotifications);
}
}
private long finishDocuments(DocumentsWriterDeleteQueue.Node<?> deleteNode, int docIdUpTo) {
/*
* here we actually finish the document in two steps 1. push the delete into
* the queue and update our slice. 2. increment the DWPT private document
* id.
*
* the updated slice we get from 1. holds all the deletes that have occurred
* since we updated the slice the last time.
*/
// Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when
// this batch started:
long seqNo;
if (deleteNode != null) {
seqNo = deleteQueue.add(deleteNode, deleteSlice);
assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
deleteSlice.apply(pendingUpdates, docIdUpTo);
return seqNo;
} else {
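// By convention in DocumentsWriterDeleteQueue, a negative value returned from
// updateSlice signals that the slice picked up new deletes since it was last
// updated: negate it to recover the sequence number, then apply the captured
// deletes to this segment's private buffer.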
seqNo = deleteQueue.updateSlice(deleteSlice);
if (seqNo < 0) {
seqNo = -seqNo;
deleteSlice.apply(pendingUpdates, docIdUpTo);
} else {
deleteSlice.reset();
}
}
return seqNo;
}
// This method marks the last N docs as deleted. This is used
// in the case of a non-aborting exception. There are several cases
// where we fail a document, e.g. due to an exception during analysis
// that causes the doc to be rejected but won't cause the DWPT to become
// stale nor the entire IW to abort and shut down. In such a case
// we only mark these docs as deleted and turn them into live docs
// (a bitset) during flush.
private void deleteLastDocs(int docCount) {
int from = numDocsInRAM - docCount;
int to = numDocsInRAM;
deleteDocIDs = ArrayUtil.grow(deleteDocIDs, numDeletedDocIds + (to - from));
for (int docId = from; docId < to; docId++) {
deleteDocIDs[numDeletedDocIds++] = docId;
}
// NOTE: we do not trigger flush here. This is
// potentially a RAM leak, if you have an app that tries
// to add docs but every single doc always hits a
// non-aborting exception. Allowing a flush here gets
// very messy because we are only invoked when handling
// exceptions so to do this properly, while handling an
// exception we'd have to go off and flush new deletes
// which is risky (likely would hit some other
// confounding exception).
}
/**
* Returns the number of RAM-resident documents in this {@link DocumentsWriterPerThread}
*/
public int getNumDocsInRAM() {
// public for FlushPolicy
return numDocsInRAM;
}
/**
* Prepares this DWPT for flushing. This method will freeze and return the
* {@link DocumentsWriterDeleteQueue}s global buffer and apply all pending
* deletes to this DWPT.
*/
FrozenBufferedUpdates prepareFlush() {
assert numDocsInRAM > 0;
final FrozenBufferedUpdates globalUpdates = deleteQueue.freezeGlobalBuffer(deleteSlice);
/* deleteSlice can possibly be null if we have hit non-aborting exceptions during indexing and never succeeded
adding a document. */
if (deleteSlice != null) {
// apply all deletes before we flush and release the delete slice
deleteSlice.apply(pendingUpdates, numDocsInRAM);
assert deleteSlice.isEmpty();
deleteSlice.reset();
}
return globalUpdates;
}
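// Ordering note: flush() below asserts that the delete slice is empty, so
// prepareFlush() must run (freezing the global delete buffer) before a DWPT is
// flushed.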
/** Flush all pending docs to a new segment */
FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
assert flushPending.get() == Boolean.TRUE;
assert numDocsInRAM > 0;
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setMaxDoc(numDocsInRAM);
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, lastCommittedBytesUsed)));
final double startMBUsed = lastCommittedBytesUsed / 1024. / 1024.;
// Apply delete-by-docID now (delete-by-docID only
// happens when an exception is hit processing that
// doc, e.g. if the analyzer has some problem w/ the text):
if (numDeletedDocIds > 0) {
flushState.liveDocs = new FixedBitSet(numDocsInRAM);
flushState.liveDocs.set(0, numDocsInRAM);
for (int i = 0; i < numDeletedDocIds; i++) {
flushState.liveDocs.clear(deleteDocIDs[i]);
}
flushState.delCountOnFlush = numDeletedDocIds;
deleteDocIDs = new int[0];
}
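// From here on, flushState.liveDocs (if non-null) has one bit per buffered doc,
// with cleared bits marking docs that hit non-aborting exceptions; it is
// persisted by the codec in sealFlushedSegment().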
if (aborted) {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush: skip because aborting is set");
}
return null;
}
long t0 = System.nanoTime();
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
}
final Sorter.DocMap sortMap;
try {
DocIdSetIterator softDeletedDocs;
if (indexWriterConfig.getSoftDeletesField() != null) {
softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
} else {
softDeletedDocs = null;
}
sortMap = consumer.flush(flushState);
if (softDeletedDocs == null) {
flushState.softDelCountOnFlush = 0;
} else {
flushState.softDelCountOnFlush = PendingSoftDeletes.countSoftDeletes(softDeletedDocs, flushState.liveDocs);
assert flushState.segmentInfo.maxDoc() >= flushState.softDelCountOnFlush + flushState.delCountOnFlush;
}
// We clear this here because we already resolved them (private to this segment) when writing postings:
pendingUpdates.clearDeleteTerms();
segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));
final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, flushState.softDelCountOnFlush, -1L, -1L, -1L, StringHelper.randomId());
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : flushState.delCountOnFlush) + " deleted docs");
infoStream.message("DWPT", "new segment has " + flushState.softDelCountOnFlush + " soft-deleted docs");
infoStream.message("DWPT", "new segment has " +
(flushState.fieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
(flushState.fieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
(flushState.fieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
(flushState.fieldInfos.hasProx() ? "prox" : "no prox") + "; " +
(flushState.fieldInfos.hasFreq() ? "freqs" : "no freqs"));
infoStream.message("DWPT", "flushedFiles=" + segmentInfoPerCommit.files());
infoStream.message("DWPT", "flushed codec=" + codec);
}
final BufferedUpdates segmentDeletes;
if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numFieldUpdates.get() == 0) {
pendingUpdates.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingUpdates;
}
if (infoStream.isEnabled("DWPT")) {
final double newSegmentSize = segmentInfoPerCommit.sizeInBytes() / 1024. / 1024.;
infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
" ramUsed=" + nf.format(startMBUsed) + " MB" +
" newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
" docs/MB=" + nf.format(flushState.segmentInfo.maxDoc() / newSegmentSize));
}
assert segmentInfo != null;
FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush, sortMap);
sealFlushedSegment(fs, sortMap, flushNotifications);
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
}
return fs;
} catch (Throwable t) {
onAbortingException(t);
throw t;
} finally {
maybeAbort("flush", flushNotifications);
hasFlushed.set(Boolean.TRUE);
}
}
private void maybeAbort(String location, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
if (abortingException != null && aborted == false) {
// if we are already aborted don't do anything here
try {
abort();
} finally {
// Whatever happens here, we must report the tragic event upstream:
flushNotifications.onTragicEvent(abortingException, location);
}
}
}
private final Set<String> filesToDelete = new HashSet<>();
Set<String> pendingFilesToDelete() {
return filesToDelete;
}
private FixedBitSet sortLiveDocs(Bits liveDocs, Sorter.DocMap sortMap) {
assert liveDocs != null && sortMap != null;
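// liveDocs is addressed by pre-sort docIDs; each deleted docID is translated
// through sortMap.oldToNew so the cleared bits line up with the index-sorted
// order the segment was written in.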
FixedBitSet sortedLiveDocs = new FixedBitSet(liveDocs.length());
sortedLiveDocs.set(0, liveDocs.length());
for (int i = 0; i < liveDocs.length(); i++) {
if (liveDocs.get(i) == false) {
sortedLiveDocs.clear(sortMap.oldToNew(i));
}
}
return sortedLiveDocs;
}
/**
* Seals the {@link SegmentInfo} for the new flushed segment and persists
* the deleted documents {@link FixedBitSet}.
*/
void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
assert flushedSegment != null;
SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
IndexWriter.setDiagnostics(newSegment.info, IndexWriter.SOURCE_FLUSH);
IOContext context = new IOContext(new FlushInfo(newSegment.info.maxDoc(), newSegment.sizeInBytes()));
boolean success = false;
try {
if (indexWriterConfig.getUseCompoundFile()) {
Set<String> originalFiles = newSegment.info.files();
// TODO: like addIndexes, we are relying on createCompoundFile to successfully cleanup...
IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context, flushNotifications::deleteUnusedFiles);
filesToDelete.addAll(originalFiles);
newSegment.info.setUseCompoundFile(true);
}
// Have codec write SegmentInfo. Must do this after
// creating CFS so that 1) .si isn't slurped into CFS,
// and 2) .si reflects useCompoundFile=true change
// above:
codec.segmentInfoFormat().write(directory, newSegment.info, context);
// TODO: ideally we would freeze newSegment here!!
// because any changes after writing the .si will be
// lost...
// Must write deleted docs after the CFS so we don't
// slurp the del file into CFS:
if (flushedSegment.liveDocs != null) {
final int delCount = flushedSegment.delCount;
assert delCount > 0;
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
}
// TODO: we should prune the segment if it's 100%
// deleted... but merge will also catch it.
// TODO: in the NRT case it'd be better to hand
// this del vector over to the
// shortly-to-be-opened SegmentReader and let it
// carry the changes; there's no reason to use
// filesystem as intermediary here.
SegmentCommitInfo info = flushedSegment.segmentInfo;
Codec codec = info.info.getCodec();
final FixedBitSet bits;
if (sortMap == null) {
bits = flushedSegment.liveDocs;
} else {
bits = sortLiveDocs(flushedSegment.liveDocs, sortMap);
}
codec.liveDocsFormat().writeLiveDocs(bits, directory, info, delCount, context);
newSegment.setDelCount(delCount);
newSegment.advanceDelGen();
}
success = true;
} finally {
if (!success) {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT",
"hit exception creating compound file for newly flushed segment " + newSegment.info.name);
}
}
}
}
/** Get current segment info we are writing. */
SegmentInfo getSegmentInfo() {
return segmentInfo;
}
@Override
public long ramBytesUsed() {
assert lock.isHeldByCurrentThread();
return (deleteDocIDs.length * Integer.BYTES) + pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed();
}
@Override
public Collection<Accountable> getChildResources() {
assert lock.isHeldByCurrentThread();
return Collections.unmodifiableList(Arrays.asList(pendingUpdates, consumer));
}
@Override
public String toString() {
return "DocumentsWriterPerThread [pendingDeletes=" + pendingUpdates
+ ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborted=" + aborted + ", numDocsInRAM="
+ numDocsInRAM + ", deleteQueue=" + deleteQueue + ", " + numDeletedDocIds + " deleted docIds" + "]";
}
/**
* Returns true iff this DWPT is marked as flush pending
*/
boolean isFlushPending() {
return flushPending.get() == Boolean.TRUE;
}
/**
* Sets this DWPT as flush pending. This can only be set once.
*/
void setFlushPending() {
flushPending.set(Boolean.TRUE);
}
/**
* Returns the last committed bytes for this DWPT. This method can be called
* without acquiring the DWPT's lock.
*/
long getLastCommittedBytesUsed() {
return lastCommittedBytesUsed;
}
/**
* Commits the current {@link #ramBytesUsed()} and stores its value for later reuse.
* The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()}.
* @return the delta between the current {@link #ramBytesUsed()} and the previously committed value
*/
long commitLastBytesUsed() {
assert isHeldByCurrentThread();
long delta = ramBytesUsed() - lastCommittedBytesUsed;
lastCommittedBytesUsed += delta;
return delta;
}
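/*
* Locking sketch (an assumption about how calling code uses this class, not a
* checked contract beyond the asserts in ramBytesUsed() and commitLastBytesUsed()):
* the owning DocumentsWriter is expected to hold the DWPT lock around indexing and
* accounting, along the lines of:
*
* if (dwpt.tryLock()) {
*   try {
*     seqNo = dwpt.updateDocuments(docs, deleteNode, notifications);
*     dwpt.commitLastBytesUsed();
*   } finally {
*     dwpt.unlock();
*   }
* }
*/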
/**
* Locks this DWPT for exclusive access.
* @see ReentrantLock#lock()
*/
void lock() {
lock.lock();
}
/**
* Acquires the DWPT's lock only if it is not held by another thread at the time
* of invocation.
* @return true if the lock was acquired.
* @see ReentrantLock#tryLock()
*/
boolean tryLock() {
return lock.tryLock();
}
/**
* Returns true if the DWPT's lock is held by the current thread
* @see ReentrantLock#isHeldByCurrentThread()
*/
boolean isHeldByCurrentThread() {
return lock.isHeldByCurrentThread();
}
/**
* Unlocks the DWPT's lock
* @see ReentrantLock#unlock()
*/
void unlock() {
lock.unlock();
}
/**
* Returns <code>true</code> iff this DWPT has been flushed
*/
boolean hasFlushed() {
return hasFlushed.get() == Boolean.TRUE;
}
}