// blob: 132b496bcfc610630b6ae98c199e7cf891fec91f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.util.IOUtils;
/**
 * FIFO queue of {@link FlushTicket}s that are handed to a consumer in the order in which they
 * were added.
 *
 * <p>A ticket is added either for a frozen global updates package
 * ({@link #addDeletes(DocumentsWriterDeleteQueue)}) or for a segment flush
 * ({@link #addFlushTicket(DocumentsWriterPerThread)}). Purging passes tickets to the consumer
 * starting at the head of the queue and stops at the first ticket that can not be published yet.
 *
 * <p>Queue mutations and ticket state changes are done while holding this instance's monitor;
 * purging additionally requires {@code purgeLock}.
 *
 * @lucene.internal
 */
final class DocumentsWriterFlushQueue {
  // Accessed only while holding this instance's monitor. Only add/peek/poll are used and tickets
  // are never null, so an ArrayDeque is sufficient and avoids per-element node allocation.
  private final Queue<FlushTicket> queue = new ArrayDeque<>();
  // we track tickets separately since count must be present even before the ticket is
  // constructed ie. queue.size would not reflect it.
  private final AtomicInteger ticketCount = new AtomicInteger();
  // Must be held while purging (see innerPurge); acquired by forcePurge and tryPurge.
  private final ReentrantLock purgeLock = new ReentrantLock();

