blob: c2a3efe0844f1c3c0755545b7468b5f0dfe37e23 [file] [log] [blame]
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
namespace Lucene.Net.Index
using Lucene.Net.Support;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
using FlushedSegment = Lucene.Net.Index.DocumentsWriterPerThread.FlushedSegment;
/// <summary>
/// Holds a linked list of <see cref="FlushTicket"/>s together with a separately
/// maintained ticket counter and a lock that is taken by the purge methods.
/// <para/>
/// @lucene.internal
/// </summary>
internal class DocumentsWriterFlushQueue
// Pending tickets; InnerPurge only ever inspects the first entry of this list.
private readonly LinkedList<FlushTicket> Queue = new LinkedList<FlushTicket>();
// we track tickets separately since count must be present even before the ticket is
// constructed ie. queue.size would not reflect it.
private readonly AtomicInteger TicketCount_Renamed = new AtomicInteger();
// Acquired with TryLock() in TryPurge before the purge loop is entered.
private readonly ReentrantLock PurgeLock = new ReentrantLock();
/// <summary>
/// Freezes the global buffer of <paramref name="deleteQueue"/> and appends the
/// frozen packet to the queue wrapped in a <see cref="GlobalDeletesTicket"/>.
/// </summary>
/// <param name="deleteQueue">Delete queue whose global buffer is frozen.</param>
internal virtual void AddDeletes(DocumentsWriterDeleteQueue deleteQueue)
{
    lock (this)
    {
        IncTickets(); // first inc the ticket count - freeze opens
                      // a window for #anyChanges to fail
        bool success = false;
        try
        {
            Queue.AddLast(new GlobalDeletesTicket(deleteQueue.FreezeGlobalBuffer(null)));
            success = true;
        }
        finally
        {
            if (!success)
            {
                // The ticket never made it into the queue: undo the increment so
                // the counter does not report a ticket that does not exist.
                DecTickets();
            }
        }
    }
}
/// <summary>
/// Atomically raises the ticket counter by one; the new value must be positive.
/// </summary>
private void IncTickets()
{
    int countAfterIncrement = TicketCount_Renamed.IncrementAndGet();
    Debug.Assert(countAfterIncrement > 0);
}
/// <summary>
/// Atomically lowers the ticket counter by one; the new value must not be negative.
/// </summary>
private void DecTickets()
{
    int countAfterDecrement = TicketCount_Renamed.DecrementAndGet();
    Debug.Assert(countAfterDecrement >= 0);
}
/// <summary>
/// Prepares the flush of <paramref name="dwpt"/> and enqueues a
/// <see cref="SegmentFlushTicket"/> for it.
/// </summary>
/// <param name="dwpt">The per-thread writer whose flush is being prepared.</param>
/// <returns>The ticket that was appended to the queue.</returns>
internal virtual SegmentFlushTicket AddFlushTicket(DocumentsWriterPerThread dwpt)
{
    lock (this)
    {
        // Each flush is assigned a ticket in the order they acquire the ticketQueue
        // lock
        IncTickets(); // count the ticket before it exists, same as AddDeletes
        bool success = false;
        try
        {
            // prepare flush freezes the global deletes - do in synced block!
            SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.PrepareFlush());
            // Without this the ticket would be returned but never seen by the purge loop.
            Queue.AddLast(ticket);
            success = true;
            return ticket;
        }
        finally
        {
            if (!success)
            {
                // PrepareFlush (or the enqueue) threw: roll the counter back.
                DecTickets();
            }
        }
    }
}
/// <summary>
/// Attaches <paramref name="segment"/> to <paramref name="ticket"/> while holding
/// the queue monitor.
/// </summary>
/// <param name="ticket">Ticket that receives the segment.</param>
/// <param name="segment">The flushed segment to store on the ticket.</param>
internal virtual void AddSegment(SegmentFlushTicket ticket, FlushedSegment segment)
{
    // The flush itself runs asynchronously; once it has finished, the resulting
    // FlushedSegment is handed over to its flush ticket here.
    lock (this)
    {
        ticket.Segment = segment;
    }
}
/// <summary>
/// Flags <paramref name="ticket"/> as failed while holding the queue monitor.
/// </summary>
/// <param name="ticket">The ticket whose flush did not produce a segment.</param>
internal virtual void MarkTicketFailed(SegmentFlushTicket ticket)
{
    lock (this)
    {
        // to free the queue we mark tickets as failed just to clean up the queue.
        // A failed ticket reports CanPublish() == true, so it no longer blocks the head.
        ticket.SetFailed();
    }
}
/// <summary>
/// Tells whether the ticket counter is currently non-zero.
/// </summary>
/// <returns><c>true</c> if the counter differs from zero; otherwise <c>false</c>.</returns>
internal virtual bool HasTickets()
{
    Debug.Assert(TicketCount_Renamed.Get() >= 0, "ticketCount should be >= 0 but was: " + TicketCount_Renamed.Get());
    bool anyOutstanding = TicketCount_Renamed.Get() != 0;
    return anyOutstanding;
}
/// <summary>
/// Publishes tickets from the head of the queue for as long as the head reports
/// that it can be published, removing each one afterwards.
/// </summary>
/// <param name="writer">The writer the tickets are published to.</param>
/// <returns>The number of tickets that were taken off the queue.</returns>
private int InnerPurge(IndexWriter writer)
{
    int numPurged = 0;
    while (true)
    {
        FlushTicket head;
        bool canPublish;
        lock (this)
        {
            head = Queue.Count <= 0 ? null : Queue.First.Value;
            canPublish = head != null && head.CanPublish(); // do this synced
        }

        if (!canPublish)
        {
            // Empty queue, or the head is still waiting for its segment: stop here
            // so tickets are never published out of order.
            break;
        }

        numPurged++;
        try
        {
            /*
             * if we block on publish -> lock IW -> lock BufferedDeletes we don't block
             * concurrent segment flushes just because they want to append to the queue.
             * the downside is that we need to force a purge on fullFlush since there could
             * be a ticket still in the queue.
             */
            head.Publish(writer);
        }
        finally
        {
            lock (this)
            {
                // finally remove the published ticket from the queue; this happens
                // even if Publish threw so a broken ticket cannot wedge the queue
                FlushTicket poll = Queue.First.Value;
                Queue.RemoveFirst();
                TicketCount_Renamed.DecrementAndGet();
                Debug.Assert(poll == head);
            }
        }
    }
    return numPurged;
}
/// <summary>
/// Purges the queue, waiting for the purge lock if another thread holds it.
/// </summary>
/// <param name="writer">The writer the tickets are published to.</param>
/// <returns>The number of tickets purged.</returns>
internal virtual int ForcePurge(IndexWriter writer)
{
    // Serialize with TryPurge: only one thread may run the purge loop at a time.
    PurgeLock.Lock();
    try
    {
        return InnerPurge(writer);
    }
    finally
    {
        PurgeLock.Unlock();
    }
}
/// <summary>
/// Purges the queue only if the purge lock can be taken without waiting.
/// </summary>
/// <param name="writer">The writer the tickets are published to.</param>
/// <returns>
/// The number of tickets purged, or <c>0</c> when the lock could not be acquired.
/// </returns>
internal virtual int TryPurge(IndexWriter writer)
{
    if (!PurgeLock.TryLock())
    {
        return 0;
    }

    try
    {
        return InnerPurge(writer);
    }
    finally
    {
        // Always release, otherwise every later TryPurge would return 0 forever.
        PurgeLock.Unlock();
    }
}
/// <summary>
/// Gets the current value of the ticket counter.
/// </summary>
public virtual int TicketCount
{
    get
    {
        int outstanding = TicketCount_Renamed.Get();
        return outstanding;
    }
}
/// <summary>
/// Drops every queued ticket and resets the ticket counter to zero.
/// </summary>
internal virtual void Clear()
{
    lock (this)
    {
        // Both pieces of state are reset under the same monitor so the counter
        // cannot disagree with the (now empty) queue.
        Queue.Clear();
        TicketCount_Renamed.Set(0);
    }
}
/// <summary>
/// Base type of the entries held by the flush queue. A ticket carries the frozen
/// updates it was created with and knows how to publish itself to an
/// <see cref="IndexWriter"/>.
/// </summary>
internal abstract class FlushTicket
{
    /// <summary>Frozen updates handed to the constructor; never <c>null</c>.</summary>
    protected internal FrozenBufferedUpdates FrozenUpdates;

