blob: 26e933b448a3a459078994c861494893e769b716 [file] [log] [blame]
using J2N.Collections.Generic.Extensions;
using Lucene.Net.Support.Threading;
using Lucene.Net.Util;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Directory = Lucene.Net.Store.Directory;
namespace Lucene.Net.Index
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/// <summary>
/// A <see cref="MergeScheduler"/> that runs each merge using
/// <see cref="Task"/>s on the default <see cref="TaskScheduler"/>.
/// <para>If more than <see cref="MaxMergeCount"/> merges are
/// requested then this class will forcefully throttle the
/// incoming threads by pausing until one or more merges
/// complete.</para>
/// LUCENENET specific
/// </summary>
public class TaskMergeScheduler : MergeScheduler, IConcurrentMergeScheduler
/// <summary>
/// Component name passed to the writer's info stream, both when checking
/// <see cref="Verbose"/> and when writing a <see cref="Message(string)"/>.
/// </summary>
public const string COMPONENT_NAME = "CMS";
/// <summary>
/// The scheduler merge tasks are associated with; always <see cref="TaskScheduler.Default"/>.
/// Its <see cref="TaskScheduler.MaximumConcurrencyLevel"/> seeds
/// <see cref="MaxThreadCount"/> and <see cref="MaxMergeCount"/> in the constructor.
/// </summary>
private readonly TaskScheduler _taskScheduler = TaskScheduler.Default;
/// <summary>
/// Lock whose write side is taken by <c>Merge</c> and by the merge-completed handler.
/// </summary>
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
/// <summary>
/// Event instance handed to every <see cref="MergeThread"/> this scheduler creates.
/// </summary>
private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim();
/// <summary>
/// List of currently active <see cref="MergeThread"/>s.</summary>
private readonly IList<MergeThread> _mergeThreads = new List<MergeThread>();
/// <summary>
/// How many <see cref="MergeThread"/>s have kicked off (this is used
/// to name them).
/// </summary>
private int _mergeThreadCount;
/// <summary>
/// <see cref="Directory"/> that holds the index. </summary>
private Directory _directory;
/// <summary>
/// <see cref="IndexWriter"/> that owns this instance.
/// </summary>
private IndexWriter _writer;
/// <summary>
/// Sole constructor. Initializes both <see cref="MaxThreadCount"/> and
/// <see cref="MaxMergeCount"/> from the
/// <see cref="TaskScheduler.MaximumConcurrencyLevel"/> of the scheduler in use.
/// </summary>
public TaskMergeScheduler() : base()
{
    int concurrencyLimit = _taskScheduler.MaximumConcurrencyLevel;

    MaxThreadCount = concurrencyLimit;
    MaxMergeCount = concurrencyLimit;
}
/// <summary>
/// Accepts the maximum number of merge threads and simultaneous merges.
/// As visible here, the method contains no statements: neither argument is
/// stored, and <see cref="MaxMergeCount"/> / <see cref="MaxThreadCount"/> keep the
/// values assigned in the constructor.
/// </summary>
/// <param name="maxMergeCount"> The max # simultaneous merges that are allowed.
/// Not applied by this implementation (see the comment in the method body). </param>
/// <param name="maxThreadCount"> The max # simultaneous merge threads that should
/// be running at once. Not applied by this implementation. </param>
public void SetMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
// This is handled by TaskScheduler.Default.MaximumConcurrencyLevel
/// <summary>
/// Max number of merge threads allowed to be running at
/// once. When there are more merges than this, we
/// forcefully pause the larger ones, letting the smaller
/// ones run, up until <see cref="MaxMergeCount"/> merges at which point
/// we forcefully pause incoming threads (that presumably
/// are the ones causing so much merging).
/// </summary>
/// <remarks>
/// The setter is private; the value is assigned in the constructor from
/// <see cref="TaskScheduler.MaximumConcurrencyLevel"/>.
/// </remarks>
/// <seealso cref="SetMaxMergesAndThreads(int, int)"/>
public int MaxThreadCount { get; private set; }
/// <summary>
/// Max number of merges we accept before forcefully
/// throttling the incoming threads.
/// </summary>
/// <remarks>
/// The setter is private; the value is assigned in the constructor from
/// <see cref="TaskScheduler.MaximumConcurrencyLevel"/>.
/// </remarks>
public int MaxMergeCount { get; private set; }
/// <summary>
/// Return the priority that merge threads run at. This is always the same.
/// </summary>
public int MergeThreadPriority
// NOTE(review): two consecutive return statements are visible here. Presumably they
// are alternatives selected by conditional compilation that is not shown in this
// view -- confirm which symbol guards each branch before editing.
return 2;
return (int)ThreadPriority.Normal;
/// <summary>
/// This method has no effect in <see cref="TaskMergeScheduler"/> because the
/// <see cref="MergeThreadPriority"/> returns a constant value.
/// </summary>
/// <param name="priority">Ignored.</param>
public void SetMergeThreadPriority(int priority)
/// <summary>
/// Called whenever the running merges have changed, to pause &amp; unpause
/// threads. This method sorts the merge threads by their merge size in
/// descending order and then pauses/unpauses threads from first to last --
/// that way, smaller merges are guaranteed to run before larger ones.
/// </summary>
/// <remarks>
/// NOTE(review): only the pruning loop is visible in this view; the sorting and
/// pausing described above is not shown here -- confirm against the full method.
/// </remarks>
private void UpdateMergeThreads()
// Iterate over a snapshot (ToArray) rather than the live list, presumably so
// entries can be removed from _mergeThreads inside the loop.
foreach (var merge in _mergeThreads.ToArray())
// Prune any dead threads
if (!merge.IsAlive)
/// <summary>
/// Gets a value indicating whether verbose output is enabled, i.e. a writer has
/// been assigned and its info stream is enabled for <see cref="COMPONENT_NAME"/>.
/// Typically checked before calling <see cref="Message(string)"/>:
/// <code>
/// if (Verbose) {
/// Message(&quot;your message&quot;);
/// }
/// </code>
/// </summary>
protected bool Verbose
{
    get
    {
        // No writer yet means there is no info stream to ask.
        if (_writer == null)
        {
            return false;
        }

        return _writer.infoStream.IsEnabled(COMPONENT_NAME);
    }
}
/// <summary>
/// Writes <paramref name="message"/> to the owning writer's info stream under
/// <see cref="COMPONENT_NAME"/>. The caller is expected to have checked
/// <see cref="Verbose"/> first; no check is made here.
/// </summary>
/// <param name="message">The text to output.</param>
protected virtual void Message(string message)
    => _writer.infoStream.Message(COMPONENT_NAME, message);
