package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryWrapperFilter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
/** Tracks the stream of {@link BufferedUpdates}.
* When DocumentsWriterPerThread flushes, its buffered
* deletes and updates are appended to this stream. We later
* apply them (resolve them to the actual
* docIDs, per segment) when a merge is started
* (only to the to-be-merged segments). We
* also apply them to all segments when an NRT reader is pulled,
* commit/close is called, or when too many deletes or updates are
* buffered and must be flushed (by RAM usage or by count).
*
* Each packet is assigned a generation, and each flushed or
* merged segment is also assigned a generation, so we can
* track which BufferedUpdates packets to apply to any given
* segment. */
class BufferedUpdatesStream implements Accountable {
// TODO: maybe linked list?
private final List<FrozenBufferedUpdates> updates = new ArrayList<>();
// Starts at 1 so that SegmentInfos that have never had
// deletes applied (whose bufferedDelGen defaults to 0)
// will be correct:
private long nextGen = 1;
// used only by assert
private Term lastDeleteTerm;
private final InfoStream infoStream;
private final AtomicLong bytesUsed = new AtomicLong();
private final AtomicInteger numTerms = new AtomicInteger();
public BufferedUpdatesStream(InfoStream infoStream) {
this.infoStream = infoStream;
}
// Appends a new packet of buffered deletes to the stream,
// setting its generation:
public synchronized long push(FrozenBufferedUpdates packet) {
/*
* The insert operation must be atomic. If we let threads increment the gen
* and push the packet afterwards, we risk that packets are pushed out of order.
* With DWPT this is possible if two or more flushes are racing to push
* updates. If the pushed packets got out of order, we would lose documents
* since deletes would be applied to the wrong segments.
*/
packet.setDelGen(nextGen++);
assert packet.any();
assert checkDeleteStats();
assert packet.delGen() < nextGen;
assert updates.isEmpty() || updates.get(updates.size()-1).delGen() < packet.delGen() : "Delete packets must be in order";
updates.add(packet);
numTerms.addAndGet(packet.numTermDeletes);
bytesUsed.addAndGet(packet.bytesUsed);
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + updates.size() + " totBytesUsed=" + bytesUsed.get());
}
assert checkDeleteStats();
return packet.delGen();
}
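// Discards all buffered packets and resets the generation and stats
// (e.g. when the writer drops all buffered changes on rollback):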
public synchronized void clear() {
updates.clear();
nextGen = 1;
numTerms.set(0);
bytesUsed.set(0);
}
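// True if any deletes or doc-values updates are currently buffered: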
public boolean any() {
return bytesUsed.get() != 0;
}
public int numTerms() {
return numTerms.get();
}
@Override
public long ramBytesUsed() {
return bytesUsed.get();
}
@Override
public Iterable<? extends Accountable> getChildResources() {
return Collections.emptyList();
}
public static class ApplyDeletesResult {
// True if any actual deletes took place:
public final boolean anyDeletes;
// Current gen, for the merged segment:
public final long gen;
// If non-null, contains segments that are 100% deleted
public final List<SegmentCommitInfo> allDeleted;
ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentCommitInfo> allDeleted) {
this.anyDeletes = anyDeletes;
this.gen = gen;
this.allDeleted = allDeleted;
}
}
// Sorts SegmentInfos from smallest to biggest bufferedDelGen:
private static final Comparator<SegmentCommitInfo> sortSegInfoByDelGen = new Comparator<SegmentCommitInfo>() {
@Override
public int compare(SegmentCommitInfo si1, SegmentCommitInfo si2) {
return Long.compare(si1.getBufferedDeletesGen(), si2.getBufferedDeletesGen());
}
};
/** Resolves the buffered deleted Terms/Queries/docIDs into
* actual deleted docIDs in the liveDocs MutableBits for
* each SegmentReader. */
public synchronized ApplyDeletesResult applyDeletesAndUpdates(IndexWriter.ReaderPool readerPool, List<SegmentCommitInfo> infos) throws IOException {
final long t0 = System.currentTimeMillis();
if (infos.size() == 0) {
return new ApplyDeletesResult(false, nextGen++, null);
}
assert checkDeleteStats();
if (!any()) {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "applyDeletes: no deletes; skipping");
}
return new ApplyDeletesResult(false, nextGen++, null);
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "applyDeletes: infos=" + infos + " packetCount=" + updates.size());
}
final long gen = nextGen++;
List<SegmentCommitInfo> infos2 = new ArrayList<>();
infos2.addAll(infos);
Collections.sort(infos2, sortSegInfoByDelGen);
CoalescedUpdates coalescedDeletes = null;
boolean anyNewDeletes = false;
int infosIDX = infos2.size()-1;
int delIDX = updates.size()-1;
List<SegmentCommitInfo> allDeleted = null;
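// Walk segments from newest to oldest bufferedDelGen in lock step with packets
// from newest to oldest delGen: packets newer than a segment's gen are coalesced
// (unless segment-private) and applied to it and to all older segments; a packet
// whose delGen equals the segment's gen is that segment's private packet and is
// applied only to that segment.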
while (infosIDX >= 0) {
//System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
final SegmentCommitInfo info = infos2.get(infosIDX);
final long segGen = info.getBufferedDeletesGen();
if (packet != null && segGen < packet.delGen()) {
// System.out.println(" coalesce");
if (coalescedDeletes == null) {
coalescedDeletes = new CoalescedUpdates();
}
if (!packet.isSegmentPrivate) {
/*
* Only coalesce if we are NOT on a segment-private del packet: a segment-private
* del packet must only be applied to segments with the same delGen. Yet if a
* segment was already dropped from the SegmentInfos because it had no documents
* left after del packets younger than its segment-private packet (higher delGen)
* were applied, that segment-private packet will not have been removed.
*/
coalescedDeletes.update(packet);
}
delIDX--;
} else if (packet != null && segGen == packet.delGen()) {
assert packet.isSegmentPrivate : "Packet and segment delGen can only match for a segment-private del packet; gen=" + segGen;
//System.out.println(" eq");
// Lock order: IW -> BD -> RP
assert readerPool.infoIsLive(info);
final ReadersAndUpdates rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0;
final boolean segAllDeletes;
try {
final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
if (coalescedDeletes != null) {
//System.out.println(" del coalesced");
delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
}
//System.out.println(" del exact");
// Don't delete by Term here; DocumentsWriterPerThread
// already did that on flush:
delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader);
applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), rld, reader, dvUpdates);
applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), rld, reader, dvUpdates);
if (dvUpdates.any()) {
rld.writeFieldUpdates(info.info.dir, dvUpdates);
}
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
assert fullDelCount <= rld.info.info.getDocCount();
segAllDeletes = fullDelCount == rld.info.info.getDocCount();
} finally {
rld.release(reader);
readerPool.release(rld);
}
anyNewDeletes |= delCount > 0;
if (segAllDeletes) {
if (allDeleted == null) {
allDeleted = new ArrayList<>();
}
allDeleted.add(info);
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
}
if (coalescedDeletes == null) {
coalescedDeletes = new CoalescedUpdates();
}
/*
* Since we are on a segment private del packet we must not
* update the coalescedDeletes here! We can simply advance to the
* next packet and seginfo.
*/
delIDX--;
infosIDX--;
info.setBufferedDeletesGen(gen);
} else {
//System.out.println(" gt");
if (coalescedDeletes != null) {
// Lock order: IW -> BD -> RP
assert readerPool.infoIsLive(info);
final ReadersAndUpdates rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0;
final boolean segAllDeletes;
try {
delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
if (dvUpdates.any()) {
rld.writeFieldUpdates(info.info.dir, dvUpdates);
}
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
assert fullDelCount <= rld.info.info.getDocCount();
segAllDeletes = fullDelCount == rld.info.info.getDocCount();
} finally {
rld.release(reader);
readerPool.release(rld);
}
anyNewDeletes |= delCount > 0;
if (segAllDeletes) {
if (allDeleted == null) {
allDeleted = new ArrayList<>();
}
allDeleted.add(info);
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
}
}
info.setBufferedDeletesGen(gen);
infosIDX--;
}
}
assert checkDeleteStats();
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
}
// assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted);
}
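// Reserves and returns the next delete generation without pushing a packet: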
synchronized long getNextGen() {
return nextGen++;
}
// Lock order IW -> BD
/* Removes any BufferedDeletes that we no longer need to
* store because all segments in the index have had the
* deletes applied. */
public synchronized void prune(SegmentInfos segmentInfos) {
assert checkDeleteStats();
long minGen = Long.MAX_VALUE;
for(SegmentCommitInfo info : segmentInfos) {
minGen = Math.min(info.getBufferedDeletesGen(), minGen);
}
if (infoStream.isEnabled("BD")) {
Directory dir;
if (segmentInfos.size() > 0) {
dir = segmentInfos.info(0).info.dir;
} else {
dir = null;
}
infoStream.message("BD", "prune sis=" + segmentInfos.toString(dir) + " minGen=" + minGen + " packetCount=" + updates.size());
}
final int limit = updates.size();
for(int delIDX=0;delIDX<limit;delIDX++) {
if (updates.get(delIDX).delGen() >= minGen) {
prune(delIDX);
assert checkDeleteStats();
return;
}
}
// All deletes pruned
prune(limit);
assert !any();
assert checkDeleteStats();
}
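// Drops the first count packets from the stream and subtracts their
// term/byte counts from the running totals: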
private synchronized void prune(int count) {
if (count > 0) {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "pruneDeletes: prune " + count + " packets; " + (updates.size() - count) + " packets remain");
}
for(int delIDX=0;delIDX<count;delIDX++) {
final FrozenBufferedUpdates packet = updates.get(delIDX);
numTerms.addAndGet(-packet.numTermDeletes);
assert numTerms.get() >= 0;
bytesUsed.addAndGet(-packet.bytesUsed);
assert bytesUsed.get() >= 0;
}
updates.subList(0, count).clear();
}
}
// Delete by Term
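// Seeks each buffered delete Term in this segment's postings and marks every
// matching doc as deleted; returns the number of docs newly deleted here: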
private synchronized long applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException {
long delCount = 0;
Fields fields = reader.fields();
if (fields == null) {
// This reader has no postings
return 0;
}
TermsEnum termsEnum = null;
String currentField = null;
DocsEnum docs = null;
assert checkDeleteTerm(null);
boolean any = false;
//System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader);
for (Term term : termsIter) {
// Since we visit terms sorted, we gain performance
// by re-using the same TermsEnum and seeking only
// forwards
if (!term.field().equals(currentField)) {
assert currentField == null || currentField.compareTo(term.field()) < 0;
currentField = term.field();
Terms terms = fields.terms(currentField);
if (terms != null) {
termsEnum = terms.iterator(termsEnum);
} else {
termsEnum = null;
}
}
if (termsEnum == null) {
continue;
}
assert checkDeleteTerm(term);
// System.out.println(" term=" + term);
if (termsEnum.seekExact(term.bytes())) {
// we don't need term frequencies for this
DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, DocsEnum.FLAG_NONE);
//System.out.println("BDS: got docsEnum=" + docsEnum);
if (docsEnum != null) {
while (true) {
final int docID = docsEnum.nextDoc();
//System.out.println(Thread.currentThread().getName() + " del term=" + term + " doc=" + docID);
if (docID == DocIdSetIterator.NO_MORE_DOCS) {
break;
}
if (!any) {
rld.initWritableLiveDocs();
any = true;
}
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
if (rld.delete(docID)) {
delCount++;
}
}
}
}
}
return delCount;
}
// DocValues updates
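// Resolves buffered numeric/binary doc-values updates against this segment,
// recording them in dvUpdatesContainer; only docs below each update's
// docIDUpto limit are affected: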
private synchronized void applyDocValuesUpdates(Iterable<? extends DocValuesUpdate> updates,
ReadersAndUpdates rld, SegmentReader reader, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
Fields fields = reader.fields();
if (fields == null) {
// This reader has no postings
return;
}
// TODO: we can process the updates per DV field, from last to first so that
// if multiple terms affect same document for the same field, we add an update
// only once (that of the last term). To do that, we can keep a bitset which
// marks which documents have already been updated. So e.g. if term T1
// updates doc 7, and then we process term T2 and it updates doc 7 as well,
// we don't apply the update since we know T1 came last and therefore wins
// the update.
// We can also use that bitset as 'liveDocs' to pass to TermsEnum.docs(), so
// that these documents aren't even returned.
String currentField = null;
TermsEnum termsEnum = null;
DocsEnum docs = null;
//System.out.println(Thread.currentThread().getName() + " numericDVUpdate reader=" + reader);
for (DocValuesUpdate update : updates) {
Term term = update.term;
int limit = update.docIDUpto;
// TODO: we traverse the terms in update order (not term order) so that we
// apply the updates in the correct order, i.e. if two terms update the
// same document, the last one that came in wins, irrespective of the
// terms' lexical order.
// We could apply the updates in term order if we kept an updatesGen (and
// incremented it with every update) and attached it to each NumericUpdate. Note
// that we cannot rely only on docIDUpto because an app may send two updates
// which get the same docIDUpto, yet we still need to respect the order in which
// those updates arrived.
if (!term.field().equals(currentField)) {
// if we change the code to process updates in terms order, enable this assert
// assert currentField == null || currentField.compareTo(term.field()) < 0;
currentField = term.field();
Terms terms = fields.terms(currentField);
if (terms != null) {
termsEnum = terms.iterator(termsEnum);
} else {
termsEnum = null;
continue; // no terms in that field
}
}
if (termsEnum == null) {
continue;
}
// System.out.println(" term=" + term);
if (termsEnum.seekExact(term.bytes())) {
// we don't need term frequencies for this
DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, DocsEnum.FLAG_NONE);
//System.out.println("BDS: got docsEnum=" + docsEnum);
DocValuesFieldUpdates dvUpdates = dvUpdatesContainer.getUpdates(update.field, update.type);
if (dvUpdates == null) {
dvUpdates = dvUpdatesContainer.newUpdates(update.field, update.type, reader.maxDoc());
}
int doc;
while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
//System.out.println(Thread.currentThread().getName() + " numericDVUpdate term=" + term + " doc=" + docID);
if (doc >= limit) {
break; // no more docs that can be updated for this term
}
dvUpdates.add(doc, update.value);
}
}
}
}
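// Pairs a delete-by-Query with the max docID (exclusive) it may delete;
// docs indexed after the delete arrived are not affected: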
public static class QueryAndLimit {
public final Query query;
public final int limit;
public QueryAndLimit(Query query, int limit) {
this.query = query;
this.limit = limit;
}
}
// Delete by query
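// Runs each buffered Query against the segment and deletes matching docs whose
// docID is below the query's limit; returns the number of docs newly deleted: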
private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, ReadersAndUpdates rld, final SegmentReader reader) throws IOException {
long delCount = 0;
final LeafReaderContext readerContext = reader.getContext();
boolean any = false;
for (QueryAndLimit ent : queriesIter) {
Query query = ent.query;
int limit = ent.limit;
final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(readerContext, reader.getLiveDocs());
if (docs != null) {
final DocIdSetIterator it = docs.iterator();
if (it != null) {
while(true) {
int doc = it.nextDoc();
if (doc >= limit) {
break;
}
if (!any) {
rld.initWritableLiveDocs();
any = true;
}
if (rld.delete(doc)) {
delCount++;
}
}
}
}
}
return delCount;
}
// used only by assert
private boolean checkDeleteTerm(Term term) {
if (term != null) {
assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
}
// TODO: our merged iterable re-uses the Term instance, so we deep-copy its bytes here just for this assert
lastDeleteTerm = term == null ? null : new Term(term.field(), BytesRef.deepCopyOf(term.bytes));
return true;
}
// only for assert
private boolean checkDeleteStats() {
int numTerms2 = 0;
long bytesUsed2 = 0;
for(FrozenBufferedUpdates packet : updates) {
numTerms2 += packet.numTermDeletes;
bytesUsed2 += packet.bytesUsed;
}
assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
return true;
}
}