package org.apache.lucene.codecs.lucene40;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Comparator;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.TermVectorsReader;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.StringHelper;
import static org.apache.lucene.codecs.lucene40.Lucene40TermVectorsReader.*;
// TODO: make a new 4.0 TV format that encodes better
// - use startOffset (not endOffset) as the base for the delta to the
// next startOffset, because today, for synonyms, ngrams,
// WordDelimiterFilter, shingles, etc., we end up encoding negative vints
// (= slow: writeVInt treats the value as unsigned, so a negative delta
// always costs the full 5 bytes)
// - if doc has no term vectors, write 0 into the tvx
// file; saves a seek to tvd only to read a 0 vint (and
// saves a byte in tvd)
/**
* Lucene 4.0 Term Vectors writer.
* <p>
* It writes .tvd, .tvf, and .tvx files.
*
* @see Lucene40TermVectorsFormat
*/
public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
private final Directory directory;
private final String segment;
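// The three output streams: tvx is the per-document index (two file pointers
// per doc, one into tvd and one into tvf), tvd holds each document's list of
// fields, and tvf holds the per-field terms, freqs, positions, offsets and payloads.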
private IndexOutput tvx = null, tvd = null, tvf = null;
/** Sole constructor. */
public Lucene40TermVectorsWriter(Directory directory, String segment, IOContext context) throws IOException {
this.directory = directory;
this.segment = segment;
boolean success = false;
try {
// Open files for TermVector storage
tvx = directory.createOutput(IndexFileNames.segmentFileName(segment, "", Lucene40TermVectorsReader.VECTORS_INDEX_EXTENSION), context);
CodecUtil.writeHeader(tvx, CODEC_NAME_INDEX, VERSION_CURRENT);
tvd = directory.createOutput(IndexFileNames.segmentFileName(segment, "", Lucene40TermVectorsReader.VECTORS_DOCUMENTS_EXTENSION), context);
CodecUtil.writeHeader(tvd, CODEC_NAME_DOCS, VERSION_CURRENT);
tvf = directory.createOutput(IndexFileNames.segmentFileName(segment, "", Lucene40TermVectorsReader.VECTORS_FIELDS_EXTENSION), context);
CodecUtil.writeHeader(tvf, CODEC_NAME_FIELDS, VERSION_CURRENT);
assert HEADER_LENGTH_INDEX == tvx.getFilePointer();
assert HEADER_LENGTH_DOCS == tvd.getFilePointer();
assert HEADER_LENGTH_FIELDS == tvf.getFilePointer();
success = true;
} finally {
if (!success) {
abort();
}
}
}
@Override
public void startDocument(int numVectorFields) throws IOException {
lastFieldName = null;
this.numVectorFields = numVectorFields;
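// record where this document's data begins in tvd and tvf; the reader
// seeks directly to these two longs for any given docID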
tvx.writeLong(tvd.getFilePointer());
tvx.writeLong(tvf.getFilePointer());
tvd.writeVInt(numVectorFields);
fieldCount = 0;
fps = ArrayUtil.grow(fps, numVectorFields);
}
private long fps[] = new long[10]; // pointers to the tvf before writing each field
private int fieldCount = 0; // number of fields we have written so far for this document
private int numVectorFields = 0; // total number of fields we will write for this document
private String lastFieldName;
@Override
public void startField(FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads) throws IOException {
assert lastFieldName == null || info.name.compareTo(lastFieldName) > 0: "fieldName=" + info.name + " lastFieldName=" + lastFieldName;
lastFieldName = info.name;
this.positions = positions;
this.offsets = offsets;
this.payloads = payloads;
lastTerm.length = 0;
lastPayloadLength = -1; // force first payload to write its length
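// remember where this field's data starts in tvf; the deltas between
// consecutive fields are written out in finishDocument()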
fps[fieldCount++] = tvf.getFilePointer();
tvd.writeVInt(info.number);
tvf.writeVInt(numTerms);
byte bits = 0x0;
if (positions)
bits |= Lucene40TermVectorsReader.STORE_POSITIONS_WITH_TERMVECTOR;
if (offsets)
bits |= Lucene40TermVectorsReader.STORE_OFFSET_WITH_TERMVECTOR;
if (payloads)
bits |= Lucene40TermVectorsReader.STORE_PAYLOAD_WITH_TERMVECTOR;
tvf.writeByte(bits);
}
@Override
public void finishDocument() throws IOException {
assert fieldCount == numVectorFields;
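// write the byte gaps between consecutive fields' start positions in tvf;
// the first field's tvf position is already recorded in tvx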
for (int i = 1; i < fieldCount; i++) {
tvd.writeVLong(fps[i] - fps[i-1]);
}
}
private final BytesRef lastTerm = new BytesRef(10);
// NOTE: we override addProx, so we don't need to buffer when indexing.
// we also don't buffer during bulk merges.
private int offsetStartBuffer[] = new int[10];
private int offsetEndBuffer[] = new int[10];
private BytesRef payloadData = new BytesRef(10);
private int bufferedIndex = 0;
private int bufferedFreq = 0;
private boolean positions = false;
private boolean offsets = false;
private boolean payloads = false;
@Override
public void startTerm(BytesRef term, int freq) throws IOException {
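// terms are front-coded against the previous term: shared-prefix length,
// suffix length, then the suffix bytes. E.g. writing "apply" right after
// "apple" emits prefix=4, suffix=1, then the single byte 'y'.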
final int prefix = StringHelper.bytesDifference(lastTerm, term);
final int suffix = term.length - prefix;
tvf.writeVInt(prefix);
tvf.writeVInt(suffix);
tvf.writeBytes(term.bytes, term.offset + prefix, suffix);
tvf.writeVInt(freq);
lastTerm.copyBytes(term);
lastPosition = lastOffset = 0;
if (offsets && positions) {
// we might need to buffer if it's a non-bulk merge
offsetStartBuffer = ArrayUtil.grow(offsetStartBuffer, freq);
offsetEndBuffer = ArrayUtil.grow(offsetEndBuffer, freq);
}
bufferedIndex = 0;
bufferedFreq = freq;
payloadData.length = 0;
}
int lastPosition = 0;
int lastOffset = 0;
int lastPayloadLength = -1; // force first payload to write its length
BytesRef scratch = new BytesRef(); // used only by this optimized flush below
@Override
public void addProx(int numProx, DataInput positions, DataInput offsets) throws IOException {
if (payloads) {
// TODO: this may be overkill; maybe just call super.addProx() in this case?
// We do avoid buffering the offsets in RAM, though.
for (int i = 0; i < numProx; i++) {
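// each entry is a delta-coded position whose low bit flags a payload:
// when set, a payload length and the payload bytes follow in the stream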
int code = positions.readVInt();
if ((code & 1) == 1) {
int length = positions.readVInt();
scratch.grow(length);
scratch.length = length;
positions.readBytes(scratch.bytes, scratch.offset, scratch.length);
writePosition(code >>> 1, scratch);
} else {
writePosition(code >>> 1, null);
}
}
tvf.writeBytes(payloadData.bytes, payloadData.offset, payloadData.length);
} else if (positions != null) {
// pure positions, no payloads
for (int i = 0; i < numProx; i++) {
tvf.writeVInt(positions.readVInt() >>> 1);
}
}
if (offsets != null) {
for (int i = 0; i < numProx; i++) {
tvf.writeVInt(offsets.readVInt());
tvf.writeVInt(offsets.readVInt());
}
}
}
@Override
public void addPosition(int position, int startOffset, int endOffset, BytesRef payload) throws IOException {
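// tvf layout per term: all position deltas (with inlined payload lengths),
// then the concatenated payload bytes, then the offsets. Position deltas can
// be written immediately; payload bytes and offsets are buffered and flushed
// in finishTerm().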
if (positions && (offsets || payloads)) {
// write position delta
writePosition(position - lastPosition, payload);
lastPosition = position;
// buffer offsets
if (offsets) {
offsetStartBuffer[bufferedIndex] = startOffset;
offsetEndBuffer[bufferedIndex] = endOffset;
}
bufferedIndex++;
} else if (positions) {
// write position delta
writePosition(position - lastPosition, payload);
lastPosition = position;
} else if (offsets) {
// write offset deltas
tvf.writeVInt(startOffset - lastOffset);
tvf.writeVInt(endOffset - startOffset);
lastOffset = endOffset;
}
}
@Override
public void finishTerm() throws IOException {
if (bufferedIndex > 0) {
// dump buffer
assert positions && (offsets || payloads);
assert bufferedIndex == bufferedFreq;
if (payloads) {
tvf.writeBytes(payloadData.bytes, payloadData.offset, payloadData.length);
}
if (offsets) {
for (int i = 0; i < bufferedIndex; i++) {
tvf.writeVInt(offsetStartBuffer[i] - lastOffset);
tvf.writeVInt(offsetEndBuffer[i] - offsetStartBuffer[i]);
lastOffset = offsetEndBuffer[i];
}
}
}
}
private void writePosition(int delta, BytesRef payload) throws IOException {
if (payloads) {
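// the low bit of the shifted delta signals that the payload length changed
// and that the new length is written next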
int payloadLength = payload == null ? 0 : payload.length;
if (payloadLength != lastPayloadLength) {
lastPayloadLength = payloadLength;
tvf.writeVInt((delta<<1)|1);
tvf.writeVInt(payloadLength);
} else {
tvf.writeVInt(delta << 1);
}
if (payloadLength > 0) {
if (payloadLength + payloadData.length < 0) {
// we overflowed the payload buffer, just throw UOE
// having > Integer.MAX_VALUE bytes of payload for a single term in a single doc is nuts.
throw new UnsupportedOperationException("A term cannot have more than Integer.MAX_VALUE bytes of payload data in a single document");
}
payloadData.append(payload);
}
} else {
tvf.writeVInt(delta);
}
}
@Override
public void abort() {
try {
close();
} catch (Throwable ignored) {}
IOUtils.deleteFilesIgnoringExceptions(directory, IndexFileNames.segmentFileName(segment, "", Lucene40TermVectorsReader.VECTORS_INDEX_EXTENSION),
IndexFileNames.segmentFileName(segment, "", Lucene40TermVectorsReader.VECTORS_DOCUMENTS_EXTENSION),
IndexFileNames.segmentFileName(segment, "", Lucene40TermVectorsReader.VECTORS_FIELDS_EXTENSION));
}
/**
* Do a bulk copy of numDocs documents from reader to our
* streams. This is used to expedite merging when the
* field numbers are congruent.
*/
private void addRawDocuments(Lucene40TermVectorsReader reader, int[] tvdLengths, int[] tvfLengths, int numDocs) throws IOException {
long tvdPosition = tvd.getFilePointer();
long tvfPosition = tvf.getFilePointer();
long tvdStart = tvdPosition;
long tvfStart = tvfPosition;
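// write two longs per copied document into tvx, pointing at where each
// document's bytes will land in tvd and tvf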
for(int i=0;i<numDocs;i++) {
tvx.writeLong(tvdPosition);
tvdPosition += tvdLengths[i];
tvx.writeLong(tvfPosition);
tvfPosition += tvfLengths[i];
}
tvd.copyBytes(reader.getTvdStream(), tvdPosition-tvdStart);
tvf.copyBytes(reader.getTvfStream(), tvfPosition-tvfStart);
assert tvd.getFilePointer() == tvdPosition;
assert tvf.getFilePointer() == tvfPosition;
}
@Override
public final int merge(MergeState mergeState) throws IOException {
// Used for bulk-reading raw bytes for term vectors
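// rawDocLengths holds each doc's byte length in tvd, rawDocLengths2 its
// byte length in tvf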
int rawDocLengths[] = new int[MAX_RAW_MERGE_DOCS];
int rawDocLengths2[] = new int[MAX_RAW_MERGE_DOCS];
int idx = 0;
int numDocs = 0;
for (int i = 0; i < mergeState.readers.size(); i++) {
final AtomicReader reader = mergeState.readers.get(i);
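// a matching reader is only set when this segment's field numbering lines up
// with the merged segment's, which is what makes the raw byte-copy path safe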
final SegmentReader matchingSegmentReader = mergeState.matchingSegmentReaders[idx++];
Lucene40TermVectorsReader matchingVectorsReader = null;
if (matchingSegmentReader != null) {
TermVectorsReader vectorsReader = matchingSegmentReader.getTermVectorsReader();
if (vectorsReader != null && vectorsReader instanceof Lucene40TermVectorsReader) {
matchingVectorsReader = (Lucene40TermVectorsReader) vectorsReader;
}
}
if (reader.getLiveDocs() != null) {
numDocs += copyVectorsWithDeletions(mergeState, matchingVectorsReader, reader, rawDocLengths, rawDocLengths2);
} else {
numDocs += copyVectorsNoDeletions(mergeState, matchingVectorsReader, reader, rawDocLengths, rawDocLengths2);
}
}
finish(mergeState.fieldInfos, numDocs);
return numDocs;
}
/** Maximum number of contiguous documents to bulk-copy
when merging term vectors */
private final static int MAX_RAW_MERGE_DOCS = 4192;
private int copyVectorsWithDeletions(MergeState mergeState,
final Lucene40TermVectorsReader matchingVectorsReader,
final AtomicReader reader,
int rawDocLengths[],
int rawDocLengths2[])
throws IOException {
final int maxDoc = reader.maxDoc();
final Bits liveDocs = reader.getLiveDocs();
int totalNumDocs = 0;
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
for (int docNum = 0; docNum < maxDoc;) {
if (!liveDocs.get(docNum)) {
// skip deleted docs
++docNum;
continue;
}
// We can optimize this case (doing a bulk byte copy) since the field
// numbers are identical
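// gather a contiguous run of live docs (up to MAX_RAW_MERGE_DOCS) and copy
// their raw tvd/tvf bytes in one pass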
int start = docNum, numDocs = 0;
do {
docNum++;
numDocs++;
if (docNum >= maxDoc) break;
if (!liveDocs.get(docNum)) {
docNum++;
break;
}
} while(numDocs < MAX_RAW_MERGE_DOCS);
matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, start, numDocs);
addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs);
totalNumDocs += numDocs;
mergeState.checkAbort.work(300 * numDocs);
}
} else {
for (int docNum = 0; docNum < maxDoc; docNum++) {
if (!liveDocs.get(docNum)) {
// skip deleted docs
continue;
}
// NOTE: it's very important to first assign to vectors then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
Fields vectors = reader.getTermVectors(docNum);
addAllDocVectors(vectors, mergeState);
totalNumDocs++;
mergeState.checkAbort.work(300);
}
}
return totalNumDocs;
}
private int copyVectorsNoDeletions(MergeState mergeState,
final Lucene40TermVectorsReader matchingVectorsReader,
final AtomicReader reader,
int rawDocLengths[],
int rawDocLengths2[])
throws IOException {
final int maxDoc = reader.maxDoc();
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
int docCount = 0;
while (docCount < maxDoc) {
int len = Math.min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);
matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, docCount, len);
addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, len);
docCount += len;
mergeState.checkAbort.work(300 * len);
}
} else {
for (int docNum = 0; docNum < maxDoc; docNum++) {
// NOTE: it's very important to first assign to vectors then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
Fields vectors = reader.getTermVectors(docNum);
addAllDocVectors(vectors, mergeState);
mergeState.checkAbort.work(300);
}
}
return maxDoc;
}
@Override
public void finish(FieldInfos fis, int numDocs) {
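// tvx must hold exactly two longs (16 bytes) per merged document after the header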
if (HEADER_LENGTH_INDEX + ((long) numDocs) * 16 != tvx.getFilePointer()) {
// This is most likely a bug in Sun JRE 1.6.0_04/_05; we detect here that
// the bug has struck and throw an exception to prevent the corruption
// from entering the index. See LUCENE-1282 for details.
throw new RuntimeException("tvx size mismatch: mergedDocs is " + numDocs + " but tvx size is " + tvx.getFilePointer() + " file=" + tvx.toString() + "; now aborting this merge to prevent index corruption");
}
}
/** Close all streams. */
@Override
public void close() throws IOException {
// close all streams; IOUtils.close remembers and re-throws the first
// exception encountered while closing them
IOUtils.close(tvx, tvd, tvf);
tvx = tvd = tvf = null;
}
@Override
public Comparator<BytesRef> getComparator() {
return BytesRef.getUTF8SortedAsUnicodeComparator();
}
}