  /**
   * Asks the given delete queue for a frozen global updates package and, if there is one, enqueues
   * a ticket for it.
   *
   * @param deleteQueue the delete queue whose global buffer may be frozen
   * @return {@code true} if a ticket was added, {@code false} if there were no frozen updates
   * @throws IOException declared for callers; propagated from freezing the global buffer
   */
  synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
    // first inc the ticket count - freeze opens a window for #anyChanges to fail
    incTickets();
    boolean success = false;
    try {
      FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer();
      // no need to publish anything if we don't have any frozen updates
      if (frozenBufferedUpdates != null) {
        queue.add(new FlushTicket(frozenBufferedUpdates, false));
        success = true;
      }
    } finally {
      // no ticket was enqueued (nothing to publish or an exception): undo the early increment
      if (!success) {
        decTickets();
      }
    }
    return success;
  }

  /** Increments the ticket counter; the result must be positive. */
  private void incTickets() {
    int numTickets = ticketCount.incrementAndGet();
    assert numTickets > 0;
  }

  /** Decrements the ticket counter; the result must not be negative. */
  private void decTickets() {
    int numTickets = ticketCount.decrementAndGet();
    assert numTickets >= 0;
  }

  /**
   * Creates a ticket for the flush of the given {@link DocumentsWriterPerThread} and appends it to
   * the queue. The ticket can not be published until {@link #addSegment(FlushTicket, FlushedSegment)}
   * or {@link #markTicketFailed(FlushTicket)} has been called for it.
   *
   * @param dwpt the writer that is about to be flushed
   * @return the newly enqueued ticket
   * @throws IOException declared for callers; propagated from preparing the flush
   */
  synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
    // Each flush is assigned a ticket in the order they acquire the ticketQueue
    // lock
    incTickets();
    boolean success = false;
    try {
      // prepare flush freezes the global deletes - do in synced block!
      final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
      queue.add(ticket);
      success = true;
      return ticket;
    } finally {
      // preparing the flush failed before the ticket was enqueued: undo the early increment
      if (!success) {
        decTickets();
      }
    }
  }

  /**
   * Attaches the flushed segment to a ticket previously returned by
   * {@link #addFlushTicket(DocumentsWriterPerThread)}.
   *
   * @param ticket the ticket the segment belongs to; must be a segment ticket
   * @param segment the result of the flush
   */
  synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) {
    assert ticket.hasSegment;
    // the actual flush is done asynchronously and once done the FlushedSegment
    // is passed to the flush ticket
    ticket.setSegment(segment);
  }

  /**
   * Marks a segment ticket as failed so that it no longer blocks the tickets queued behind it.
   *
   * @param ticket the ticket whose flush failed; must be a segment ticket
   */
  synchronized void markTicketFailed(FlushTicket ticket) {
    assert ticket.hasSegment;
    // to free the queue we mark tickets as failed just to clean up the queue.
    ticket.setFailed();
  }

  /** Returns {@code true} if the ticket counter is currently non-zero. */
  boolean hasTickets() {
    // read the counter once so the assertion, its message and the result agree on one value
    final int count = ticketCount.get();
    assert count >= 0 : "ticketCount should be >= 0 but was: " + count;
    return count != 0;
  }

  /**
   * Passes publishable tickets to the consumer, starting at the head of the queue, until the queue
   * is empty or the head ticket can not be published yet. The caller must hold {@code purgeLock}.
   *
   * @param consumer receives each publishable ticket, in queue order
   * @throws IOException if the consumer throws; the ticket that was being consumed is still
   *     removed from the queue
   */
  private void innerPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert purgeLock.isHeldByCurrentThread();
    while (true) {
      final FlushTicket head;
      final boolean canPublish;
      synchronized (this) {
        head = queue.peek();
        canPublish = head != null && head.canPublish(); // do this synced
      }
      if (canPublish) {
        try {
          /*
           * if we block on publish -> lock IW -> lock BufferedDeletes we don't block
           * concurrent segment flushes just because they want to append to the queue.
           * the downside is that we need to force a purge on fullFlush since there could
           * be a ticket still in the queue.
           */
          consumer.accept(head);
        } finally {
          synchronized (this) {
            // finally remove the published ticket from the queue
            final FlushTicket poll = queue.poll();
            decTickets();
            // we hold the purgeLock so no other thread should have polled:
            assert poll == head;
          }
        }
      } else {
        break;
      }
    }
  }

  /**
   * Purges the queue, blocking until the purge lock can be acquired. Must not be called while
   * holding this instance's monitor.
   *
   * @param consumer receives each publishable ticket, in queue order
   * @throws IOException if the consumer throws
   */
  void forcePurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert !Thread.holdsLock(this);
    purgeLock.lock();
    try {
      innerPurge(consumer);
    } finally {
      purgeLock.unlock();
    }
  }

  /**
   * Purges the queue only if the purge lock can be acquired without waiting; otherwise returns
   * immediately without purging. Must not be called while holding this instance's monitor.
   *
   * @param consumer receives each publishable ticket, in queue order
   * @throws IOException if the consumer throws
   */
  void tryPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert !Thread.holdsLock(this);
    if (purgeLock.tryLock()) {
      try {
        innerPurge(consumer);
      } finally {
        purgeLock.unlock();
      }
    }
  }

  /** Returns the current value of the ticket counter. */
  int getTicketCount() {
    return ticketCount.get();
  }

  /**
   * An entry of the flush queue: a frozen updates package, optionally paired with a flushed
   * segment that is supplied after the ticket was created.
   */
  static final class FlushTicket {
    private final FrozenBufferedUpdates frozenUpdates;
    // true if this ticket waits for a segment (or a failure) before it can be published
    private final boolean hasSegment;
    private FlushedSegment segment;
    private boolean failed = false;
    private boolean published = false;

    /**
     * @param frozenUpdates the frozen updates package carried by this ticket
     * @param hasSegment whether a flushed segment is expected for this ticket
     */
    FlushTicket(FrozenBufferedUpdates frozenUpdates, boolean hasSegment) {
      this.frozenUpdates = frozenUpdates;
      this.hasSegment = hasSegment;
    }

    /**
     * Returns {@code true} if this ticket expects no segment, or its segment has been set, or it
     * has been marked as failed.
     */
    boolean canPublish() {
      return hasSegment == false || segment != null || failed;
    }

    /** Records that this ticket has been published; it must not be published more than once. */
    synchronized void markPublished() {
      assert published == false: "ticket was already published - can not publish twice";
      published = true;
    }

    /** Sets the flushed segment; the ticket must not have been marked as failed. */
    private void setSegment(FlushedSegment segment) {
      assert !failed;
      this.segment = segment;
    }

    /** Marks the ticket as failed; no segment must have been set. */
    private void setFailed() {
      assert segment == null;
      failed = true;
    }

    /**
     * Returns the flushed segment or <code>null</code> if this flush ticket doesn't have a segment. This can be the
     * case if this ticket represents a flushed global frozen updates package, or if no segment has
     * been set on it.
     */
    FlushedSegment getFlushedSegment() {
      return segment;
    }

    /**
     * Returns a frozen global deletes package.
     */
    FrozenBufferedUpdates getFrozenUpdates() {
      return frozenUpdates;
    }
  }
}