blob: b36531fdaf7711ec5cea19e8c63102f6bd718c43 [file] [log] [blame]
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.search.Query;
/**
* {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes
* queue. In contrast to other queue implementation we only maintain the
* tail of the queue. A delete queue is always used in a context of a set of
* DWPTs and a global delete pool. Each of the DWPT and the global pool need to
* maintain their 'own' head of the queue (as a DeleteSlice instance per DWPT).
* The difference between the DWPT and the global pool is that the DWPT starts
* maintaining a head once it has added its first document since for its segments
* private deletes only the deletes after that document are relevant. The global
* pool instead starts maintaining the head once this instance is created by
* taking the sentinel instance as its initial head.
* <p>
* Since each {@link DeleteSlice} maintains its own head and the list is only
* single linked the garbage collector takes care of pruning the list for us.
* All nodes in the list that are still relevant should be either directly or
* indirectly referenced by one of the DWPT's private {@link DeleteSlice} or by
* the global {@link BufferedUpdates} slice.
* <p>
* Each DWPT as well as the global delete pool maintain their private
* DeleteSlice instance. In the DWPT case updating a slice is equivalent to
* atomically finishing the document. The slice update guarantees a "happens
* before" relationship to all other updates in the same indexing session. When a
* DWPT updates a document it:
*
* <ol>
* <li>consumes a document and finishes its processing</li>
* <li>updates its private {@link DeleteSlice} either by calling
* {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the
* document has a delTerm)</li>
* <li>applies all deletes in the slice to its private {@link BufferedUpdates}
* and resets it</li>
* <li>increments its internal document id</li>
* </ol>
*
* The DWPT also doesn't apply its current documents delete term until it has
* updated its delete slice which ensures the consistency of the update. If the
* update fails before the DeleteSlice could have been updated the deleteTerm
* will also not be added to its private deletes neither to the global deletes.
*
*/
final class DocumentsWriterDeleteQueue {
private volatile Node<?> tail;
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DocumentsWriterDeleteQueue,Node> tailUpdater = AtomicReferenceFieldUpdater
.newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail");
private final DeleteSlice globalSlice;
private final BufferedUpdates globalBufferedUpdates;
/* only acquired to update the global deletes */
private final ReentrantLock globalBufferLock = new ReentrantLock();
final long generation;
DocumentsWriterDeleteQueue() {
this(0);
}
DocumentsWriterDeleteQueue(long generation) {
this(new BufferedUpdates(), generation);
}
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation) {
this.globalBufferedUpdates = globalBufferedUpdates;
this.generation = generation;
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
*/
tail = new Node<Object>(null); // sentinel
globalSlice = new DeleteSlice(tail);
}
void addDelete(Query... queries) {
add(new QueryArrayNode(queries));
tryApplyGlobalSlice();
}
void addDelete(Term... terms) {
add(new TermArrayNode(terms));
tryApplyGlobalSlice();
}
void addNumericUpdate(NumericUpdate update) {
add(new NumericUpdateNode(update));
tryApplyGlobalSlice();
}
/**
* invariant for document update
*/
void add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
// System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
add(termNode);
/*
* this is an update request where the term is the updated documents
* delTerm. in that case we need to guarantee that this insert is atomic
* with regards to the given delete slice. This means if two threads try to
* update the same document with in turn the same delTerm one of them must
* win. By taking the node we have created for our del term as the new tail
* it is guaranteed that if another thread adds the same right after us we
* will apply this delete next time we update our slice and one of the two
* competing updates wins!
*/
slice.sliceTail = termNode;
assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe
// we can do it just every n times or so?
}
void add(Node<?> item) {
/*
* this non-blocking / 'wait-free' linked list add was inspired by Apache
* Harmony's ConcurrentLinkedQueue Implementation.
*/
while (true) {
final Node<?> currentTail = this.tail;
final Node<?> tailNext = currentTail.next;
if (tail == currentTail) {
if (tailNext != null) {
/*
* we are in intermediate state here. the tails next pointer has been
* advanced but the tail itself might not be updated yet. help to
* advance the tail and try again updating it.
*/
tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
} else {
/*
* we are in quiescent state and can try to insert the item to the
* current tail if we fail to insert we just retry the operation since
* somebody else has already added its item
*/
if (currentTail.casNext(null, item)) {
/*
* now that we are done we need to advance the tail while another
* thread could have advanced it already so we can ignore the return
* type of this CAS call
*/
tailUpdater.compareAndSet(this, currentTail, item);
return;
}
}
}
}
}
boolean anyChanges() {
globalBufferLock.lock();
try {
/*
* check if all items in the global slice were applied
* and if the global slice is up-to-date
* and if globalBufferedUpdates has changes
*/
return globalBufferedUpdates.any() || !globalSlice.isEmpty() || globalSlice.sliceTail != tail
|| tail.next != null;
} finally {
globalBufferLock.unlock();
}
}
void tryApplyGlobalSlice() {
if (globalBufferLock.tryLock()) {
/*
* The global buffer must be locked but we don't need to update them if
* there is an update going on right now. It is sufficient to apply the
* deletes that have been added after the current in-flight global slices
* tail the next time we can get the lock!
*/
try {
if (updateSlice(globalSlice)) {
// System.out.println(Thread.currentThread() + ": apply globalSlice");
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
} finally {
globalBufferLock.unlock();
}
}
}
FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) {
globalBufferLock.lock();
/*
* Here we freeze the global buffer so we need to lock it, apply all
* deletes in the queue and reset the global slice to let the GC prune the
* queue.
*/
final Node<?> currentTail = tail; // take the current tail make this local any
// Changes after this call are applied later
// and not relevant here
if (callerSlice != null) {
// Update the callers slices so we are on the same page
callerSlice.sliceTail = currentTail;
}
try {
if (globalSlice.sliceTail != currentTail) {
globalSlice.sliceTail = currentTail;
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
// System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(
globalBufferedUpdates, false);
globalBufferedUpdates.clear();
return packet;
} finally {
globalBufferLock.unlock();
}
}
DeleteSlice newSlice() {
return new DeleteSlice(tail);
}
boolean updateSlice(DeleteSlice slice) {
if (slice.sliceTail != tail) { // If we are the same just
slice.sliceTail = tail;
return true;
}
return false;
}
static class DeleteSlice {
// No need to be volatile, slices are thread captive (only accessed by one thread)!
Node<?> sliceHead; // we don't apply this one
Node<?> sliceTail;
DeleteSlice(Node<?> currentTail) {
assert currentTail != null;
/*
* Initially this is a 0 length slice pointing to the 'current' tail of
* the queue. Once we update the slice we only need to assign the tail and
* have a new slice
*/
sliceHead = sliceTail = currentTail;
}
void apply(BufferedUpdates del, int docIDUpto) {
if (sliceHead == sliceTail) {
// 0 length slice
return;
}
/*
* When we apply a slice we take the head and get its next as our first
* item to apply and continue until we applied the tail. If the head and
* tail in this slice are not equal then there will be at least one more
* non-null node in the slice!
*/
Node<?> current = sliceHead;
do {
current = current.next;
assert current != null : "slice property violated between the head on the tail must not be a null node";
current.apply(del, docIDUpto);
// System.out.println(Thread.currentThread().getName() + ": pull " + current + " docIDUpto=" + docIDUpto);
} while (current != sliceTail);
reset();
}
void reset() {
// Reset to a 0 length slice
sliceHead = sliceTail;
}
/**
* Returns <code>true</code> iff the given item is identical to the item
* hold by the slices tail, otherwise <code>false</code>.
*/
boolean isTailItem(Object item) {
return sliceTail.item == item;
}
boolean isEmpty() {
return sliceHead == sliceTail;
}
}
public int numGlobalTermDeletes() {
return globalBufferedUpdates.numTermDeletes.get();
}
void clear() {
globalBufferLock.lock();
try {
final Node<?> currentTail = tail;
globalSlice.sliceHead = globalSlice.sliceTail = currentTail;
globalBufferedUpdates.clear();
} finally {
globalBufferLock.unlock();
}
}
private static class Node<T> {
volatile Node<?> next;
final T item;
Node(T item) {
this.item = item;
}
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<Node,Node> nextUpdater = AtomicReferenceFieldUpdater
.newUpdater(Node.class, Node.class, "next");
void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
throw new IllegalStateException("sentinel item must never be applied");
}
boolean casNext(Node<?> cmp, Node<?> val) {
return nextUpdater.compareAndSet(this, cmp, val);
}
}
private static final class TermNode extends Node<Term> {
TermNode(Term term) {
super(term);
}
@Override
void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
bufferedDeletes.addTerm(item, docIDUpto);
}
@Override
public String toString() {
return "del=" + item;
}
}
private static final class QueryArrayNode extends Node<Query[]> {
QueryArrayNode(Query[] query) {
super(query);
}
@Override
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
for (Query query : item) {
bufferedUpdates.addQuery(query, docIDUpto);
}
}
}
private static final class TermArrayNode extends Node<Term[]> {
TermArrayNode(Term[] term) {
super(term);
}
@Override
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
for (Term term : item) {
bufferedUpdates.addTerm(term, docIDUpto);
}
}
@Override
public String toString() {
return "dels=" + Arrays.toString(item);
}
}
private static final class NumericUpdateNode extends Node<NumericUpdate> {
NumericUpdateNode(NumericUpdate update) {
super(update);
}
@Override
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
bufferedUpdates.addNumericUpdate(item, docIDUpto);
}
@Override
public String toString() {
return "update=" + item;
}
}
private boolean forceApplyGlobalSlice() {
globalBufferLock.lock();
final Node<?> currentTail = tail;
try {
if (globalSlice.sliceTail != currentTail) {
globalSlice.sliceTail = currentTail;
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
return globalBufferedUpdates.any();
} finally {
globalBufferLock.unlock();
}
}
public int getBufferedUpdatesTermsSize() {
globalBufferLock.lock();
try {
forceApplyGlobalSlice();
return globalBufferedUpdates.terms.size();
} finally {
globalBufferLock.unlock();
}
}
public long bytesUsed() {
return globalBufferedUpdates.bytesUsed.get();
}
@Override
public String toString() {
return "DWDQ: [ generation: " + generation + " ]";
}
}