package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
import java.io.IOException;
import java.io.PrintStream;
import java.text.NumberFormat;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.RamUsageEstimator;
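/**
 * A single indexing thread's private documents writer: it buffers added
 * documents and buffered deletes in RAM and, on flush, writes them out as a
 * new segment. Instances are not shared between concurrently indexing
 * threads; coordination with other threads happens in the parent
 * {@link DocumentsWriter}.
 */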
public class DocumentsWriterPerThread {
/**
* The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
* which returns the DocConsumer that the DocumentsWriterPerThread calls to process the
* documents.
*/
abstract static class IndexingChain {
abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
}
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@Override
DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
/*
This is the current indexing chain:

DocConsumer / DocConsumerPerThread
  --> code: DocFieldProcessor / DocFieldProcessorPerThread
    --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
      --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
        --> code: DocInverter / DocInverterPerThread / DocInverterPerField
          --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
            --> code: TermsHash / TermsHashPerThread / TermsHashPerField
              --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
                --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
                --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
          --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
            --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
        --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
*/
// Build up indexing chain:
final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
final InvertedDocConsumer termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter, true,
new TermsHash(documentsWriterPerThread, termVectorsWriter, false, null));
final NormsWriter normsWriter = new NormsWriter();
final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
return new DocFieldProcessor(documentsWriterPerThread, docInverter);
}
};
static class DocState {
final DocumentsWriterPerThread docWriter;
Analyzer analyzer;
PrintStream infoStream;
SimilarityProvider similarityProvider;
int docID;
Document doc;
String maxTermPrefix;
DocState(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
}
// Only called by asserts
public boolean testPoint(String name) {
return docWriter.writer.testPoint(name);
}
public void clear() {
// don't hold onto doc or analyzer, in case they are
// largish:
doc = null;
analyzer = null;
}
}
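/**
 * The result of flushing this DWPT: the new {@link SegmentInfo}, the delete
 * queries (if any) that were still buffered and must be applied once the
 * segment can be read, and a {@link BitVector} marking documents deleted
 * within the segment itself (e.g. docs that hit a non-aborting exception
 * while being indexed).
 */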
static class FlushedSegment {
final SegmentInfo segmentInfo;
final BufferedDeletes segmentDeletes;
final BitVector deletedDocuments;
private FlushedSegment(SegmentInfo segmentInfo,
BufferedDeletes segmentDeletes, BitVector deletedDocuments) {
this.segmentInfo = segmentInfo;
this.segmentDeletes = segmentDeletes;
this.deletedDocuments = deletedDocuments;
}
}
/** Called if we hit an exception at a bad time (when
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since the last flush. */
void abort() throws IOException {
hasAborted = aborting = true;
try {
if (infoStream != null) {
message("docWriter: now abort");
}
try {
consumer.abort();
} catch (Throwable t) {
// ignore any exception thrown by the consumer here - we are aborting anyway
}
pendingDeletes.clear();
deleteSlice = deleteQueue.newSlice();
// Reset all postings data
doAfterFlush();
} finally {
aborting = false;
if (infoStream != null) {
message("docWriter: done abort");
}
}
}
final DocumentsWriter parent;
final IndexWriter writer;
final Directory directory;
final DocState docState;
final DocConsumer consumer;
final AtomicLong bytesUsed;
SegmentWriteState flushState;
// Deletes for our still-in-RAM (to be flushed next) segment
BufferedDeletes pendingDeletes;
String segment; // Current segment we are working on
boolean aborting = false; // True if an abort is pending
boolean hasAborted = false; // True if the last exception thrown by #updateDocument was an aborting exception
private FieldInfos fieldInfos;
private PrintStream infoStream;
private int numDocsInRAM;
private int flushedDocCount;
DocumentsWriterDeleteQueue deleteQueue;
DeleteSlice deleteSlice;
private final NumberFormat nf = NumberFormat.getInstance();
public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
FieldInfos fieldInfos, IndexingChain indexingChain) {
this.directory = directory;
this.parent = parent;
this.fieldInfos = fieldInfos;
this.writer = parent.indexWriter;
this.infoStream = parent.infoStream;
this.docState = new DocState(this);
this.docState.similarityProvider = parent.indexWriter.getConfig()
.getSimilarityProvider();
consumer = indexingChain.getChain(this);
bytesUsed = new AtomicLong(0);
pendingDeletes = new BufferedDeletes(false);
initialize();
}
public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos fieldInfos) {
this(other.directory, other.parent, fieldInfos, other.parent.chain);
}
void initialize() {
deleteQueue = parent.deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
pendingDeletes.clear();
deleteSlice = null;
}
void setAborting() {
aborting = true;
}
boolean checkAndResetHasAborted() {
final boolean retval = hasAborted;
hasAborted = false;
return retval;
}
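/**
 * Indexes a single document into this DWPT's in-RAM buffer. If
 * {@code delTerm} is non-null it is added to the delete queue so that, at
 * flush time, it deletes documents indexed before this one (never the
 * document being added). A non-aborting exception leaves the half-indexed
 * document marked as deleted by docID; an aborting exception discards all
 * buffered docs via {@link #abort()}.
 */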
public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException {
assert writer.testPoint("DocumentsWriterPerThread addDocument start");
assert deleteQueue != null;
docState.doc = doc;
docState.analyzer = analyzer;
docState.docID = numDocsInRAM;
if (segment == null) {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
}
boolean success = false;
try {
try {
consumer.processDocument(fieldInfos);
} finally {
docState.clear();
}
success = true;
} finally {
if (!success) {
if (!aborting) {
// mark document as deleted
deleteDocID(docState.docID);
numDocsInRAM++;
fieldInfos.revertUncommitted();
} else {
abort();
}
}
}
success = false;
try {
consumer.finishDocument();
success = true;
} finally {
if (!success) {
abort();
}
}
finishDocument(delTerm);
}
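/**
 * Atomically indexes a block of documents: the block gets consecutive docIDs
 * in the flushed segment, and {@code delTerm}, if non-null, is applied only
 * to documents indexed before the block started, never to the block itself.
 * If a non-aborting exception strikes one document, every document of the
 * block that was already indexed is marked as deleted. Returns the number of
 * documents processed.
 */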
public int updateDocuments(Iterable<Document> docs, Analyzer analyzer, Term delTerm) throws IOException {
assert writer.testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null;
docState.analyzer = analyzer;
if (segment == null) {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
}
int docCount = 0;
try {
for(Document doc : docs) {
docState.doc = doc;
docState.docID = numDocsInRAM;
docCount++;
boolean success = false;
try {
consumer.processDocument(fieldInfos);
success = true;
} finally {
if (!success) {
// An exc is being thrown...
if (!aborting) {
// One of the documents hit a non-aborting
// exception (eg something happened during
// analysis). We now go and mark any docs
// from this batch that we had already indexed
// as deleted:
int docID = docState.docID;
final int endDocID = docID - docCount;
while (docID > endDocID) {
deleteDocID(docID);
docID--;
}
// Incr here because finishDocument will not
// be called (because an exc is being thrown):
numDocsInRAM++;
fieldInfos.revertUncommitted();
} else {
abort();
}
}
}
success = false;
try {
consumer.finishDocument();
success = true;
} finally {
if (!success) {
abort();
}
}
finishDocument(null);
}
// Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when
// this batch started:
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingDeletes, numDocsInRAM-docCount);
}
} finally {
docState.clear();
}
return docCount;
}
private void finishDocument(Term delTerm) throws IOException {
/*
 * Here we actually finish the document in two steps:
 * 1. push the delete into the queue and update our slice
 * 2. increment the DWPT-private document id
 *
 * The updated slice we get from step 1 holds all the deletes that have
 * occurred since we last updated the slice.
 */
if (deleteSlice == null) {
deleteSlice = deleteQueue.newSlice();
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
deleteSlice.reset();
}
} else {
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingDeletes, numDocsInRAM);
} else if (deleteQueue.updateSlice(deleteSlice)) {
deleteSlice.apply(pendingDeletes, numDocsInRAM);
}
}
++numDocsInRAM;
}
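/*
 * A sketch of the slice mechanics above (the queue internals live in
 * DocumentsWriterDeleteQueue). Suppose two DWPTs share one delete queue:
 *
 *   - this DWPT indexes doc0 and doc1; no deletes arrive, so nothing new is applied
 *   - another thread buffers a delete for term "x", appending it to the queue
 *   - this DWPT indexes doc2; finishDocument(null) sees updateSlice() return
 *     true and calls deleteSlice.apply(pendingDeletes, 2), recording that "x"
 *     may only delete docs with docID < 2 (doc0 and doc1), not the document
 *     currently being finished or any later one.
 *
 * This is how deletes issued concurrently by other threads are applied to the
 * correct prefix of this DWPT's private, in-RAM segment.
 */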
// Buffer a specific docID for deletion. Currently only
// used when we hit an exception while adding a document
void deleteDocID(int docIDUpto) {
pendingDeletes.addDocID(docIDUpto);
// NOTE: we do not trigger flush here. This is
// potentially a RAM leak if an app adds docs and
// every single doc always hits a non-aborting
// exception. Allowing a flush here would get very
// messy, though, because this method is only invoked
// while an exception is being handled; flushing new
// deletes at that point is risky and would likely
// hit some other, confounding exception.
}
/**
* Returns the number of delete terms in this {@link DocumentsWriterPerThread}
*/
public int numDeleteTerms() {
// public for FlushPolicy
return pendingDeletes.numTermDeletes.get();
}
/**
* Returns the number of RAM resident documents in this {@link DocumentsWriterPerThread}
*/
public int getNumDocsInRAM() {
// public for FlushPolicy
return numDocsInRAM;
}
SegmentCodecs getCodec() {
return flushState.segmentCodecs;
}
/** Reset after a flush */
private void doAfterFlush() throws IOException {
segment = null;
consumer.doAfterFlush();
fieldInfos = new FieldInfos(fieldInfos);
parent.subtractFlushedNumDocs(numDocsInRAM);
numDocsInRAM = 0;
}
/**
* Prepares this DWPT for flushing. This method will freeze and return the
* {@link DocumentsWriterDeleteQueue}'s global buffer and apply all pending
* deletes to this DWPT.
*/
FrozenBufferedDeletes prepareFlush() {
assert numDocsInRAM > 0;
final FrozenBufferedDeletes globalDeletes = deleteQueue.freezeGlobalBuffer(deleteSlice);
/* deleteSlice can possibly be null if we have hit non-aborting exceptions during indexing and never
succeeded in adding a document. */
if (deleteSlice != null) {
// apply all deletes before we flush and release the delete slice
deleteSlice.apply(pendingDeletes, numDocsInRAM);
assert deleteSlice.isEmpty();
deleteSlice = null;
}
return globalDeletes;
}
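/*
 * A sketch of the expected call order during a flush (the actual
 * orchestration lives in the parent DocumentsWriter and its flush control):
 *
 *   FrozenBufferedDeletes globalDeletes = dwpt.prepareFlush(); // freeze global deletes, apply slice
 *   FlushedSegment flushedSegment = dwpt.flush();              // write the in-RAM docs as a segment
 *   // the caller then publishes flushedSegment together with globalDeletes
 */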
/** Flush all pending docs to a new segment */
FlushedSegment flush() throws IOException {
assert numDocsInRAM > 0;
assert deleteSlice == null : "all deletes must be applied in prepareFlush";
flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
numDocsInRAM, writer.getConfig().getTermIndexInterval(),
fieldInfos.buildSegmentCodecs(true), pendingDeletes);
final double startMBUsed = parent.flushControl.netBytes() / 1024. / 1024.;
// Apply delete-by-docID now (delete-by-docID only
// happens when an exception is hit processing that
// doc, eg if analyzer has some problem w/ the text):
if (pendingDeletes.docIDs.size() > 0) {
flushState.deletedDocs = new BitVector(numDocsInRAM);
for(int delDocID : pendingDeletes.docIDs) {
flushState.deletedDocs.set(delDocID);
}
pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
pendingDeletes.docIDs.clear();
}
if (infoStream != null) {
message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
}
if (aborting) {
if (infoStream != null) {
message("flush: skip because aborting is set");
}
return null;
}
boolean success = false;
try {
consumer.flush(flushState);
pendingDeletes.terms.clear();
final SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, flushState.segmentCodecs, fieldInfos.asReadOnly());
if (infoStream != null) {
message("new segment has " + (flushState.deletedDocs == null ? 0 : flushState.deletedDocs.count()) + " deleted docs");
message("new segment has " + (newSegment.getHasVectors() ? "vectors" : "no vectors"));
message("flushedFiles=" + newSegment.files());
message("flushed codecs=" + newSegment.getSegmentCodecs());
}
flushedDocCount += flushState.numDocs;
final BufferedDeletes segmentDeletes;
if (pendingDeletes.queries.isEmpty()) {
pendingDeletes.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingDeletes;
pendingDeletes = new BufferedDeletes(false);
}
if (infoStream != null) {
final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
message("flushed: segment=" + newSegment +
" ramUsed=" + nf.format(startMBUsed) + " MB" +
" newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
" (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
" docs/MB=" + nf.format(flushedDocCount / newSegmentSize) +
" new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
}
doAfterFlush();
success = true;
return new FlushedSegment(newSegment, segmentDeletes, flushState.deletedDocs);
} finally {
if (!success) {
if (segment != null) {
synchronized(parent.indexWriter) {
parent.indexWriter.deleter.refresh(segment);
}
}
abort();
}
}
}
/** Get the name of the segment we are currently writing. */
String getSegment() {
return segment;
}
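/**
 * RAM currently consumed by this DWPT: the indexing buffers (shared int[] and
 * byte[] block pools plus per-field postings data) tracked via
 * {@link #bytesUsed}, plus the bytes held by the still-pending deletes. The
 * flush control logic is expected to use this figure when deciding which DWPT
 * to flush.
 */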
long bytesUsed() {
return bytesUsed.get() + pendingDeletes.bytesUsed.get();
}
void message(String message) {
writer.message("DWPT: " + message);
}
/* Complement of BYTE_BLOCK_MASK: masks off the within-block bits of an
offset into the shared byte[] blocks used to store postings data */
final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
/* if you increase this, you must fix the field cache impl:
* getTerms/getTermsIndex requires term length <= 32768 */
final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
/* Initial chunk size of the shared int[] blocks used to
store postings data */
final static int INT_BLOCK_SHIFT = 13;
final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
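// With INT_BLOCK_SHIFT = 13 this gives INT_BLOCK_SIZE = 8192 ints per block
// (8192 * 4 = 32768 bytes) and INT_BLOCK_MASK = 0x1FFF: a pool-wide offset is
// split into (offset >> INT_BLOCK_SHIFT) to select the block and
// (offset & INT_BLOCK_MASK) to index within it. getIntBlock()/recycleIntBlocks()
// below keep bytesUsed in sync by adding/subtracting 32768 bytes per block.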
/* Allocate another int[] from the shared pool */
int[] getIntBlock() {
int[] b = new int[INT_BLOCK_SIZE];
bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
return b;
}
void recycleIntBlocks(int[][] blocks, int offset, int length) {
bytesUsed.addAndGet(-(length *(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT)));
}
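/**
 * Tracks byte-block allocations against {@link #bytesUsed}. Unlike a pooling
 * allocator, recycled blocks are not kept on a free list: recycleByteBlocks()
 * only corrects the RAM accounting and nulls the slots so the blocks can be
 * garbage collected. For example, recycling 3 blocks subtracts 3 * blockSize
 * bytes and clears blocks[start..end).
 */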
final Allocator byteBlockAllocator = new DirectTrackingAllocator();
private class DirectTrackingAllocator extends Allocator {
public DirectTrackingAllocator() {
this(BYTE_BLOCK_SIZE);
}
public DirectTrackingAllocator(int blockSize) {
super(blockSize);
}
@Override
public byte[] getByteBlock() {
bytesUsed.addAndGet(blockSize);
return new byte[blockSize];
}
@Override
public void recycleByteBlocks(byte[][] blocks, int start, int end) {
bytesUsed.addAndGet(-((end-start)* blockSize));
for (int i = start; i < end; i++) {
blocks[i] = null;
}
}
};
void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
docState.infoStream = infoStream;
}
}