/// <summary>
/// Override of the base class's dispose hook.
/// </summary>
/// <param name="disposing">Passed through from the base class's dispose pattern.</param>
// NOTE(review): the body of this override is not visible in this view. Presumably it
// waits for running merges via Sync() and releases the event/lock -- confirm.
protected override void Dispose(bool disposing)
/// <summary>
/// Wait for any running merge threads to finish.
/// This call is not interruptible as used by <see cref="MergeScheduler.Dispose()"/>.
/// </summary>
/// <remarks>
/// Works on a snapshot of the merge-thread list. An
/// <see cref="OperationCanceledException"/> is caught and treated as expected.
/// </remarks>
public virtual void Sync()
foreach (var merge in _mergeThreads.ToArray())
// Entries that are null or no longer alive are tested for here (presumably skipped;
// the statement under this condition is not visible in this view).
if (merge == null || !merge.IsAlive)
catch (OperationCanceledException)
// expected when we cancel.
catch (AggregateException ae)
// AggregateException.Handle invokes the predicate once per inner exception.
// NOTE(review): as visible here, non-cancellation exceptions yield true and
// cancellation exceptions yield false -- confirm this is the intended polarity.
ae.Handle(ex =>
if (!(ex is OperationCanceledException))
return true;
return false;
/// <summary>
/// Gets the number of tracked merge threads that are alive and still report a
/// current merge. This number is &lt;= the size of <see cref="_mergeThreads"/>.
/// </summary>
private int MergeThreadCount
    => _mergeThreads.Count(thread => thread.IsAlive && thread.CurrentMerge != null);
/// <summary>
/// Pulls pending merges from <paramref name="writer"/> one at a time and creates a
/// merge task for each. While the writer still has pending merges and
/// <see cref="MergeThreadCount"/> is at or above <see cref="MaxMergeCount"/>, the
/// calling thread is stalled.
/// </summary>
/// <param name="writer">The writer whose pending merges are consumed; it is also
/// recorded, together with its directory, on this scheduler.</param>
/// <param name="trigger">Not referenced in the visible part of this method.</param>
/// <param name="newMergesFound">Not referenced in the visible part of this method.</param>
public override void Merge(IndexWriter writer, MergeTrigger trigger, bool newMergesFound)
// Take the scheduler's write lock.
using (_lock.Write())
// Remember the writer and its directory; Verbose/Message and
// HandleMergeException read these fields.
_writer = writer;
_directory = writer.Directory;
if (Verbose)
Message("now merge");
Message(" index: " + writer.SegString());
// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
// involving segments already pending to be merged) to
// the queue. If we are way behind on merging, many of
// these newly proposed merges will likely already be
// registered.
// Iterate, pulling from the IndexWriter's queue of
// pending merges, until it's empty:
while (true)
// 0 doubles as "did not stall" (see the check before the "stalled for" message).
long startStallTime = 0;
while (writer.HasPendingMerges() && MergeThreadCount >= MaxMergeCount)
// this means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
// Note that only maxThreadCount of
// those created merge threads will actually be
// running; the rest will be paused (see
// updateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
// NOTE(review): Environment.TickCount is a 32-bit millisecond counter that
// wraps around; the elapsed time below is a plain subtraction.
startStallTime = Environment.TickCount;
if (Verbose)
Message(" too many merges; stalling...");
if (Verbose)
if (startStallTime != 0)
Message(" stalled for " + (Environment.TickCount - startStallTime) + " msec");
// A null result means the writer has no more pending merges.
MergePolicy.OneMerge merge = writer.NextMerge();
if (merge == null)
if (Verbose)
Message(" no more merges pending; now return");
// Stays false unless the launch sequence below reaches "success = true".
bool success = false;
if (Verbose)
Message(" consider merge " + writer.SegString(merge.Segments));
// OK to spawn a new merge thread to handle this
// merge:
var merger = CreateTask(writer, merge);
// The handler unsubscribes itself when the task reports completion.
merger.MergeThreadCompleted += OnMergeThreadCompleted;
if (Verbose)
Message(" launch new thread [" + merger.Name + "]");
// Must call this after starting the thread else
// the new thread is removed from mergeThreads
// (since it's not alive yet):
success = true;
// NOTE(review): the failure handling under this condition is not visible in this view.
if (!success)
/// <summary>
/// Does the actual merge, by calling <see cref="IndexWriter.Merge(MergePolicy.OneMerge)"/> </summary>
/// <param name="merge">The merge to run.</param>
/// <remarks>
/// This method is handed to each <see cref="MergeThread"/> as its merge callback.
/// NOTE(review): the method body is not visible in this view.
/// </remarks>
protected virtual void DoMerge(MergePolicy.OneMerge merge)
/// <summary>
/// Handler for <see cref="MergeThread.MergeThreadCompleted"/>, subscribed for every
/// task created in <c>Merge</c>. Unsubscribes itself from the sender and then takes
/// the scheduler's write lock.
/// </summary>
/// <param name="sender">Expected to be the <see cref="MergeThread"/> that completed.</param>
/// <param name="e">Not referenced in the visible part of this method.</param>
private void OnMergeThreadCompleted(object sender, EventArgs e)
// 'as' yields null when the sender is not a MergeThread.
var mergeThread = sender as MergeThread;
if (mergeThread == null)
// Detach so the completed task no longer references this scheduler's handler.
mergeThread.MergeThreadCompleted -= OnMergeThreadCompleted;
// NOTE(review): the statements executed under the write lock are not visible in this view.
using (_lock.Write())
/// <summary>
/// Builds a new <see cref="MergeThread"/> for <paramref name="merge"/>. The task is
/// only constructed here, not started. Its name carries a sequence number taken
/// from an atomically incremented counter.
/// </summary>
/// <param name="writer">The writer the merge belongs to; also supplies the info stream.</param>
/// <param name="merge">The first merge the new task will run.</param>
/// <returns>The newly constructed <see cref="MergeThread"/>.</returns>
private MergeThread CreateTask(IndexWriter writer, MergePolicy.OneMerge merge)
{
    int sequence = Interlocked.Increment(ref _mergeThreadCount);
    string taskName = string.Format("Lucene Merge Task #{0}", sequence);

    return new MergeThread(
        taskName,
        writer,
        merge,
        writer.infoStream,
        Verbose,
        _manualResetEvent,
        HandleMergeException,
        DoMerge);
}
/// <summary>
/// Called when an exception is hit in a background merge
/// thread
/// </summary>
/// <param name="exc">The exception raised by the merge.</param>
/// <exception cref="MergePolicy.MergeException">Thrown at the end of the method,
/// wrapping <paramref name="exc"/> together with the index directory.</exception>
protected virtual void HandleMergeException(Exception exc)
// suppressExceptions is normally only set during testing
// NOTE(review): the statement under this condition is not visible in this view;
// presumably the method returns without throwing -- confirm.
if (suppressExceptions)
// try
// {
// When an exception is hit during merge, IndexWriter
// removes any partial files and then allows another
// merge to run. If whatever caused the error is not
// transient then the exception will keep happening,
// so, we sleep here to avoid saturating CPU in such
// cases:
//#if FEATURE_THREAD_INTERRUPT // LUCENENET NOTE: Senseless to catch and rethrow the same exception type
// }
// catch (ThreadInterruptedException ie)
// {
// throw new ThreadInterruptedException("Thread Interrupted Exception", ie);
// }
throw new MergePolicy.MergeException(exc, _directory);
/// <summary>
/// Flag consulted by <see cref="HandleMergeException(Exception)"/>; normally only
/// set during testing.
/// </summary>
private bool suppressExceptions;

/// <summary>
/// Used for testing: switches the <see cref="suppressExceptions"/> flag on.
/// </summary>
public virtual void SetSuppressExceptions() => suppressExceptions = true;
/// <summary>
/// Used for testing: switches the <see cref="suppressExceptions"/> flag off.
/// </summary>
public virtual void ClearSuppressExceptions() => suppressExceptions = false;
/// <summary>
/// Returns the runtime type name followed by the current
/// <see cref="MaxThreadCount"/> and <see cref="MaxMergeCount"/> values.
/// </summary>
public override string ToString()
{
    return new StringBuilder(this.GetType().Name + ": ")
        .AppendFormat("maxThreadCount={0}, ", MaxThreadCount)
        .AppendFormat("maxMergeCount={0}", MaxMergeCount)
        .ToString();
}
/// <summary>
/// Creates a copy through the base class's <c>Clone()</c> and detaches that copy
/// from this instance's writer and directory.
/// </summary>
/// <returns>The cloned <see cref="TaskMergeScheduler"/>.</returns>
public override object Clone()
{
    var copy = (TaskMergeScheduler)base.Clone();

    // The copy has not been attached to a writer yet.
    copy._directory = null;
    copy._writer = null;

    return copy;
}
/// <summary>
/// Runs a merge thread, which may run one or more merges
/// in sequence.
/// </summary>
internal class MergeThread : IDisposable
/// <summary>
/// Raised from <c>Run</c> after the done flag has been set.
/// </summary>
public event EventHandler MergeThreadCompleted;
// Source of the token handed to the task in Start and checked by Run's loop.
private readonly CancellationTokenSource _cancellationTokenSource;
// Read side is taken by the Instance/RunningMerge/CurrentMerge/IsAlive getters,
// write side by Start and Cancel.
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
// Event supplied by the owner through the constructor.
private readonly ManualResetEventSlim _resetEvent;
// Callbacks supplied by the owner through the constructor.
private readonly Action<Exception> _exceptionHandler;
private readonly Action<MergePolicy.OneMerge> _doMerge;
// Info stream used for the "merge thread: ..." messages, gated by _isLoggingEnabled.
private readonly InfoStream _logger;
// Writer that Run asks for follow-up merges (NextMerge).
private readonly IndexWriter _writer;
// The merge this instance was created for; Run starts with it.
private readonly MergePolicy.OneMerge _startingMerge;
// Logging decision captured once at construction time.
private readonly bool _isLoggingEnabled;
// Null until Start creates the task.
private Task _task;
// Updated through the RunningMerge property.
private MergePolicy.OneMerge _runningMerge;
// Set by Dispose; makes IsAlive return false.
private volatile bool _isDisposed = false;
// Set by Run; makes IsAlive return false and CurrentMerge return null.
private volatile bool _isDone;
/// <summary>
/// Sole constructor. Stores the supplied collaborators and creates a fresh
/// <see cref="CancellationTokenSource"/>; no task is created until
/// <see cref="Start(TaskScheduler)"/> is called.
/// </summary>
/// <param name="name">Display name of this merge task.</param>
/// <param name="writer">Writer that is asked for follow-up merges.</param>
/// <param name="startMerge">The first merge to run.</param>
/// <param name="logger">Info stream for diagnostic messages.</param>
/// <param name="isLoggingEnabled">Whether diagnostic messages are written.</param>
/// <param name="resetEvent">Event shared with the owning scheduler.</param>
/// <param name="exceptionHandler">Callback for exceptions hit while merging.</param>
/// <param name="doMerge">Callback that performs a single merge.</param>
public MergeThread(string name, IndexWriter writer, MergePolicy.OneMerge startMerge,
    InfoStream logger, bool isLoggingEnabled,
    ManualResetEventSlim resetEvent, Action<Exception> exceptionHandler, Action<MergePolicy.OneMerge> doMerge)
{
    Name = name;

    // What to merge and with which writer.
    _writer = writer;
    _startingMerge = startMerge;

    // Diagnostics.
    _logger = logger;
    _isLoggingEnabled = isLoggingEnabled;

    // Callbacks into, and signalling shared with, the owner.
    _doMerge = doMerge;
    _exceptionHandler = exceptionHandler;
    _resetEvent = resetEvent;

    _cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>
/// Display name of this merge task, assigned in the constructor and used in
/// <see cref="ToString"/>.
/// </summary>
public string Name { get; private set; }
/// <summary>
/// Gets the underlying <see cref="Task"/>, read under the read lock. The value is
/// <c>null</c> until <see cref="Start(TaskScheduler)"/> has created the task.
/// </summary>
public Task Instance
{
    get
    {
        Task current;
        using (_lock.Read())
        {
            current = _task;
        }

        return current;
    }
}
/// <summary>
/// Gets or sets the merge that is currently being run. Reads take the read lock;
/// writes are a single atomic exchange.
/// </summary>
public virtual MergePolicy.OneMerge RunningMerge
{
    get
    {
        MergePolicy.OneMerge active;
        using (_lock.Read())
        {
            active = _runningMerge;
        }

        return active;
    }
    set
    {
        Interlocked.Exchange(ref _runningMerge, value);
    }
}
/// <summary>
/// Gets the merge this <see cref="MergeThread"/> is working on: the running merge
/// if one has been recorded, otherwise the merge it was created with. Returns
/// <c>null</c> once this <see cref="MergeThread"/> is done.
/// </summary>
public virtual MergePolicy.OneMerge CurrentMerge
{
    get
    {
        using (_lock.Read())
        {
            return _isDone ? null : (_runningMerge ?? _startingMerge);
        }
    }
}
/// <summary>
/// Gets a value indicating whether this merge task has been started and has not
/// yet finished. Returns <c>false</c> when the instance is disposed or done, when
/// no task has been created yet, or when the task has reached a final state
/// (<see cref="TaskStatus.RanToCompletion"/>, <see cref="TaskStatus.Faulted"/> or
/// <see cref="TaskStatus.Canceled"/>).
/// </summary>
public bool IsAlive
{
    get
    {
        if (_isDisposed || _isDone)
        {
            return false;
        }

        using (_lock.Read())
        {
            // The previous test chained "status != X" with "||", which is true for
            // every status. A task cancelled before Run ever executed (so the done
            // flag was never set) was therefore reported alive forever.
            // Task.IsCompleted covers exactly the three final states.
            return _task != null && !_task.IsCompleted;
        }
    }
}
/// <summary>
/// Creates and starts the underlying task on <paramref name="taskScheduler"/>.
/// Does nothing if a task already exists or cancellation has been requested.
/// </summary>
/// <param name="taskScheduler">The scheduler the task is queued to.</param>
public void Start(TaskScheduler taskScheduler)
{
    using (_lock.Write())
    {
        bool alreadyStarted = _task != null;
        if (alreadyStarted || _cancellationTokenSource.IsCancellationRequested)
        {
            return;
        }

        _task = Task.Factory.StartNew(
            () => Run(_cancellationTokenSource.Token),
            _cancellationTokenSource.Token,
            TaskCreationOptions.None,
            taskScheduler);
    }
}
/// <summary>
/// Waits on this merge task. The method first checks <see cref="IsAlive"/>.
/// </summary>
// NOTE(review): the statements following the IsAlive check are not visible in this
// view; presumably the method returns early when not alive and otherwise waits on
// the underlying task -- confirm.
public void Wait()
if (!IsAlive)
/// <summary>
/// Requests cancellation of this merge task. The method checks
/// <see cref="IsAlive"/>, then takes the write lock and tests whether cancellation
/// has already been requested.
/// </summary>
// NOTE(review): the statements under both conditions are not visible in this view;
// presumably an early return and a call that cancels the token source -- confirm.
public void Cancel()
if (!IsAlive)
using (_lock.Write())
if (!_cancellationTokenSource.IsCancellationRequested)
/// <summary>
/// Task body: starts with the merge this instance was created for, then keeps
/// asking the writer for the next merge while cancellation has not been requested.
/// Sets the done flag and raises <see cref="MergeThreadCompleted"/> at the end.
/// </summary>
/// <param name="cancellationToken">Token checked at the top of every loop iteration.</param>
private void Run(CancellationToken cancellationToken)
// First time through the while loop we do the merge
// that we were started with:
MergePolicy.OneMerge merge = _startingMerge;
if (_isLoggingEnabled)
_logger.Message(COMPONENT_NAME, " merge thread: start");
// NOTE(review): "true &&" is redundant; the condition is just the cancellation check.
while (true && !cancellationToken.IsCancellationRequested)
// Publish the merge being worked on so CurrentMerge reflects it.
RunningMerge = merge;
// LUCENENET NOTE: We MUST call DoMerge(merge) instead of
// _writer.Merge(merge) because the tests specifically look
// for the method name DoMerge in the stack trace.
// Subsequent times through the loop we do any new
// merge that writer says is necessary:
merge = _writer.NextMerge();
// Notify here in case any threads were stalled;
// they will notice that the pending merge has
// been pulled and possibly resume:
if (merge != null)
if (_isLoggingEnabled)
_logger.Message(COMPONENT_NAME, " merge thread: do another merge " + _writer.SegString(merge.Segments));
if (_isLoggingEnabled)
_logger.Message(COMPONENT_NAME, " merge thread: done");
catch (Exception exc)
// Ignore the exception if it was due to abort:
if (!(exc is MergePolicy.MergeAbortedException))
//System.out.println(Thread.currentThread().getName() + ": CMS: exc");
// From here on IsAlive returns false and CurrentMerge returns null.
_isDone = true;
// NOTE(review): the null check and the invocation read the event twice; a handler
// removed in between would make the call throw. Consider copying to a local first.
if (MergeThreadCompleted != null)
MergeThreadCompleted(this, EventArgs.Empty);
/// <summary>
/// Marks this instance as disposed, after which <see cref="IsAlive"/> returns
/// <c>false</c>.
/// </summary>
// NOTE(review): only the guard and the flag assignment are visible in this view.
// Presumably repeated calls return early, and owned resources (lock, token source)
// are released in statements not shown -- confirm.
public void Dispose()
if (_isDisposed)
_isDisposed = true;
/// <summary>
/// Describes this merge task by name. Once the task exists, the description also
/// includes the task's id and status.
/// </summary>
public override string ToString()
{
    if (_task == null)
    {
        return string.Format("Task[{0}], Task has not been started yet.", Name);
    }

    return string.Format("Task[{0}], Id[{1}], Status[{2}]", Name, _task.Id, _task.Status);
}
/// <summary>
/// Two started <see cref="MergeThread"/>s are equal when their underlying tasks
/// have the same id. An instance that has not been started is equal only to
/// itself, which matches the <see cref="GetHashCode"/> fallback for that case.
/// </summary>
/// <param name="obj">The object to compare with.</param>
/// <returns><c>true</c> if <paramref name="obj"/> is an equal <see cref="MergeThread"/>;
/// otherwise <c>false</c>.</returns>
public override bool Equals(object obj)
{
    var compared = obj as MergeThread;
    if (compared == null)
    {
        return false;
    }

    // Read each task once so the null checks and the id comparison agree.
    Task mine = Instance;
    Task theirs = compared.Instance;

    if (mine == null || theirs == null)
    {
        // Previously two unstarted instances fell through to Instance.Id and threw
        // a NullReferenceException. A started and an unstarted instance still
        // compare unequal, as before.
        return mine == null && theirs == null && ReferenceEquals(this, compared);
    }

    return mine.Id == theirs.Id;
}
/// <summary>
/// Returns the hash code of the underlying task once it exists; before that,
/// returns the base object's hash code.
/// </summary>
public override int GetHashCode()
{
    if (Instance == null)
    {
        return base.GetHashCode();
    }

    return Instance.GetHashCode();
}