    /// <summary>Set to <c>true</c> by implementations when they publish.</summary>
    protected internal bool Published = false;

    /// <summary>
    /// Creates a ticket for the given frozen updates.
    /// </summary>
    /// <param name="frozenUpdates">Frozen updates of this ticket; must not be <c>null</c>.</param>
    protected internal FlushTicket(FrozenBufferedUpdates frozenUpdates)
    {
        Debug.Assert(frozenUpdates != null);
        this.FrozenUpdates = frozenUpdates;
    }

    /// <summary>Publishes this ticket to <paramref name="writer"/>.</summary>
    protected internal abstract void Publish(IndexWriter writer);

    /// <summary>Tells whether this ticket is ready to be published.</summary>
    protected internal abstract bool CanPublish();

    /// <summary>
    /// Publishes the flushed segment, segment private deletes (if any) and its
    /// associated global delete (if present) to IndexWriter. The actual
    /// publishing operation is synced on IW -> BDS so that the <seealso cref="SegmentInfo"/>'s
    /// delete generation is always GlobalPacket_deleteGeneration + 1
    /// </summary>
    /// <param name="indexWriter">Writer that receives the segment.</param>
    /// <param name="newSegment">The flushed segment; must not be <c>null</c>.</param>
    /// <param name="globalPacket">Global frozen updates passed along with the segment.</param>
    protected internal void PublishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates globalPacket)
    {
        Debug.Assert(newSegment != null);
        Debug.Assert(newSegment.SegmentInfo != null);
        FrozenBufferedUpdates segmentUpdates = newSegment.SegmentUpdates;
        if (indexWriter.infoStream.IsEnabled("DW"))
        {
            indexWriter.infoStream.Message("DW", "publishFlushedSegment seg-private updates=" + segmentUpdates);
        }

        if (segmentUpdates != null && indexWriter.infoStream.IsEnabled("DW"))
        {
            indexWriter.infoStream.Message("DW", "flush: push buffered seg private updates: " + segmentUpdates);
        }
        // now publish!
        indexWriter.PublishFlushedSegment(newSegment.SegmentInfo, segmentUpdates, globalPacket);
    }

    /// <summary>
    /// Finishes a ticket: with no segment only the buffered updates are pushed,
    /// otherwise the segment is published together with the updates.
    /// </summary>
    /// <param name="indexWriter">Writer that receives the updates or the segment.</param>
    /// <param name="newSegment">The flushed segment, or <c>null</c> for a deletes-only ticket.</param>
    /// <param name="bufferedUpdates">Frozen updates of the ticket.</param>
    protected internal void FinishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates bufferedUpdates)
    {
        // Finish the flushed segment and publish it to IndexWriter
        if (newSegment == null)
        {
            Debug.Assert(bufferedUpdates != null);
            if (bufferedUpdates != null && bufferedUpdates.Any())
            {
                // NOTE(review): PublishFrozenUpdates is declared on IndexWriter, outside
                // this file - confirm its name and signature against that class.
                indexWriter.PublishFrozenUpdates(bufferedUpdates);
                if (indexWriter.infoStream.IsEnabled("DW"))
                {
                    indexWriter.infoStream.Message("DW", "flush: push buffered updates: " + bufferedUpdates);
                }
            }
        }
        else
        {
            // Only reached with a real segment; PublishFlushedSegment asserts non-null.
            PublishFlushedSegment(indexWriter, newSegment, bufferedUpdates);
        }
    }
}
/// <summary>
/// Ticket that carries only frozen updates and no segment. It is always ready
/// to be published.
/// </summary>
internal sealed class GlobalDeletesTicket : FlushTicket
{
    /// <summary>
    /// Creates a ticket for <paramref name="frozenUpdates"/>.
    /// </summary>
    protected internal GlobalDeletesTicket(FrozenBufferedUpdates frozenUpdates)
        : base(frozenUpdates)
    {
    }

