| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.lucene.backward_codecs.lucene50.compressing; |
| |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.FLAGS_BITS; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.MAX_DOCUMENTS_PER_CHUNK; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.OFFSETS; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.PACKED_BLOCK_SIZE; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.PAYLOADS; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.POSITIONS; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.VECTORS_EXTENSION; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.VECTORS_INDEX_CODEC_NAME; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.VECTORS_INDEX_EXTENSION; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.VECTORS_META_EXTENSION; |
| import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.VERSION_CURRENT; |
| |
| import java.io.IOException; |
| import java.util.ArrayDeque; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Deque; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import org.apache.lucene.codecs.CodecUtil; |
| import org.apache.lucene.codecs.TermVectorsReader; |
| import org.apache.lucene.codecs.TermVectorsWriter; |
| import org.apache.lucene.codecs.compressing.CompressionMode; |
| import org.apache.lucene.codecs.compressing.Compressor; |
| import org.apache.lucene.codecs.compressing.MatchingReaders; |
| import org.apache.lucene.index.CorruptIndexException; |
| import org.apache.lucene.index.FieldInfo; |
| import org.apache.lucene.index.FieldInfos; |
| import org.apache.lucene.index.Fields; |
| import org.apache.lucene.index.IndexFileNames; |
| import org.apache.lucene.index.MergeState; |
| import org.apache.lucene.index.SegmentInfo; |
| import org.apache.lucene.store.ByteBuffersDataOutput; |
| import org.apache.lucene.store.DataInput; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.IndexInput; |
| import org.apache.lucene.store.IndexOutput; |
| import org.apache.lucene.util.Accountable; |
| import org.apache.lucene.util.ArrayUtil; |
| import org.apache.lucene.util.Bits; |
| import org.apache.lucene.util.BytesRef; |
| import org.apache.lucene.util.IOUtils; |
| import org.apache.lucene.util.StringHelper; |
| import org.apache.lucene.util.packed.BlockPackedWriter; |
| import org.apache.lucene.util.packed.PackedInts; |
| |
| /** |
| * {@link TermVectorsWriter} for {@link Lucene50CompressingTermVectorsFormat}. |
| * |
| * @lucene.experimental |
| */ |
| public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWriter { |
| |
| private final String segment; |
| private FieldsIndexWriter indexWriter; |
| private IndexOutput metaStream, vectorsStream; |
| |
| private final CompressionMode compressionMode; |
| private final Compressor compressor; |
| private final int chunkSize; |
| |
| private long numDirtyChunks; // number of incomplete compressed blocks written |
| private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks |
| |
| /** a pending doc */ |
| private class DocData { |
| final int numFields; |
| final Deque<FieldData> fields; |
| final int posStart, offStart, payStart; |
| |
| DocData(int numFields, int posStart, int offStart, int payStart) { |
| this.numFields = numFields; |
| this.fields = new ArrayDeque<>(numFields); |
| this.posStart = posStart; |
| this.offStart = offStart; |
| this.payStart = payStart; |
| } |
| |
| FieldData addField( |
| int fieldNum, int numTerms, boolean positions, boolean offsets, boolean payloads) { |
| final FieldData field; |
| if (fields.isEmpty()) { |
| field = |
| new FieldData( |
| fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart); |
| } else { |
| final FieldData last = fields.getLast(); |
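| // subsequent fields of the same doc continue where the previous field's data ends |
| // in the shared positions/offsets/payload-length buffers |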
| final int posStart = last.posStart + (last.hasPositions ? last.totalPositions : 0); |
| final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0); |
| final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0); |
| field = |
| new FieldData( |
| fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart); |
| } |
| fields.add(field); |
| return field; |
| } |
| } |
| |
| private DocData addDocData(int numVectorFields) { |
| FieldData last = null; |
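| // walk the pending docs backwards to find the most recently added field, so the new |
| // doc's data starts where that field's buffered positions/offsets/payloads end |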
| for (Iterator<DocData> it = pendingDocs.descendingIterator(); it.hasNext(); ) { |
| final DocData doc = it.next(); |
| if (!doc.fields.isEmpty()) { |
| last = doc.fields.getLast(); |
| break; |
| } |
| } |
| final DocData doc; |
| if (last == null) { |
| doc = new DocData(numVectorFields, 0, 0, 0); |
| } else { |
| final int posStart = last.posStart + (last.hasPositions ? last.totalPositions : 0); |
| final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0); |
| final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0); |
| doc = new DocData(numVectorFields, posStart, offStart, payStart); |
| } |
| pendingDocs.add(doc); |
| return doc; |
| } |
| |
| /** a pending field */ |
| private class FieldData { |
| final boolean hasPositions, hasOffsets, hasPayloads; |
| final int fieldNum, flags, numTerms; |
| final int[] freqs, prefixLengths, suffixLengths; |
| final int posStart, offStart, payStart; |
| int totalPositions; |
| int ord; |
| |
| FieldData( |
| int fieldNum, |
| int numTerms, |
| boolean positions, |
| boolean offsets, |
| boolean payloads, |
| int posStart, |
| int offStart, |
| int payStart) { |
| this.fieldNum = fieldNum; |
| this.numTerms = numTerms; |
| this.hasPositions = positions; |
| this.hasOffsets = offsets; |
| this.hasPayloads = payloads; |
| this.flags = |
| (positions ? POSITIONS : 0) | (offsets ? OFFSETS : 0) | (payloads ? PAYLOADS : 0); |
| this.freqs = new int[numTerms]; |
| this.prefixLengths = new int[numTerms]; |
| this.suffixLengths = new int[numTerms]; |
| this.posStart = posStart; |
| this.offStart = offStart; |
| this.payStart = payStart; |
| totalPositions = 0; |
| ord = 0; |
| } |
| |
| void addTerm(int freq, int prefixLength, int suffixLength) { |
| freqs[ord] = freq; |
| prefixLengths[ord] = prefixLength; |
| suffixLengths[ord] = suffixLength; |
| ++ord; |
| } |
| |
| void addPosition(int position, int startOffset, int length, int payloadLength) { |
| if (hasPositions) { |
| if (posStart + totalPositions == positionsBuf.length) { |
| positionsBuf = ArrayUtil.grow(positionsBuf); |
| } |
| positionsBuf[posStart + totalPositions] = position; |
| } |
| if (hasOffsets) { |
| if (offStart + totalPositions == startOffsetsBuf.length) { |
| final int newLength = ArrayUtil.oversize(offStart + totalPositions, 4); |
| startOffsetsBuf = ArrayUtil.growExact(startOffsetsBuf, newLength); |
| lengthsBuf = ArrayUtil.growExact(lengthsBuf, newLength); |
| } |
| startOffsetsBuf[offStart + totalPositions] = startOffset; |
| lengthsBuf[offStart + totalPositions] = length; |
| } |
| if (hasPayloads) { |
| if (payStart + totalPositions == payloadLengthsBuf.length) { |
| payloadLengthsBuf = ArrayUtil.grow(payloadLengthsBuf); |
| } |
| payloadLengthsBuf[payStart + totalPositions] = payloadLength; |
| } |
| ++totalPositions; |
| } |
| } |
| |
| private int numDocs; // total number of docs seen |
| private final Deque<DocData> pendingDocs; // pending docs |
| private DocData curDoc; // current document |
| private FieldData curField; // current field |
| private final BytesRef lastTerm; |
| private int[] positionsBuf, startOffsetsBuf, lengthsBuf, payloadLengthsBuf; |
| private final ByteBuffersDataOutput termSuffixes; // buffered term suffixes |
| private final ByteBuffersDataOutput payloadBytes; // buffered term payloads |
| private final BlockPackedWriter writer; |
| |
| /** Sole constructor. */ |
| Lucene50CompressingTermVectorsWriter( |
| Directory directory, |
| SegmentInfo si, |
| String segmentSuffix, |
| IOContext context, |
| String formatName, |
| CompressionMode compressionMode, |
| int chunkSize, |
| int blockShift) |
| throws IOException { |
| assert directory != null; |
| this.segment = si.name; |
| this.compressionMode = compressionMode; |
| this.compressor = compressionMode.newCompressor(); |
| this.chunkSize = chunkSize; |
| |
| numDocs = 0; |
| pendingDocs = new ArrayDeque<>(); |
| termSuffixes = ByteBuffersDataOutput.newResettableInstance(); |
| payloadBytes = ByteBuffersDataOutput.newResettableInstance(); |
| lastTerm = new BytesRef(ArrayUtil.oversize(30, 1)); |
| |
| boolean success = false; |
| try { |
| metaStream = |
| directory.createOutput( |
| IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_META_EXTENSION), |
| context); |
| CodecUtil.writeIndexHeader( |
| metaStream, |
| VECTORS_INDEX_CODEC_NAME + "Meta", |
| VERSION_CURRENT, |
| si.getId(), |
| segmentSuffix); |
| assert CodecUtil.indexHeaderLength(VECTORS_INDEX_CODEC_NAME + "Meta", segmentSuffix) |
| == metaStream.getFilePointer(); |
| |
| vectorsStream = |
| directory.createOutput( |
| IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_EXTENSION), context); |
| CodecUtil.writeIndexHeader( |
| vectorsStream, formatName, VERSION_CURRENT, si.getId(), segmentSuffix); |
| assert CodecUtil.indexHeaderLength(formatName, segmentSuffix) |
| == vectorsStream.getFilePointer(); |
| |
| indexWriter = |
| new FieldsIndexWriter( |
| directory, |
| segment, |
| segmentSuffix, |
| VECTORS_INDEX_EXTENSION, |
| VECTORS_INDEX_CODEC_NAME, |
| si.getId(), |
| blockShift, |
| context); |
| |
| metaStream.writeVInt(PackedInts.VERSION_CURRENT); |
| metaStream.writeVInt(chunkSize); |
| writer = new BlockPackedWriter(vectorsStream, PACKED_BLOCK_SIZE); |
| |
| positionsBuf = new int[1024]; |
| startOffsetsBuf = new int[1024]; |
| lengthsBuf = new int[1024]; |
| payloadLengthsBuf = new int[1024]; |
| |
| success = true; |
| } finally { |
| if (!success) { |
| IOUtils.closeWhileHandlingException(metaStream, vectorsStream, indexWriter); |
| } |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| try { |
| IOUtils.close(metaStream, vectorsStream, indexWriter); |
| } finally { |
| metaStream = null; |
| vectorsStream = null; |
| indexWriter = null; |
| } |
| } |
| |
| @Override |
| public void startDocument(int numVectorFields) throws IOException { |
| curDoc = addDocData(numVectorFields); |
| } |
| |
| @Override |
| public void finishDocument() throws IOException { |
| // append the payload bytes of the doc after its terms |
| payloadBytes.copyTo(termSuffixes); |
| payloadBytes.reset(); |
| ++numDocs; |
| if (triggerFlush()) { |
| flush(); |
| } |
| curDoc = null; |
| } |
| |
| @Override |
| public void startField( |
| FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads) |
| throws IOException { |
| curField = curDoc.addField(info.number, numTerms, positions, offsets, payloads); |
| lastTerm.length = 0; |
| } |
| |
| @Override |
| public void finishField() throws IOException { |
| curField = null; |
| } |
| |
| @Override |
| public void startTerm(BytesRef term, int freq) throws IOException { |
| assert freq >= 1; |
| final int prefix; |
| if (lastTerm.length == 0) { |
| // no previous term: there is no shared prefix |
| prefix = 0; |
| } else { |
| prefix = StringHelper.bytesDifference(lastTerm, term); |
| } |
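| // terms are prefix-compressed against the previous term of the field: only the |
| // suffix bytes are buffered, while prefix and suffix lengths are recorded on the field |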
| curField.addTerm(freq, prefix, term.length - prefix); |
| termSuffixes.writeBytes(term.bytes, term.offset + prefix, term.length - prefix); |
| // copy last term |
| if (lastTerm.bytes.length < term.length) { |
| lastTerm.bytes = new byte[ArrayUtil.oversize(term.length, 1)]; |
| } |
| lastTerm.offset = 0; |
| lastTerm.length = term.length; |
| System.arraycopy(term.bytes, term.offset, lastTerm.bytes, 0, term.length); |
| } |
| |
| @Override |
| public void addPosition(int position, int startOffset, int endOffset, BytesRef payload) |
| throws IOException { |
| assert curField.flags != 0; |
| curField.addPosition( |
| position, startOffset, endOffset - startOffset, payload == null ? 0 : payload.length); |
| if (curField.hasPayloads && payload != null) { |
| payloadBytes.writeBytes(payload.bytes, payload.offset, payload.length); |
| } |
| } |
| |
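| // flush once the buffered term suffixes reach the chunk size or the number of pending |
| // docs hits the per-chunk maximum |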
| private boolean triggerFlush() { |
| return termSuffixes.size() >= chunkSize || pendingDocs.size() >= MAX_DOCUMENTS_PER_CHUNK; |
| } |
| |
| private void flush() throws IOException { |
| final int chunkDocs = pendingDocs.size(); |
| assert chunkDocs > 0 : chunkDocs; |
| |
| // write the index file |
| indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer()); |
| |
| final int docBase = numDocs - chunkDocs; |
| vectorsStream.writeVInt(docBase); |
| vectorsStream.writeVInt(chunkDocs); |
| |
| // total number of fields of the chunk |
| final int totalFields = flushNumFields(chunkDocs); |
| |
| if (totalFields > 0) { |
| // unique field numbers (sorted) |
| final int[] fieldNums = flushFieldNums(); |
| // offsets in the array of unique field numbers |
| flushFields(totalFields, fieldNums); |
| // flags (does the field have positions, offsets, payloads?) |
| flushFlags(totalFields, fieldNums); |
| // number of terms of each field |
| flushNumTerms(totalFields); |
| // prefix and suffix lengths for each field |
| flushTermLengths(); |
| // term freqs - 1 (because termFreq is always >=1) for each term |
| flushTermFreqs(); |
| // positions for all terms, when enabled |
| flushPositions(); |
| // offsets for all terms, when enabled |
| flushOffsets(fieldNums); |
| // payload lengths for all terms, when enabled |
| flushPayloadLengths(); |
| |
| // compress terms and payloads and write them to the output |
| // |
| // TODO: We could compress in the slices we already have in the buffer (min/max slice |
| // can be set on the buffer itself). |
| byte[] content = termSuffixes.toArrayCopy(); |
| compressor.compress(content, 0, content.length, vectorsStream); |
| } |
| |
| // reset |
| pendingDocs.clear(); |
| curDoc = null; |
| curField = null; |
| termSuffixes.reset(); |
| } |
| |
| private int flushNumFields(int chunkDocs) throws IOException { |
| if (chunkDocs == 1) { |
| final int numFields = pendingDocs.getFirst().numFields; |
| vectorsStream.writeVInt(numFields); |
| return numFields; |
| } else { |
| writer.reset(vectorsStream); |
| int totalFields = 0; |
| for (DocData dd : pendingDocs) { |
| writer.add(dd.numFields); |
| totalFields += dd.numFields; |
| } |
| writer.finish(); |
| return totalFields; |
| } |
| } |
| |
| /** Returns a sorted array containing unique field numbers */ |
| private int[] flushFieldNums() throws IOException { |
| SortedSet<Integer> fieldNums = new TreeSet<>(); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| fieldNums.add(fd.fieldNum); |
| } |
| } |
| |
| final int numDistinctFields = fieldNums.size(); |
| assert numDistinctFields > 0; |
| final int bitsRequired = PackedInts.bitsRequired(fieldNums.last()); |
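| // pack min(numDistinctFields - 1, 7) into the 3 high bits of the token and bitsRequired |
| // into the 5 low bits; if the field count doesn't fit in 3 bits, the remainder follows as a VInt |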
| final int token = (Math.min(numDistinctFields - 1, 0x07) << 5) | bitsRequired; |
| vectorsStream.writeByte((byte) token); |
| if (numDistinctFields - 1 >= 0x07) { |
| vectorsStream.writeVInt(numDistinctFields - 1 - 0x07); |
| } |
| final PackedInts.Writer writer = |
| PackedInts.getWriterNoHeader( |
| vectorsStream, PackedInts.Format.PACKED, fieldNums.size(), bitsRequired, 1); |
| for (Integer fieldNum : fieldNums) { |
| writer.add(fieldNum); |
| } |
| writer.finish(); |
| |
| int[] fns = new int[fieldNums.size()]; |
| int i = 0; |
| for (Integer key : fieldNums) { |
| fns[i++] = key; |
| } |
| return fns; |
| } |
| |
| private void flushFields(int totalFields, int[] fieldNums) throws IOException { |
| final PackedInts.Writer writer = |
| PackedInts.getWriterNoHeader( |
| vectorsStream, |
| PackedInts.Format.PACKED, |
| totalFields, |
| PackedInts.bitsRequired(fieldNums.length - 1), |
| 1); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| final int fieldNumIndex = Arrays.binarySearch(fieldNums, fd.fieldNum); |
| assert fieldNumIndex >= 0; |
| writer.add(fieldNumIndex); |
| } |
| } |
| writer.finish(); |
| } |
| |
| private void flushFlags(int totalFields, int[] fieldNums) throws IOException { |
| // check if fields always have the same flags |
| boolean nonChangingFlags = true; |
| int[] fieldFlags = new int[fieldNums.length]; |
| Arrays.fill(fieldFlags, -1); |
| outer: |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum); |
| assert fieldNumOff >= 0; |
| if (fieldFlags[fieldNumOff] == -1) { |
| fieldFlags[fieldNumOff] = fd.flags; |
| } else if (fieldFlags[fieldNumOff] != fd.flags) { |
| nonChangingFlags = false; |
| break outer; |
| } |
| } |
| } |
| |
| if (nonChangingFlags) { |
| // write one flag per field num |
| vectorsStream.writeVInt(0); |
| final PackedInts.Writer writer = |
| PackedInts.getWriterNoHeader( |
| vectorsStream, PackedInts.Format.PACKED, fieldFlags.length, FLAGS_BITS, 1); |
| for (int flags : fieldFlags) { |
| assert flags >= 0; |
| writer.add(flags); |
| } |
| assert writer.ord() == fieldFlags.length - 1; |
| writer.finish(); |
| } else { |
| // write one flag for every field instance |
| vectorsStream.writeVInt(1); |
| final PackedInts.Writer writer = |
| PackedInts.getWriterNoHeader( |
| vectorsStream, PackedInts.Format.PACKED, totalFields, FLAGS_BITS, 1); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| writer.add(fd.flags); |
| } |
| } |
| assert writer.ord() == totalFields - 1; |
| writer.finish(); |
| } |
| } |
| |
| private void flushNumTerms(int totalFields) throws IOException { |
| int maxNumTerms = 0; |
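| // OR the term counts together: bitsRequired only depends on the highest set bit, |
| // so this gives the same result as taking the true maximum |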
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| maxNumTerms |= fd.numTerms; |
| } |
| } |
| final int bitsRequired = PackedInts.bitsRequired(maxNumTerms); |
| vectorsStream.writeVInt(bitsRequired); |
| final PackedInts.Writer writer = |
| PackedInts.getWriterNoHeader( |
| vectorsStream, PackedInts.Format.PACKED, totalFields, bitsRequired, 1); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| writer.add(fd.numTerms); |
| } |
| } |
| assert writer.ord() == totalFields - 1; |
| writer.finish(); |
| } |
| |
| private void flushTermLengths() throws IOException { |
| writer.reset(vectorsStream); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| for (int i = 0; i < fd.numTerms; ++i) { |
| writer.add(fd.prefixLengths[i]); |
| } |
| } |
| } |
| writer.finish(); |
| writer.reset(vectorsStream); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| for (int i = 0; i < fd.numTerms; ++i) { |
| writer.add(fd.suffixLengths[i]); |
| } |
| } |
| } |
| writer.finish(); |
| } |
| |
| private void flushTermFreqs() throws IOException { |
| writer.reset(vectorsStream); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| for (int i = 0; i < fd.numTerms; ++i) { |
| writer.add(fd.freqs[i] - 1); |
| } |
| } |
| } |
| writer.finish(); |
| } |
| |
| private void flushPositions() throws IOException { |
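| // positions are delta-encoded within each term; the delta base resets to 0 for every term |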
| writer.reset(vectorsStream); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| if (fd.hasPositions) { |
| int pos = 0; |
| for (int i = 0; i < fd.numTerms; ++i) { |
| int previousPosition = 0; |
| for (int j = 0; j < fd.freqs[i]; ++j) { |
| final int position = positionsBuf[fd.posStart + pos++]; |
| writer.add(position - previousPosition); |
| previousPosition = position; |
| } |
| } |
| assert pos == fd.totalPositions; |
| } |
| } |
| } |
| writer.finish(); |
| } |
| |
| private void flushOffsets(int[] fieldNums) throws IOException { |
| boolean hasOffsets = false; |
| long[] sumPos = new long[fieldNums.length]; |
| long[] sumOffsets = new long[fieldNums.length]; |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| hasOffsets |= fd.hasOffsets; |
| if (fd.hasOffsets && fd.hasPositions) { |
| final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum); |
| int pos = 0; |
| for (int i = 0; i < fd.numTerms; ++i) { |
| sumPos[fieldNumOff] += positionsBuf[fd.posStart + fd.freqs[i] - 1 + pos]; |
| sumOffsets[fieldNumOff] += startOffsetsBuf[fd.offStart + fd.freqs[i] - 1 + pos]; |
| pos += fd.freqs[i]; |
| } |
| assert pos == fd.totalPositions; |
| } |
| } |
| } |
| |
| if (!hasOffsets) { |
| // nothing to do |
| return; |
| } |
| |
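| // estimate the average number of characters per position for each field; start offsets |
| // are then written as the delta from the offset this average predicts, which keeps the |
| // encoded values small for typical analyzed text |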
| final float[] charsPerTerm = new float[fieldNums.length]; |
| for (int i = 0; i < fieldNums.length; ++i) { |
| charsPerTerm[i] = |
| (sumPos[i] <= 0 || sumOffsets[i] <= 0) ? 0 : (float) ((double) sumOffsets[i] / sumPos[i]); |
| } |
| |
| // start offsets |
| for (int i = 0; i < fieldNums.length; ++i) { |
| vectorsStream.writeInt(Float.floatToRawIntBits(charsPerTerm[i])); |
| } |
| |
| writer.reset(vectorsStream); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| if ((fd.flags & OFFSETS) != 0) { |
| final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum); |
| final float cpt = charsPerTerm[fieldNumOff]; |
| int pos = 0; |
| for (int i = 0; i < fd.numTerms; ++i) { |
| int previousPos = 0; |
| int previousOff = 0; |
| for (int j = 0; j < fd.freqs[i]; ++j) { |
| final int position = fd.hasPositions ? positionsBuf[fd.posStart + pos] : 0; |
| final int startOffset = startOffsetsBuf[fd.offStart + pos]; |
| writer.add(startOffset - previousOff - (int) (cpt * (position - previousPos))); |
| previousPos = position; |
| previousOff = startOffset; |
| ++pos; |
| } |
| } |
| } |
| } |
| } |
| writer.finish(); |
| |
| // lengths |
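| // each length is written minus the term's length (prefix + suffix), since a token is |
| // typically at least as long as its indexed term, keeping the values small |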
| writer.reset(vectorsStream); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| if ((fd.flags & OFFSETS) != 0) { |
| int pos = 0; |
| for (int i = 0; i < fd.numTerms; ++i) { |
| for (int j = 0; j < fd.freqs[i]; ++j) { |
| writer.add( |
| lengthsBuf[fd.offStart + pos++] - fd.prefixLengths[i] - fd.suffixLengths[i]); |
| } |
| } |
| assert pos == fd.totalPositions; |
| } |
| } |
| } |
| writer.finish(); |
| } |
| |
| private void flushPayloadLengths() throws IOException { |
| writer.reset(vectorsStream); |
| for (DocData dd : pendingDocs) { |
| for (FieldData fd : dd.fields) { |
| if (fd.hasPayloads) { |
| for (int i = 0; i < fd.totalPositions; ++i) { |
| writer.add(payloadLengthsBuf[fd.payStart + i]); |
| } |
| } |
| } |
| } |
| writer.finish(); |
| } |
| |
| @Override |
| public void finish(FieldInfos fis, int numDocs) throws IOException { |
| if (!pendingDocs.isEmpty()) { |
| numDirtyChunks++; // incomplete: we had to force this flush |
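| // estimate how many docs a complete chunk would have held, based on how full the |
| // term-suffix buffer is, and count the shortfall as dirty docs |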
| final long expectedChunkDocs = |
| Math.min( |
| MAX_DOCUMENTS_PER_CHUNK, |
| (long) ((double) chunkSize / termSuffixes.size() * pendingDocs.size())); |
| numDirtyDocs += expectedChunkDocs - pendingDocs.size(); |
| flush(); |
| } |
| if (numDocs != this.numDocs) { |
| throw new RuntimeException( |
| "Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs); |
| } |
| indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream); |
| metaStream.writeVLong(numDirtyChunks); |
| metaStream.writeVLong(numDirtyDocs); |
| CodecUtil.writeFooter(metaStream); |
| CodecUtil.writeFooter(vectorsStream); |
| } |
| |
| @Override |
| public void addProx(int numProx, DataInput positions, DataInput offsets) throws IOException { |
| assert (curField.hasPositions) == (positions != null); |
| assert (curField.hasOffsets) == (offsets != null); |
| |
| if (curField.hasPositions) { |
| final int posStart = curField.posStart + curField.totalPositions; |
| if (posStart + numProx > positionsBuf.length) { |
| positionsBuf = ArrayUtil.grow(positionsBuf, posStart + numProx); |
| } |
| int position = 0; |
| if (curField.hasPayloads) { |
| final int payStart = curField.payStart + curField.totalPositions; |
| if (payStart + numProx > payloadLengthsBuf.length) { |
| payloadLengthsBuf = ArrayUtil.grow(payloadLengthsBuf, payStart + numProx); |
| } |
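| // each entry is a VInt whose low bit flags a payload; the remaining bits hold the |
| // position delta from the previous position |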
| for (int i = 0; i < numProx; ++i) { |
| final int code = positions.readVInt(); |
| if ((code & 1) != 0) { |
| // This position has a payload |
| final int payloadLength = positions.readVInt(); |
| payloadLengthsBuf[payStart + i] = payloadLength; |
| payloadBytes.copyBytes(positions, payloadLength); |
| } else { |
| payloadLengthsBuf[payStart + i] = 0; |
| } |
| position += code >>> 1; |
| positionsBuf[posStart + i] = position; |
| } |
| } else { |
| for (int i = 0; i < numProx; ++i) { |
| position += (positions.readVInt() >>> 1); |
| positionsBuf[posStart + i] = position; |
| } |
| } |
| } |
| |
| if (curField.hasOffsets) { |
| final int offStart = curField.offStart + curField.totalPositions; |
| if (offStart + numProx > startOffsetsBuf.length) { |
| final int newLength = ArrayUtil.oversize(offStart + numProx, 4); |
| startOffsetsBuf = ArrayUtil.growExact(startOffsetsBuf, newLength); |
| lengthsBuf = ArrayUtil.growExact(lengthsBuf, newLength); |
| } |
| int lastOffset = 0, startOffset, endOffset; |
| for (int i = 0; i < numProx; ++i) { |
| startOffset = lastOffset + offsets.readVInt(); |
| endOffset = startOffset + offsets.readVInt(); |
| lastOffset = endOffset; |
| startOffsetsBuf[offStart + i] = startOffset; |
| lengthsBuf[offStart + i] = endOffset - startOffset; |
| } |
| } |
| |
| curField.totalPositions += numProx; |
| } |
| |
| // bulk merge is scary: it has caused corruption bugs in the past. |
| // we try to be extra safe with this impl, but add an escape hatch to |
| // have a workaround for undiscovered bugs. |
| static final String BULK_MERGE_ENABLED_SYSPROP = |
| Lucene50CompressingTermVectorsWriter.class.getName() + ".enableBulkMerge"; |
| static final boolean BULK_MERGE_ENABLED; |
| |
| static { |
| boolean v = true; |
| try { |
| v = Boolean.parseBoolean(System.getProperty(BULK_MERGE_ENABLED_SYSPROP, "true")); |
| } catch (SecurityException ignored) { |
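| // system properties may not be readable (e.g. under a security manager); keep the default (enabled) |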
| } |
| BULK_MERGE_ENABLED = v; |
| } |
| |
| @Override |
| public int merge(MergeState mergeState) throws IOException { |
| if (mergeState.needsIndexSort) { |
| // TODO: can we gain back some optimizations even if the index is sorted? E.g. if the sort |
| // results in large chunks of contiguous docs from one sub being copied over...? |
| return super.merge(mergeState); |
| } |
| int docCount = 0; |
| int numReaders = mergeState.maxDocs.length; |
| |
| MatchingReaders matching = new MatchingReaders(mergeState); |
| |
| for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) { |
| Lucene50CompressingTermVectorsReader matchingVectorsReader = null; |
| final TermVectorsReader vectorsReader = mergeState.termVectorsReaders[readerIndex]; |
| if (matching.matchingReaders[readerIndex]) { |
| // we can only bulk-copy if the matching reader is also a CompressingTermVectorsReader |
| if (vectorsReader instanceof Lucene50CompressingTermVectorsReader) { |
| matchingVectorsReader = (Lucene50CompressingTermVectorsReader) vectorsReader; |
| } |
| } |
| |
| final int maxDoc = mergeState.maxDocs[readerIndex]; |
| final Bits liveDocs = mergeState.liveDocs[readerIndex]; |
| |
| if (matchingVectorsReader != null |
| && matchingVectorsReader.getCompressionMode() == compressionMode |
| && matchingVectorsReader.getChunkSize() == chunkSize |
| && matchingVectorsReader.getVersion() == VERSION_CURRENT |
| && matchingVectorsReader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT |
| && BULK_MERGE_ENABLED |
| && liveDocs == null |
| && !tooDirty(matchingVectorsReader)) { |
| // optimized merge, raw byte copy |
| // it's not worth fine-graining this if there are deletions. |
| |
| matchingVectorsReader.checkIntegrity(); |
| |
| // flush any pending chunks |
| if (!pendingDocs.isEmpty()) { |
| flush(); |
| numDirtyChunks++; // incomplete: we had to force this flush |
| } |
| |
| // iterate over each chunk. we use the vectors index to find chunk boundaries, |
| // read the docstart + doccount from the chunk header (we write a new header, |
| // since doc numbers will change), and just copy the bytes directly. |
| IndexInput rawDocs = matchingVectorsReader.getVectorsStream(); |
| FieldsIndex index = matchingVectorsReader.getIndexReader(); |
| rawDocs.seek(index.getStartPointer(0)); |
| int docID = 0; |
| while (docID < maxDoc) { |
| // read header |
| int base = rawDocs.readVInt(); |
| if (base != docID) { |
| throw new CorruptIndexException( |
| "invalid state: base=" + base + ", docID=" + docID, rawDocs); |
| } |
| int bufferedDocs = rawDocs.readVInt(); |
| |
| // write a new index entry and new header for this chunk. |
| indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer()); |
| vectorsStream.writeVInt(docCount); // rebase |
| vectorsStream.writeVInt(bufferedDocs); |
| docID += bufferedDocs; |
| docCount += bufferedDocs; |
| numDocs += bufferedDocs; |
| |
| if (docID > maxDoc) { |
| throw new CorruptIndexException( |
| "invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc, |
| rawDocs); |
| } |
| |
| // copy bytes until the next chunk boundary (or end of chunk data). |
| // using the vectors index for this isn't the most efficient, but it's fast enough |
| // and provides a redundant check for detecting corruption. |
| final long end; |
| if (docID == maxDoc) { |
| end = matchingVectorsReader.getMaxPointer(); |
| } else { |
| end = index.getStartPointer(docID); |
| } |
| vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer()); |
| } |
| |
| if (rawDocs.getFilePointer() != matchingVectorsReader.getMaxPointer()) { |
| throw new CorruptIndexException( |
| "invalid state: pos=" |
| + rawDocs.getFilePointer() |
| + ", max=" |
| + matchingVectorsReader.getMaxPointer(), |
| rawDocs); |
| } |
| |
| // since we bulk merged all chunks, we inherit any dirty ones from this segment. |
| numDirtyChunks += matchingVectorsReader.getNumDirtyChunks(); |
| numDirtyDocs += matchingVectorsReader.getNumDirtyDocs(); |
| } else { |
| // naive merge... |
| if (vectorsReader != null) { |
| vectorsReader.checkIntegrity(); |
| } |
| for (int i = 0; i < maxDoc; i++) { |
| if (liveDocs != null && liveDocs.get(i) == false) { |
| continue; |
| } |
| Fields vectors; |
| if (vectorsReader == null) { |
| vectors = null; |
| } else { |
| vectors = vectorsReader.get(i); |
| } |
| addAllDocVectors(vectors, mergeState); |
| ++docCount; |
| } |
| } |
| } |
| finish(mergeState.mergeFieldInfos, docCount); |
| return docCount; |
| } |
| |
| /** |
| * Returns true if we should recompress this reader, even though we could bulk merge compressed |
| * data |
| * |
| * <p>The last chunk written for a segment is typically incomplete, so without recompressing, in |
| * some worst-case situations (e.g. frequent reopen with tiny flushes), over time the compression |
| * ratio can degrade. This is a safety switch. |
| */ |
| boolean tooDirty(Lucene50CompressingTermVectorsReader candidate) { |
| // more than 1% dirty, or more than hard limit of 1024 dirty chunks |
| return candidate.getNumDirtyChunks() > 1024 |
| || candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs(); |
| } |
| |
| @Override |
| public long ramBytesUsed() { |
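| // NOTE: rough estimate; the int buffers are counted by number of entries rather than |
| // by their actual byte footprint |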
| return positionsBuf.length |
| + startOffsetsBuf.length |
| + lengthsBuf.length |
| + payloadLengthsBuf.length |
| + termSuffixes.ramBytesUsed() |
| + payloadBytes.ramBytesUsed() |
| + lastTerm.bytes.length; |
| } |
| |
| @Override |
| public Collection<Accountable> getChildResources() { |
| return List.of(termSuffixes, payloadBytes); |
| } |
| } |