/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.codecs.uniformsplit.sharedterms;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.lucene.codecs.BlockTermState;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PostingsWriterBase;
import org.apache.lucene.codecs.uniformsplit.BlockEncoder;
import org.apache.lucene.codecs.uniformsplit.FSTDictionary;
import org.apache.lucene.codecs.uniformsplit.FieldMetadata;
import org.apache.lucene.codecs.uniformsplit.IndexDictionary;
import org.apache.lucene.codecs.uniformsplit.UniformSplitTermsWriter;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import static org.apache.lucene.codecs.uniformsplit.sharedterms.STUniformSplitPostingsFormat.NAME;
import static org.apache.lucene.codecs.uniformsplit.sharedterms.STUniformSplitPostingsFormat.TERMS_BLOCKS_EXTENSION;
import static org.apache.lucene.codecs.uniformsplit.sharedterms.STUniformSplitPostingsFormat.TERMS_DICTIONARY_EXTENSION;
import static org.apache.lucene.codecs.uniformsplit.sharedterms.STUniformSplitPostingsFormat.VERSION_CURRENT;
/**
 * Extends {@link UniformSplitTermsWriter} by sharing the terms of all fields
 * in the same dictionary, and by writing all the fields of a term in the same
 * block line.
 * <p>
 * The {@link STUniformSplitPostingsFormat#TERMS_BLOCKS_EXTENSION block file}
 * contains all the term blocks for all fields. Each block line, for a single
 * term, may hold the {@link org.apache.lucene.index.TermState} of multiple
 * fields. The fields metadata is stored at the end of the block file.
* <p>
* The {@link STUniformSplitPostingsFormat#TERMS_DICTIONARY_EXTENSION dictionary file}
* contains a single trie ({@link org.apache.lucene.util.fst.FST} bytes) for all
* fields.
 * <p>
 * This structure is well suited when there are many fields: the single
 * shared-terms dictionary trie is much smaller than one trie per field.
* <p>
 * This {@link org.apache.lucene.codecs.FieldsConsumer} requires a custom
 * {@link #merge(MergeState, NormsProducer)} method for efficiency. The
 * regular merge would scan all the fields sequentially, which internally would
 * scan the whole shared-terms dictionary as many times as there are fields.
 * The custom merge instead scans the internal shared-terms dictionary of each
 * segment to merge directly, so each dictionary is scanned only once
 * regardless of the number of fields.
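 * <p>
 * A minimal usage sketch (illustrative only: the concrete
 * {@link PostingsWriterBase} depends on the codec version, and
 * {@code Lucene84PostingsWriter} is assumed here):
 * <pre>
 * PostingsWriterBase postingsWriter = new Lucene84PostingsWriter(state);
 * boolean success = false;
 * try {
 *   FieldsConsumer termsWriter =
 *       new STUniformSplitTermsWriter(postingsWriter, state, null);
 *   success = true;
 *   return termsWriter;
 * } finally {
 *   if (!success) {
 *     IOUtils.closeWhileHandlingException(postingsWriter);
 *   }
 * }
 * </pre>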
*
* @lucene.experimental
*/
public class STUniformSplitTermsWriter extends UniformSplitTermsWriter {
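  /**
   * Creates the writer with the inherited default block sizing,
   * {@code DEFAULT_TARGET_NUM_BLOCK_LINES} and {@code DEFAULT_DELTA_NUM_LINES}.
   */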
public STUniformSplitTermsWriter(PostingsWriterBase postingsWriter, SegmentWriteState state,
BlockEncoder blockEncoder) throws IOException {
this(postingsWriter, state, DEFAULT_TARGET_NUM_BLOCK_LINES, DEFAULT_DELTA_NUM_LINES, blockEncoder);
}
public STUniformSplitTermsWriter(PostingsWriterBase postingsWriter, SegmentWriteState state,
int targetNumBlockLines, int deltaNumLines, BlockEncoder blockEncoder) throws IOException {
this(postingsWriter, state, targetNumBlockLines, deltaNumLines, blockEncoder, FieldMetadata.Serializer.INSTANCE,
NAME, VERSION_CURRENT, TERMS_BLOCKS_EXTENSION, TERMS_DICTIONARY_EXTENSION);
}
protected STUniformSplitTermsWriter(PostingsWriterBase postingsWriter, SegmentWriteState state,
int targetNumBlockLines, int deltaNumLines, BlockEncoder blockEncoder, FieldMetadata.Serializer fieldMetadataWriter,
String codecName, int versionCurrent, String termsBlocksExtension, String dictionaryExtension) throws IOException {
super(postingsWriter, state, targetNumBlockLines, deltaNumLines, blockEncoder, fieldMetadataWriter, codecName, versionCurrent, termsBlocksExtension, dictionaryExtension);
}
@Override
public void write(Fields fields, NormsProducer normsProducer) throws IOException {
writeSegment((blockWriter, dictionaryBuilder) -> writeSingleSegment(fields, normsProducer, blockWriter, dictionaryBuilder));
}
/**
* Writes the new segment with the provided {@link SharedTermsWriter},
* which can be either a single segment writer, or a multiple segment merging writer.
*/
private void writeSegment(SharedTermsWriter termsWriter) throws IOException {
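    // Output order: all the term blocks first, then the fields metadata
    // appended at the end of the block file, and finally the single shared
    // dictionary written in its own file.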
STBlockWriter blockWriter = new STBlockWriter(blockOutput, targetNumBlockLines, deltaNumLines, blockEncoder);
IndexDictionary.Builder dictionaryBuilder = new FSTDictionary.Builder();
Collection<FieldMetadata> fieldMetadataList = termsWriter.writeSharedTerms(blockWriter, dictionaryBuilder);
blockWriter.finishLastBlock(dictionaryBuilder);
int fieldsNumber = writeFieldMetadataList(fieldMetadataList);
writeDictionary(fieldsNumber, dictionaryBuilder);
}
private Collection<FieldMetadata> writeSingleSegment(Fields fields, NormsProducer normsProducer, STBlockWriter blockWriter, IndexDictionary.Builder dictionaryBuilder) throws IOException {
List<FieldMetadata> fieldMetadataList = createFieldMetadataList(new FieldsIterator(fields, fieldInfos), maxDoc);
TermIteratorQueue<FieldTerms> fieldTermsQueue = createFieldTermsQueue(fields, fieldMetadataList);
List<TermIterator<FieldTerms>> groupedFieldTerms = new ArrayList<>(fieldTermsQueue.size());
List<FieldMetadataTermState> termStates = new ArrayList<>(fieldTermsQueue.size());
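    // K-way merge across the field TermsEnums: repeatedly pop the smallest
    // term, group all the fields positioned on that exact term, write one
    // block line holding each field's TermState, then advance the grouped
    // iterators.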
while (fieldTermsQueue.size() != 0) {
TermIterator<FieldTerms> topFieldTerms = fieldTermsQueue.popTerms();
BytesRef term = BytesRef.deepCopyOf(topFieldTerms.term);
groupByTerm(fieldTermsQueue, topFieldTerms, groupedFieldTerms);
writePostingLines(term, groupedFieldTerms, normsProducer, termStates);
blockWriter.addLine(term, termStates, dictionaryBuilder);
nextTermForIterators(groupedFieldTerms, fieldTermsQueue);
}
return fieldMetadataList;
}
private List<FieldMetadata> createFieldMetadataList(Iterator<FieldInfo> fieldInfos, int maxDoc) {
List<FieldMetadata> fieldMetadataList = new ArrayList<>();
while (fieldInfos.hasNext()) {
FieldMetadata fieldMetadata = new FieldMetadata(fieldInfos.next(), maxDoc);
fieldMetadata.setDictionaryStartFP(dictionaryOutput.getFilePointer());
fieldMetadataList.add(fieldMetadata);
}
return fieldMetadataList;
}
private TermIteratorQueue<FieldTerms> createFieldTermsQueue(Fields fields, List<FieldMetadata> fieldMetadataList) throws IOException {
TermIteratorQueue<FieldTerms> fieldQueue = new TermIteratorQueue<>(fieldMetadataList.size());
for (FieldMetadata fieldMetadata : fieldMetadataList) {
Terms terms = fields.terms(fieldMetadata.getFieldInfo().name);
if (terms != null) {
FieldTerms fieldTerms = new FieldTerms(fieldMetadata, terms.iterator());
if (fieldTerms.nextTerm()) {
// There is at least one term for the field.
fieldQueue.add(fieldTerms);
}
}
}
return fieldQueue;
}
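  /**
   * Moves from the queue to {@code groupedTermIterators} all the iterators
   * positioned on the same term as {@code topTermIterator}, which is included
   * first in the group.
   */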
private <T> void groupByTerm(TermIteratorQueue<T> termIteratorQueue, TermIterator<T> topTermIterator, List<TermIterator<T>> groupedTermIterators) {
groupedTermIterators.clear();
groupedTermIterators.add(topTermIterator);
while (termIteratorQueue.size() != 0) {
TermIterator<T> termIterator = termIteratorQueue.top();
if (topTermIterator.term.compareTo(termIterator.term) != 0) {
return;
}
// Same term for another iterator. Combine the iterators.
groupedTermIterators.add(termIterator);
termIteratorQueue.pop();
}
}
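  /**
   * Writes the postings of the given term for each field positioned on it,
   * and collects the resulting per-field {@link BlockTermState}s in
   * {@code termStates}.
   */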
private void writePostingLines(BytesRef term, List<? extends TermIterator<FieldTerms>> groupedFieldTerms,
NormsProducer normsProducer, List<FieldMetadataTermState> termStates) throws IOException {
termStates.clear();
for (TermIterator<FieldTerms> fieldTermIterator : groupedFieldTerms) {
FieldTerms fieldTerms = (FieldTerms) fieldTermIterator;
postingsWriter.setField(fieldTerms.fieldMetadata.getFieldInfo());
BlockTermState blockTermState = writePostingLine(fieldTerms.termsEnum, fieldTerms.fieldMetadata, normsProducer);
if (blockTermState != null) {
fieldTerms.fieldMetadata.setLastTerm(term);
termStates.add(new FieldMetadataTermState(fieldTerms.fieldMetadata, blockTermState));
}
}
}
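  /**
   * Advances the given iterators to their next term, and pushes back to the
   * queue those which are not exhausted.
   */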
private <T> void nextTermForIterators(List<? extends TermIterator<T>> termIterators,
TermIteratorQueue<T> termIteratorQueue) throws IOException {
for (TermIterator<T> termIterator : termIterators) {
if (termIterator.nextTerm()) {
// There is a next term for the iterator. Add it to the priority queue.
termIteratorQueue.add(termIterator);
}
}
}
private int writeFieldMetadataList(Collection<FieldMetadata> fieldMetadataList) throws IOException {
ByteBuffersDataOutput fieldsOutput = new ByteBuffersDataOutput();
int fieldsNumber = 0;
for (FieldMetadata fieldMetadata : fieldMetadataList) {
if (fieldMetadata.getNumTerms() > 0) {
fieldMetadataWriter.write(fieldsOutput, fieldMetadata);
fieldsNumber++;
}
}
writeFieldsMetadata(fieldsNumber, fieldsOutput);
return fieldsNumber;
}
protected void writeDictionary(int fieldsNumber, IndexDictionary.Builder dictionaryBuilder) throws IOException {
if (fieldsNumber > 0) {
writeDictionary(dictionaryBuilder);
}
CodecUtil.writeFooter(dictionaryOutput);
}
@Override
public void merge(MergeState mergeState, NormsProducer normsProducer) throws IOException {
if (mergeState.needsIndexSort) {
      // This custom merging does not support a sorted index.
      // Fall back to the default merge, which is inefficient for this postings format.
super.merge(mergeState, normsProducer);
return;
}
FieldsProducer[] fieldsProducers = mergeState.fieldsProducers;
List<TermIterator<SegmentTerms>> segmentTermsList = new ArrayList<>(fieldsProducers.length);
for (int segmentIndex = 0; segmentIndex < fieldsProducers.length; segmentIndex++) {
FieldsProducer fieldsProducer = fieldsProducers[segmentIndex];
// Iterate the FieldInfo provided by mergeState.fieldInfos because they may be
// filtered by PerFieldMergeState.
for (FieldInfo fieldInfo : mergeState.fieldInfos[segmentIndex]) {
        // Iterate all fields only to get the *first* Terms that is an instance
        // of STUniformSplitTerms. See the break below.
Terms terms = fieldsProducer.terms(fieldInfo.name);
if (terms != null) {
if (!(terms instanceof STUniformSplitTerms)) {
            // Terms is not directly an instance of STUniformSplitTerms; it is
            // wrapped/filtered. Fall back to the default merge, which is
            // inefficient for this postings format.
super.merge(mergeState, normsProducer);
return;
}
STUniformSplitTerms sharedTerms = (STUniformSplitTerms) terms;
segmentTermsList.add(new SegmentTerms(
segmentIndex, sharedTerms.createMergingBlockReader(), mergeState.docMaps[segmentIndex]));
// We have the STUniformSplitTerms for the segment. Break the field
// loop to iterate the next segment.
break;
}
}
}
writeSegment((blockWriter, dictionaryBuilder) -> mergeSegments(mergeState, normsProducer, segmentTermsList, blockWriter, dictionaryBuilder));
}
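  /**
   * Merges the shared-terms dictionaries of all segments in a single pass,
   * carrying the per-segment {@link MergeState.DocMap}s used to remap doc ids
   * in the merged segment.
   */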
private Collection<FieldMetadata> mergeSegments(MergeState mergeState, NormsProducer normsProducer,
List<TermIterator<SegmentTerms>> segmentTermsList,
STBlockWriter blockWriter, IndexDictionary.Builder dictionaryBuilder) throws IOException {
List<FieldMetadata> fieldMetadataList = createFieldMetadataList(mergeState.mergeFieldInfos.iterator(), mergeState.segmentInfo.maxDoc());
Map<String, MergingFieldTerms> fieldTermsMap = createMergingFieldTermsMap(fieldMetadataList, mergeState.fieldsProducers.length);
TermIteratorQueue<SegmentTerms> segmentTermsQueue = createSegmentTermsQueue(segmentTermsList);
List<TermIterator<SegmentTerms>> groupedSegmentTerms = new ArrayList<>(segmentTermsList.size());
Map<String, List<SegmentPostings>> fieldPostingsMap = new HashMap<>(mergeState.fieldInfos.length);
List<MergingFieldTerms> groupedFieldTerms = new ArrayList<>(mergeState.fieldInfos.length);
List<FieldMetadataTermState> termStates = new ArrayList<>(mergeState.fieldInfos.length);
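    // Same k-way merge as in writeSingleSegment, but across segments: each
    // popped term carries the TermStates of all its fields in one segment;
    // they are regrouped per field before a single block line is written for
    // the term.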
while (segmentTermsQueue.size() != 0) {
TermIterator<SegmentTerms> topSegmentTerms = segmentTermsQueue.popTerms();
BytesRef term = BytesRef.deepCopyOf(topSegmentTerms.term);
groupByTerm(segmentTermsQueue, topSegmentTerms, groupedSegmentTerms);
combineSegmentsFields(groupedSegmentTerms, fieldPostingsMap);
combinePostingsPerField(term, fieldTermsMap, fieldPostingsMap, groupedFieldTerms);
writePostingLines(term, groupedFieldTerms, normsProducer, termStates);
blockWriter.addLine(term, termStates, dictionaryBuilder);
nextTermForIterators(groupedSegmentTerms, segmentTermsQueue);
}
return fieldMetadataList;
}
private Map<String, MergingFieldTerms> createMergingFieldTermsMap(List<FieldMetadata> fieldMetadataList, int numSegments) {
Map<String, MergingFieldTerms> fieldTermsMap = new HashMap<>(fieldMetadataList.size() * 2);
for (FieldMetadata fieldMetadata : fieldMetadataList) {
FieldInfo fieldInfo = fieldMetadata.getFieldInfo();
fieldTermsMap.put(fieldInfo.name, new MergingFieldTerms(fieldMetadata, new STMergingTermsEnum(fieldInfo.name, numSegments)));
}
return fieldTermsMap;
}
private TermIteratorQueue<SegmentTerms> createSegmentTermsQueue(List<TermIterator<SegmentTerms>> segmentTermsList) throws IOException {
TermIteratorQueue<SegmentTerms> segmentQueue = new TermIteratorQueue<>(segmentTermsList.size());
for (TermIterator<SegmentTerms> segmentTerms : segmentTermsList) {
if (segmentTerms.nextTerm()) {
// There is at least one term in the segment
segmentQueue.add(segmentTerms);
}
}
return segmentQueue;
}
private void combineSegmentsFields(List<TermIterator<SegmentTerms>> groupedSegmentTerms, Map<String, List<SegmentPostings>> fieldPostingsMap) {
fieldPostingsMap.clear();
for (TermIterator<SegmentTerms> segmentTermIterator : groupedSegmentTerms) {
SegmentTerms segmentTerms = (SegmentTerms) segmentTermIterator;
for (Map.Entry<String, BlockTermState> fieldTermState : segmentTerms.fieldTermStatesMap.entrySet()) {
        List<SegmentPostings> segmentPostingsList = fieldPostingsMap.computeIfAbsent(
            fieldTermState.getKey(), k -> new ArrayList<>(groupedSegmentTerms.size()));
segmentPostingsList.add(new SegmentPostings(segmentTerms.segmentIndex, fieldTermState.getValue(), segmentTerms.mergingBlockReader, segmentTerms.docMap));
}
}
}
private void combinePostingsPerField(BytesRef term,
Map<String, MergingFieldTerms> fieldTermsMap,
Map<String, List<SegmentPostings>> fieldPostingsMap,
List<MergingFieldTerms> groupedFieldTerms) {
groupedFieldTerms.clear();
for (Map.Entry<String, List<SegmentPostings>> fieldPostingsEntry : fieldPostingsMap.entrySet()) {
      // The fields in fieldPostingsMap come from the FieldInfos of the
      // SegmentReadState. The fieldTermsMap contains entries for the fields of
      // the SegmentMergeState. So a field may be absent from fieldTermsMap
      // because it has been removed.
MergingFieldTerms fieldTerms = fieldTermsMap.get(fieldPostingsEntry.getKey());
if (fieldTerms != null) {
fieldTerms.resetIterator(term, fieldPostingsEntry.getValue());
groupedFieldTerms.add(fieldTerms);
}
}
// Keep the fields ordered by their number in the target merge segment.
groupedFieldTerms.sort(Comparator.comparingInt(fieldTerms -> fieldTerms.fieldMetadata.getFieldInfo().number));
}
private interface SharedTermsWriter {
Collection<FieldMetadata> writeSharedTerms(STBlockWriter blockWriter, IndexDictionary.Builder dictionaryBuilder) throws IOException;
}
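  /**
   * Postings of a term for one field in one segment, with the
   * {@link MergeState.DocMap} needed to remap its doc ids during the merge.
   */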
protected class SegmentPostings {
final int segmentIndex;
final BlockTermState termState;
final STMergingBlockReader mergingBlockReader;
final MergeState.DocMap docMap;
SegmentPostings(int segmentIndex, BlockTermState termState, STMergingBlockReader mergingBlockReader, MergeState.DocMap docMap) {
this.segmentIndex = segmentIndex;
this.termState = termState;
this.mergingBlockReader = mergingBlockReader;
this.docMap = docMap;
}
PostingsEnum getPostings(String fieldName, PostingsEnum reuse, int flags) throws IOException {
return mergingBlockReader.postings(fieldName, termState, reuse, flags);
}
}
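  /** {@link PriorityQueue} of {@link TermIterator}s ordered by their current term. */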
private class TermIteratorQueue<T> extends PriorityQueue<TermIterator<T>> {
TermIteratorQueue(int numFields) {
super(numFields);
}
@Override
protected boolean lessThan(TermIterator<T> a, TermIterator<T> b) {
return a.compareTo(b) < 0;
}
TermIterator<T> popTerms() {
TermIterator<T> topTerms = pop();
assert topTerms != null;
assert topTerms.term != null;
return topTerms;
}
}
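  /**
   * Iterator over a sorted sequence of terms. It is compared primarily by its
   * current term, and secondarily by an implementation-defined criterion.
   */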
private abstract class TermIterator<T> implements Comparable<TermIterator<T>> {
BytesRef term;
abstract boolean nextTerm() throws IOException;
@Override
public int compareTo(TermIterator<T> other) {
assert term != null : "Should not be compared when the iterator is exhausted";
int comparison = term.compareTo(other.term);
if (comparison == 0) {
return compareSecondary(other);
}
return comparison;
}
abstract int compareSecondary(TermIterator<T> other);
}
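  /** {@link TermIterator} over the {@link TermsEnum} of a single field. */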
private class FieldTerms extends TermIterator<FieldTerms> {
final FieldMetadata fieldMetadata;
final TermsEnum termsEnum;
FieldTerms(FieldMetadata fieldMetadata, TermsEnum termsEnum) {
this.fieldMetadata = fieldMetadata;
this.termsEnum = termsEnum;
}
@Override
boolean nextTerm() throws IOException {
term = termsEnum.next();
return term != null;
}
@Override
int compareSecondary(TermIterator<FieldTerms> other) {
return Integer.compare(fieldMetadata.getFieldInfo().number, ((FieldTerms) other).fieldMetadata.getFieldInfo().number);
}
}
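  /** {@link FieldTerms} whose {@link STMergingTermsEnum} is reset for each term of the merged segments. */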
private class MergingFieldTerms extends FieldTerms {
MergingFieldTerms(FieldMetadata fieldMetadata, STMergingTermsEnum termsEnum) {
super(fieldMetadata, termsEnum);
}
void resetIterator(BytesRef term, List<SegmentPostings> segmentPostingsList) {
((STMergingTermsEnum) termsEnum).reset(term, segmentPostingsList);
}
}
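  /**
   * Iterates the shared-terms dictionary of one segment to merge. For each
   * term, it records the {@link BlockTermState} of every field having this
   * term.
   */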
private class SegmentTerms extends TermIterator<SegmentTerms> {
    private final int segmentIndex;
private final STMergingBlockReader mergingBlockReader;
private final Map<String, BlockTermState> fieldTermStatesMap;
private final MergeState.DocMap docMap;
SegmentTerms(int segmentIndex, STMergingBlockReader mergingBlockReader, MergeState.DocMap docMap) {
this.segmentIndex = segmentIndex;
this.mergingBlockReader = mergingBlockReader;
this.docMap = docMap;
this.fieldTermStatesMap = new HashMap<>();
}
@Override
boolean nextTerm() throws IOException {
term = mergingBlockReader.next();
if (term == null) {
return false;
}
mergingBlockReader.readFieldTermStatesMap(fieldTermStatesMap);
return true;
}
@Override
int compareSecondary(TermIterator<SegmentTerms> other) {
return Integer.compare(segmentIndex, ((SegmentTerms) other).segmentIndex);
}
}
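  /** Iterates the {@link FieldInfo} of each field name in the provided {@link Fields}. */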
private static class FieldsIterator implements Iterator<FieldInfo> {
private final Iterator<String> fieldNames;
private final FieldInfos fieldInfos;
FieldsIterator(Fields fields, FieldInfos fieldInfos) {
this.fieldNames = fields.iterator();
this.fieldInfos = fieldInfos;
}
@Override
public boolean hasNext() {
return fieldNames.hasNext();
}
@Override
public FieldInfo next() {
return fieldInfos.fieldInfo(fieldNames.next());
}
}
}