blob: a1c3d70d6f1e038423384d8a6aa16836aa1365bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.codecs.uniformsplit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.codecs.BlockTermState;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
/**
* Writes blocks in the block file.
* <p>
* According to the Uniform Split technique, the writing combines three steps
* per block, and it is repeated for all the field blocks:
* <ol>
* <li>Select the term with the shortest {@link TermBytes minimal distinguishing prefix}
* (MDP) in the neighborhood of the {@link #targetNumBlockLines target block size}
* (+- {@link #deltaNumLines delta size})</li>
* <li>The selected term becomes the first term of the next block, and its
* MDP is the next block key.</li>
* <li>The current block is written to the {@link UniformSplitPostingsFormat#TERMS_BLOCKS_EXTENSION block file}.
* And its block key is {@link IndexDictionary.Builder#add(BytesRef, long) added}
* to the {@link IndexDictionary index dictionary}.</li>
* </ol>
* <p>
* This stateful {@link BlockWriter} is called repeatedly to
* {@link #addLine(BytesRef, BlockTermState, IndexDictionary.Builder) add}
* all the {@link BlockLine} terms of a field. Then {@link #finishLastBlock}
* is called. And then this {@link BlockWriter} can be reused to add the terms
* of another field.
*
* @lucene.experimental
*/
public class BlockWriter {
// Target number of lines per block; blocks are split in the neighborhood of this size.
protected final int targetNumBlockLines;
// Allowed deviation (+-) around targetNumBlockLines when choosing the block split point.
protected final int deltaNumLines;
// Lines accumulated for the in-progress block; flushed by splitAndWriteBlock()/finishLastBlock().
protected final List<BlockLine> blockLines;
// Output for the terms block file; blocks are appended sequentially.
protected final IndexOutput blockOutput;
// Reusable buffer for the incrementally-encoded block lines of the current block.
protected final ByteBuffersDataOutput blockLinesWriteBuffer;
// Reusable buffer for the serialized term states of the current block.
protected final ByteBuffersDataOutput termStatesWriteBuffer;
protected final BlockHeader.Serializer blockHeaderWriter;
protected final BlockLine.Serializer blockLineWriter;
protected final DeltaBaseTermStateSerializer termStateSerializer;
// Optional block encoder (e.g. compression); null means blocks are written unencoded.
protected final BlockEncoder blockEncoder;
// Reusable buffer holding the assembled block (header + lines + term states) before
// it is written (possibly encoded) to blockOutput.
protected final ByteBuffersDataOutput blockWriteBuffer;
// Metadata of the field currently being written; set by setField(), cleared by finishLastBlock().
protected FieldMetadata fieldMetadata;
// Previous term added, used to compute the MDP length of the next term; null at field start.
protected BytesRef lastTerm;
// Reused across blocks to avoid allocating a BlockHeader per block.
protected final BlockHeader reusableBlockHeader;
// Reused in addBlockKey() as a view on the first term's MDP prefix (no copy).
protected BytesRef scratchBytesRef;
/**
 * @param blockOutput Output for the block file; must not be null.
 * @param targetNumBlockLines Target number of lines per block; must be &gt; 0.
 * @param deltaNumLines Allowed deviation around the target size; must be in [0, targetNumBlockLines).
 * @param blockEncoder Optional encoder applied to each block; may be null for no encoding.
 */
protected BlockWriter(IndexOutput blockOutput, int targetNumBlockLines, int deltaNumLines, BlockEncoder blockEncoder) {
assert blockOutput != null;
assert targetNumBlockLines > 0;
assert deltaNumLines >= 0;
assert deltaNumLines < targetNumBlockLines;
this.blockOutput = blockOutput;
this.targetNumBlockLines = targetNumBlockLines;
this.deltaNumLines = deltaNumLines;
this.blockEncoder = blockEncoder;
this.blockLines = new ArrayList<>(targetNumBlockLines);
this.blockHeaderWriter = createBlockHeaderSerializer();
this.blockLineWriter = createBlockLineSerializer();
this.termStateSerializer = createDeltaBaseTermStateSerializer();
this.blockLinesWriteBuffer = ByteBuffersDataOutput.newResettableInstance();
this.termStatesWriteBuffer = ByteBuffersDataOutput.newResettableInstance();
this.blockWriteBuffer = ByteBuffersDataOutput.newResettableInstance();
this.reusableBlockHeader = new BlockHeader();
this.scratchBytesRef = new BytesRef();
}
/** Extension point for subclasses to customize block header serialization. */
protected BlockHeader.Serializer createBlockHeaderSerializer() {
return new BlockHeader.Serializer();
}
/** Extension point for subclasses to customize block line serialization. */
protected BlockLine.Serializer createBlockLineSerializer() {
return new BlockLine.Serializer();
}
/** Extension point for subclasses to customize term state serialization. */
protected DeltaBaseTermStateSerializer createDeltaBaseTermStateSerializer() {
return new DeltaBaseTermStateSerializer();
}
/**
 * Adds a new {@link BlockLine} term for the current field.
 * <p>
 * This method determines whether the new term is part of the current block,
 * or if it is part of the next block. In the latter case, a new block is started
 * (including one or more of the lastly added lines), the current block is
 * written to the block file, and the current block key is added to the
 * {@link IndexDictionary.Builder}.
 *
 * @param term The block line term. The {@link BytesRef} instance is used directly,
 *             the caller is responsible for making a deep copy if needed. This is required
 *             because we keep a list of block lines until we decide to write the
 *             current block, and each line must have a different term instance.
 * @param blockTermState Block line details.
 * @param dictionaryBuilder to which the block keys are added.
 */
protected void addLine(BytesRef term, BlockTermState blockTermState, IndexDictionary.Builder dictionaryBuilder) throws IOException {
assert term != null;
assert blockTermState != null;
// MDP length is computed against the previous term (null lastTerm = first term of the field).
int mdpLength = TermBytes.computeMdpLength(lastTerm, term);
blockLines.add(new BlockLine(new TermBytes(mdpLength, term), blockTermState));
lastTerm = term;
// Flush once the pending lines reach the upper bound of the allowed block size range.
if (blockLines.size() >= targetNumBlockLines + deltaNumLines) {
splitAndWriteBlock(dictionaryBuilder);
}
}
/**
 * This method is called when there is no more term for the field. It writes
 * the remaining lines added with {@link #addLine} as the last block of the
 * field and resets this {@link BlockWriter} state. Then this {@link BlockWriter}
 * can be used for another field.
 */
protected void finishLastBlock(IndexDictionary.Builder dictionaryBuilder) throws IOException {
// splitAndWriteBlock() may leave trailing lines for a "next" block; loop until all are written.
while (!blockLines.isEmpty()) {
splitAndWriteBlock(dictionaryBuilder);
}
fieldMetadata = null;
lastTerm = null;
}
/**
 * Defines the new block start according to {@link #targetNumBlockLines}
 * and {@link #deltaNumLines}.
 * The new block is started (including one or more of the lastly added lines),
 * the current block is written to the block file, and the current block key
 * is added to the {@link IndexDictionary.Builder}.
 */
protected void splitAndWriteBlock(IndexDictionary.Builder dictionaryBuilder) throws IOException {
assert !blockLines.isEmpty();
int numLines = blockLines.size();
// Small enough to fit below the lower bound of the size range: write everything as one block.
if (numLines <= targetNumBlockLines - deltaNumLines) {
writeBlock(blockLines, dictionaryBuilder);
blockLines.clear();
return;
}
// Search the last 2 * deltaNumLines lines (i.e. targetNumBlockLines +- deltaNumLines when
// the block is full) for the term with the shortest MDP; that term starts the next block.
int deltaStart = numLines - deltaNumLines * 2;
assert deltaStart >= 1 : "blockLines size: " + numLines;
int minMdpLength = Integer.MAX_VALUE;
int minMdpEndIndex = 0;
for (int i = deltaStart; i < numLines; i++) {
TermBytes term = blockLines.get(i).getTermBytes();
int mdpLength = term.getMdpLength();
// '<=' keeps the LAST line with the minimal MDP, so the current block is as large as possible.
if (mdpLength <= minMdpLength) {
minMdpLength = mdpLength;
minMdpEndIndex = i;
}
}
// Lines [0, minMdpEndIndex) form the current block; line minMdpEndIndex starts the next one.
List<BlockLine> subList = blockLines.subList(0, minMdpEndIndex);
writeBlock(subList, dictionaryBuilder);
// Clear the written block lines to keep only the lines composing the next block.
// ArrayList.subList().clear() is O(N) but still fast since we work on a small list.
// It is internally an array copy and an iteration to set array refs to null.
// For clarity we keep that until the day a CircularArrayList is available in the jdk.
subList.clear();
}
/**
 * Writes a block and adds its block key to the dictionary builder.
 */
protected void writeBlock(List<BlockLine> blockLines, IndexDictionary.Builder dictionaryBuilder) throws IOException {
// Capture the block start file pointer before anything is written; it is both the
// dictionary value for the block key and the field metadata first/last block FP.
long blockStartFP = blockOutput.getFilePointer();
addBlockKey(blockLines, dictionaryBuilder);
int middle = blockLines.size() >> 1;
int middleOffset = -1;
BlockLine previousLine = null;
for (int i = 0, size = blockLines.size(); i < size; i++) {
// The first line and the middle line are incremental-encoding seeds (full terms, not
// deltas); the middle line offset is recorded in the header, presumably so a reader
// can start decoding from the middle of the block.
boolean isIncrementalEncodingSeed = i == 0;
if (i == middle) {
middleOffset = Math.toIntExact(blockLinesWriteBuffer.size());
isIncrementalEncodingSeed = true;
}
BlockLine line = blockLines.get(i);
writeBlockLine(isIncrementalEncodingSeed, line, previousLine);
previousLine = line;
}
// Assemble the block in memory: header, then the lines, then the term states.
reusableBlockHeader.reset(blockLines.size(), termStateSerializer.getBaseDocStartFP(), termStateSerializer.getBasePosStartFP(),
termStateSerializer.getBasePayStartFP(), Math.toIntExact(blockLinesWriteBuffer.size()), middleOffset);
blockHeaderWriter.write(blockWriteBuffer, reusableBlockHeader);
blockLinesWriteBuffer.copyTo(blockWriteBuffer);
termStatesWriteBuffer.copyTo(blockWriteBuffer);
// Write the (possibly encoded) block to the block file, prefixed with its byte length.
if (blockEncoder == null) {
blockOutput.writeVInt(Math.toIntExact(blockWriteBuffer.size()));
blockWriteBuffer.copyTo(blockOutput);
} else {
BlockEncoder.WritableBytes encodedBytes = blockEncoder.encode(blockWriteBuffer.toDataInput(), blockWriteBuffer.size());
blockOutput.writeVInt(Math.toIntExact(encodedBytes.size()));
encodedBytes.writeTo(blockOutput);
}
// Reset the reusable buffers and the term state base FPs for the next block.
blockLinesWriteBuffer.reset();
termStatesWriteBuffer.reset();
blockWriteBuffer.reset();
termStateSerializer.resetBaseStartFP();
updateFieldMetadata(blockStartFP);
}
/**
 * updates the field metadata after all lines were written for the block.
 */
protected void updateFieldMetadata(long blockStartFP) {
assert fieldMetadata != null;
// First block of the field: record its start FP once (-1 is the unset sentinel).
if (fieldMetadata.getFirstBlockStartFP() == -1) {
fieldMetadata.setFirstBlockStartFP(blockStartFP);
}
fieldMetadata.setLastBlockStartFP(blockStartFP);
}
/** Sets the metadata of the field whose terms are going to be added next. */
void setField(FieldMetadata fieldMetadata) {
this.fieldMetadata = fieldMetadata;
}
/**
 * Writes one line to the lines buffer and its term state to the term states buffer.
 * The line records the current term states buffer size as the offset of its term state.
 */
protected void writeBlockLine(boolean isIncrementalEncodingSeed, BlockLine line, BlockLine previousLine) throws IOException {
assert fieldMetadata != null;
blockLineWriter.writeLine(blockLinesWriteBuffer, line, previousLine, Math.toIntExact(termStatesWriteBuffer.size()), isIncrementalEncodingSeed);
blockLineWriter.writeLineTermState(termStatesWriteBuffer, line, fieldMetadata.getFieldInfo(), termStateSerializer);
}
/**
 * Adds a new block key with its corresponding block file pointer to the
 * {@link IndexDictionary.Builder} .
 * The block key is the MDP (see {@link TermBytes}) of the block first term.
 */
protected void addBlockKey(List<BlockLine> blockLines, IndexDictionary.Builder dictionaryBuilder) throws IOException {
assert !blockLines.isEmpty();
assert dictionaryBuilder != null;
TermBytes firstTerm = blockLines.get(0).getTermBytes();
assert firstTerm.getTerm().offset == 0;
assert scratchBytesRef.offset == 0;
// View the MDP prefix of the first term without copying: share the bytes array and
// truncate the length to the MDP length.
scratchBytesRef.bytes = firstTerm.getTerm().bytes;
scratchBytesRef.length = firstTerm.getMdpLength();
dictionaryBuilder.add(scratchBytesRef, blockOutput.getFilePointer());
}
}