    /// <summary>
    /// Marks the ticket as published and finishes the flush without a segment.
    /// </summary>
    protected internal override void Publish(IndexWriter writer)
    {
        Debug.Assert(!Published, "ticket was already publised - can not publish twice");
        Published = true;
        // a global ticket has no segment of its own, hence the null argument
        FinishFlush(writer, null, FrozenUpdates);
    }

    /// <summary>Always <c>true</c> for this ticket type.</summary>
    protected internal override bool CanPublish()
    {
        return true;
    }
}
/// <summary>
/// Ticket for a segment flush. It becomes publishable once a segment has been
/// assigned to it or it has been marked as failed.
/// </summary>
internal sealed class SegmentFlushTicket : FlushTicket
{
    // Segment assigned through the Segment setter; null until then.
    internal FlushedSegment Segment_Renamed;

    // Raised by SetFailed().
    internal bool Failed = false;

    /// <summary>
    /// Creates a ticket for <paramref name="frozenDeletes"/>.
    /// </summary>
    protected internal SegmentFlushTicket(FrozenBufferedUpdates frozenDeletes)
        : base(frozenDeletes)
    {
    }

    /// <summary>
    /// Marks the ticket as published and finishes the flush with whatever segment
    /// is currently stored on the ticket.
    /// </summary>
    protected internal override void Publish(IndexWriter writer)
    {
        Debug.Assert(!Published, "ticket was already publised - can not publish twice");
        Published = true;
        FinishFlush(writer, Segment_Renamed, FrozenUpdates);
    }

    /// <summary>Stores the flushed segment on this ticket (write-only).</summary>
    protected internal FlushedSegment Segment
    {
        set
        {
            this.Segment_Renamed = value;
        }
    }

    /// <summary>
    /// Flags the ticket as failed; no segment may have been assigned yet.
    /// </summary>
    protected internal void SetFailed()
    {
        Debug.Assert(Segment_Renamed == null);
        Failed = true;
    }

    /// <summary>
    /// <c>true</c> when the ticket has failed or a segment has been assigned.
    /// </summary>
    protected internal override bool CanPublish()
    {
        if (Segment_Renamed != null)
        {
            return true;
        }
        return Failed;
    }
}