/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
/**
* Holds buffered deletes and updates by term or query, once pushed. Pushed
* deletes/updates are write-once, so we shift to more memory efficient data
* structure to hold them. We don't hold docIDs because these are applied on
* flush.
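 *
 * <p>A rough sketch of the intended lifecycle, for orientation only (the real call sites are in
 * DocumentsWriter / BufferedUpdatesStream; the variable names below are illustrative):
 *
 * <pre>{@code
 * FrozenBufferedUpdates packet = new FrozenBufferedUpdates(infoStream, bufferedUpdates, null);
 * long gen = bufferedUpdatesStream.push(packet); // assigns the packet's delGen
 * if (packet.tryLock()) {
 *   try {
 *     packet.apply(segStates);                   // resolve against the open segments
 *   } finally {
 *     packet.unlock();
 *   }
 * }
 * }</pre>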
*/
final class FrozenBufferedUpdates {
  /* NOTE: we now apply this frozen packet immediately on creation. That application is heavy and runs
   * in multiple threads, while the compression done here is sizable (~8.3% of the original size), so it's
   * important we compress first, before applying the deletes/updates. */
  /* Per delete-by-query entry: one object reference plus one int (the docIDUpto limit), plus ~24 bytes
   * that we typically undercount for the Query itself. */
  static final int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + Integer.BYTES + 24;
// Terms, in sorted order:
final PrefixCodedTerms deleteTerms;
  // Parallel arrays of delete queries and the docIDUpto limit for each:
final Query[] deleteQueries;
final int[] deleteQueryLimits;
/** Counts down once all deletes/updates have been applied */
public final CountDownLatch applied = new CountDownLatch(1);
private final ReentrantLock applyLock = new ReentrantLock();
private final Map<String, FieldUpdatesBuffer> fieldUpdates;
/** How many total documents were deleted/updated. */
public long totalDelCount;
private final int fieldUpdatesCount;
final int bytesUsed;
final int numTermDeletes;
private long delGen = -1; // assigned by BufferedUpdatesStream once pushed
  final SegmentCommitInfo privateSegment; // non-null iff this frozen packet represents
                                          // segment-private deletes; in that case it should
                                          // only have queries and doc values updates
private final InfoStream infoStream;
public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) {
this.infoStream = infoStream;
this.privateSegment = privateSegment;
assert privateSegment == null || updates.deleteTerms.isEmpty() : "segment private packet should only have del queries";
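    // Sort the buffered delete terms so they can be prefix-coded compactly; the builder requires sorted input: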
    Term[] termsArray = updates.deleteTerms.keySet().toArray(new Term[updates.deleteTerms.size()]);
ArrayUtil.timSort(termsArray);
PrefixCodedTerms.Builder builder = new PrefixCodedTerms.Builder();
for (Term term : termsArray) {
builder.add(term);
}
deleteTerms = builder.finish();
deleteQueries = new Query[updates.deleteQueries.size()];
deleteQueryLimits = new int[updates.deleteQueries.size()];
int upto = 0;
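    // Flatten the buffered query -> docIDUpto map into the parallel arrays declared above: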
    for (Map.Entry<Query, Integer> ent : updates.deleteQueries.entrySet()) {
deleteQueries[upto] = ent.getKey();
deleteQueryLimits[upto] = ent.getValue();
upto++;
}
// TODO if a Term affects multiple fields, we could keep the updates key'd by Term
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
// that Term only once, applying the update to all fields that still need to be
// updated.
updates.fieldUpdates.values().forEach(FieldUpdatesBuffer::finish);
this.fieldUpdates = Collections.unmodifiableMap(new HashMap<>(updates.fieldUpdates));
this.fieldUpdatesCount = updates.numFieldUpdates.get();
bytesUsed = (int) ((deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY)
+ updates.fieldUpdatesBytesUsed.get());
numTermDeletes = updates.numTermDeletes.get();
if (infoStream != null && infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
"compressed %d to %d bytes (%.2f%%) for deletes/updates; private segment %s",
updates.ramBytesUsed(), bytesUsed, 100.*bytesUsed/updates.ramBytesUsed(),
privateSegment));
}
}
  /**
   * Tries to lock this buffered update instance.
   *
   * @return {@code true} if the lock was successfully acquired, otherwise {@code false}
   */
boolean tryLock() {
return applyLock.tryLock();
}
  /**
   * Locks this buffered update instance.
   */
void lock() {
applyLock.lock();
}
/**
* Releases the lock of this buffered update instance
*/
void unlock() {
applyLock.unlock();
}
/**
* Returns true iff this buffered updates instance was already applied
*/
boolean isApplied() {
assert applyLock.isHeldByCurrentThread();
return applied.getCount() == 0;
}
/** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
* the number of new deleted or updated documents. */
long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
assert applyLock.isHeldByCurrentThread();
if (delGen == -1) {
// we were not yet pushed
throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first");
}
assert applied.getCount() != 0;
if (privateSegment != null) {
assert segStates.length == 1;
assert privateSegment == segStates[0].reader.getOriginalSegmentInfo();
}
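    // Resolve each kind of buffered change against the given segments and accumulate the count: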
totalDelCount += applyTermDeletes(segStates);
totalDelCount += applyQueryDeletes(segStates);
totalDelCount += applyDocValuesUpdates(segStates);
return totalDelCount;
}
private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
if (fieldUpdates.isEmpty()) {
return 0;
}
long startNS = System.nanoTime();
long updateCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (delGen < segState.delGen) {
// segment is newer than this deletes packet
continue;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
}
final boolean isSegmentPrivateDeletes = privateSegment != null;
if (fieldUpdates.isEmpty() == false) {
updateCount += applyDocValuesUpdates(segState, fieldUpdates, delGen, isSegmentPrivateDeletes);
}
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT, "applyDocValuesUpdates %.1f msec for %d segments, %d field updates; %d new updates",
(System.nanoTime()-startNS)/1000000.,
segStates.length,
fieldUpdatesCount,
updateCount));
}
return updateCount;
}
private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
Map<String, FieldUpdatesBuffer> updates,
long delGen,
boolean segmentPrivateDeletes) throws IOException {
// TODO: we can process the updates per DV field, from last to first so that
// if multiple terms affect same document for the same field, we add an update
// only once (that of the last term). To do that, we can keep a bitset which
// marks which documents have already been updated. So e.g. if term T1
// updates doc 7, and then we process term T2 and it updates doc 7 as well,
// we don't apply the update since we know T1 came last and therefore wins
// the update.
// We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so
// that these documents aren't even returned.
long updateCount = 0;
    // We first resolve all our updates privately, and only publish them to the ReadersAndUpdates at the end:
final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>();
for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
String updateField = fieldUpdate.getKey();
DocValuesFieldUpdates dvUpdates = null;
FieldUpdatesBuffer value = fieldUpdate.getValue();
boolean isNumeric = value.isNumeric();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, iterator.isSortedTerms());
while ((bufferedUpdate = iterator.next()) != null) {
        // TODO: we traverse the terms in update order (not term order) so that we
        // apply the updates in the correct order, i.e. if two terms update the
        // same document, the last one that came in wins, irrespective of the
        // terms' lexical order.
        // We could apply the updates in term order if we kept an updatesGen (and
        // incremented it with every update) and attached it to each NumericUpdate. Note
        // that we cannot rely only on docIDUpto because an app may send two updates
        // which get the same docIDUpto, yet we still need to respect the order in
        // which those updates arrived.
// TODO: we could at least *collate* by field?
final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
if (docIdSetIterator != null) {
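          // If this packet is private to this segment (same delGen), only docs indexed before the
          // update was buffered (doc < docUpTo) may be updated; otherwise the entire segment predates
          // this packet and there is no limit: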
final int limit;
if (delGen == segState.delGen) {
assert segmentPrivateDeletes;
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}
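          // An update without a value resets (removes) the doc value for matching docs: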
final BytesRef binaryValue;
final long longValue;
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
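          // Lazily create the per-field updates holder on the first matching term; use the
          // single-value numeric specialization when every update carries the same value: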
if (dvUpdates == null) {
if (isNumeric) {
if (value.hasSingleValue()) {
dvUpdates = new NumericDocValuesFieldUpdates
.SingleValueNumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc(),
value.getNumericValue(0));
} else {
dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, value.getMinNumeric(),
value.getMaxNumeric(), segState.reader.maxDoc());
}
} else {
dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
}
resolvedUpdates.add(dvUpdates);
}
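          // Pick the per-doc action once, outside the docID loop: reset, add a numeric value, or add a binary value.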
final IntConsumer docIdConsumer;
final DocValuesFieldUpdates update = dvUpdates;
if (bufferedUpdate.hasValue == false) {
docIdConsumer = doc -> update.reset(doc);
} else if (isNumeric) {
docIdConsumer = doc -> update.add(doc, longValue);
} else {
docIdConsumer = doc -> update.add(doc, binaryValue);
}
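          // Skip docs that are already deleted in this segment: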
final Bits acceptDocs = segState.rld.getLiveDocs();
if (segState.rld.sortMap != null && segmentPrivateDeletes) {
// This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
int doc;
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (acceptDocs == null || acceptDocs.get(doc)) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(doc) < limit) {
docIdConsumer.accept(doc);
updateCount++;
}
}
}
} else {
int doc;
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (doc >= limit) {
break; // no more docs that can be updated for this term
}
if (acceptDocs == null || acceptDocs.get(doc)) {
docIdConsumer.accept(doc);
updateCount++;
}
}
}
}
}
}
// now freeze & publish:
for (DocValuesFieldUpdates update : resolvedUpdates) {
if (update.any()) {
update.finish();
segState.rld.addDVUpdate(update);
}
}
return updateCount;
}
// Delete by query
private long applyQueryDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
if (deleteQueries.length == 0) {
return 0;
}
long startNS = System.nanoTime();
long delCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (delGen < segState.delGen) {
// segment is newer than this deletes packet
continue;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
}
final LeafReaderContext readerContext = segState.reader.getContext();
for (int i = 0; i < deleteQueries.length; i++) {
Query query = deleteQueries[i];
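        // As with doc values updates, a segment-private packet (same delGen) only deletes docs that
        // were indexed before the delete was buffered (doc < limit); otherwise every doc is eligible: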
int limit;
if (delGen == segState.delGen) {
assert privateSegment != null;
limit = deleteQueryLimits[i];
} else {
limit = Integer.MAX_VALUE;
}
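        // Rewrite and run the query against just this one segment; caching is pointless for a
        // one-shot execution, so the query cache is disabled: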
final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
searcher.setQueryCache(null);
query = searcher.rewrite(query);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
final Scorer scorer = weight.scorer(readerContext);
if (scorer != null) {
final DocIdSetIterator it = scorer.iterator();
if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
assert privateSegment != null;
// This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
int docID;
while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(docID) < limit) {
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
} else {
int docID;
while ((docID = it.nextDoc()) < limit) {
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
}
}
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT, "applyQueryDeletes took %.2f msec for %d segments and %d queries; %d new deletions",
(System.nanoTime()-startNS)/1000000.,
segStates.length,
deleteQueries.length,
delCount));
}
return delCount;
}
private long applyTermDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
if (deleteTerms.size() == 0) {
return 0;
}
// We apply segment-private deletes on flush:
assert privateSegment == null;
long startNS = System.nanoTime();
long delCount = 0;
for (BufferedUpdatesStream.SegmentState segState : segStates) {
assert segState.delGen != delGen: "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
if (segState.delGen > delGen) {
// our deletes don't apply to this segment
continue;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
}
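      // Stream the packet's terms in sorted order; TermDocsIterator exploits that order to seek
      // forward through each field's terms dictionary and reuse the postings enum: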
FieldTermIterator iter = deleteTerms.iterator();
BytesRef delTerm;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
while ((delTerm = iter.next()) != null) {
final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
if (iterator != null) {
int docID;
while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
}
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT, "applyTermDeletes took %.2f msec for %d segments and %d del terms; %d new deletions",
(System.nanoTime()-startNS)/1000000.,
segStates.length,
deleteTerms.size(),
delCount));
}
return delCount;
}
public void setDelGen(long delGen) {
assert this.delGen == -1: "delGen was already previously set to " + this.delGen;
this.delGen = delGen;
deleteTerms.setDelGen(delGen);
}
public long delGen() {
assert delGen != -1;
return delGen;
}
@Override
public String toString() {
String s = "delGen=" + delGen;
if (numTermDeletes != 0) {
s += " numDeleteTerms=" + numTermDeletes;
if (numTermDeletes != deleteTerms.size()) {
s += " (" + deleteTerms.size() + " unique)";
}
}
if (deleteQueries.length != 0) {
s += " numDeleteQueries=" + deleteQueries.length;
}
if (fieldUpdates.size() > 0) {
s += " fieldUpdates=" + fieldUpdatesCount;
}
if (bytesUsed != 0) {
s += " bytesUsed=" + bytesUsed;
}
if (privateSegment != null) {
s += " privateSegment=" + privateSegment;
}
return s;
}
boolean any() {
return deleteTerms.size() > 0 || deleteQueries.length > 0 || fieldUpdatesCount > 0 ;
}
/**
   * This class helps iterate a term dictionary and consume all the docs for each term.
   * It accepts a field/value tuple and returns a {@link DocIdSetIterator} if the field has an entry
   * for the given value. It iterates the term dictionary in an optimized way if the terms are
   * passed in sorted order, and makes sure terms and postings are reused as much as possible.
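   *
   * <p>Illustrative usage (a sketch; the names are not taken from the surrounding code):
   *
   * <pre>{@code
   * TermDocsIterator it = new TermDocsIterator(leafReader, true); // terms must arrive sorted per field
   * DocIdSetIterator docs = it.nextTerm("id", new BytesRef("doc-17"));
   * if (docs != null) {
   *   for (int doc = docs.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = docs.nextDoc()) {
   *     // consume doc
   *   }
   * }
   * }</pre>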
*/
static final class TermDocsIterator {
private final TermsProvider provider;
private String field;
private TermsEnum termsEnum;
private PostingsEnum postingsEnum;
private final boolean sortedTerms;
private BytesRef readerTerm;
private BytesRef lastTerm; // only set with asserts
@FunctionalInterface
interface TermsProvider {
Terms terms(String field) throws IOException;
}
TermDocsIterator(Fields fields, boolean sortedTerms) {
this(fields::terms, sortedTerms);
}
TermDocsIterator(LeafReader reader, boolean sortedTerms) {
this(reader::terms, sortedTerms);
}
private TermDocsIterator(TermsProvider provider, boolean sortedTerms) {
this.sortedTerms = sortedTerms;
this.provider = provider;
}
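    // Switches to the given field's terms dictionary: resets the enum and, in sorted mode, primes
    // readerTerm with the field's first term.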
private void setField(String field) throws IOException {
if (this.field == null || this.field.equals(field) == false) {
this.field = field;
Terms terms = provider.terms(field);
if (terms != null) {
termsEnum = terms.iterator();
if (sortedTerms) {
assert (lastTerm = null) == null; // need to reset otherwise we fail the assertSorted below since we sort per field
readerTerm = termsEnum.next();
}
} else {
termsEnum = null;
}
}
}
DocIdSetIterator nextTerm(String field, BytesRef term) throws IOException {
setField(field);
if (termsEnum != null) {
if (sortedTerms) {
assert assertSorted(term);
          // In the sorted case we can take advantage of the "seeking forward" property:
          // depending on the term dictionary implementation, this lets us reuse data structures
          // internally, which speeds up iteration over terms and docs significantly.
int cmp = term.compareTo(readerTerm);
if (cmp < 0) {
return null; // requested term does not exist in this segment
} else if (cmp == 0) {
return getDocs();
} else {
TermsEnum.SeekStatus status = termsEnum.seekCeil(term);
switch (status) {
case FOUND:
return getDocs();
case NOT_FOUND:
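              // the enum is now positioned on the smallest term greater than the requested one;
              // remember it so future (sorted) lookups can be answered by comparison without seeking again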
readerTerm = termsEnum.term();
return null;
case END:
// no more terms in this segment
termsEnum = null;
return null;
default:
throw new AssertionError("unknown status");
}
}
} else if (termsEnum.seekExact(term)) {
return getDocs();
}
}
return null;
}
private boolean assertSorted(BytesRef term) {
assert sortedTerms;
assert lastTerm == null || term.compareTo(lastTerm) >= 0 : "boom: " + term.utf8ToString() + " last: " + lastTerm.utf8ToString();
lastTerm = BytesRef.deepCopyOf(term);
return true;
}
private DocIdSetIterator getDocs() throws IOException {
assert termsEnum != null;
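      // we only need docIDs (PostingsEnum.NONE); reuse the previous postings enum when possible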
return postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
}
}
}