blob: 6a48765f9758b0585a0a0dd67077291b90f9ec21 [file] [log] [blame]
using J2N.Threading;
using Lucene.Net.Diagnostics;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading;
namespace Lucene.Net.Index
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
using CollectionUtil = Lucene.Net.Util.CollectionUtil;
using Directory = Lucene.Net.Store.Directory;
/// <summary>
/// A <see cref="MergeScheduler"/> that runs each merge using a
/// separate thread.
/// <para>Specify the max number of threads that may run at
/// once, and the maximum number of simultaneous merges
/// with <see cref="SetMaxMergesAndThreads"/>.</para>
/// <para>If the number of merges exceeds the max number of threads
/// then the largest merges are paused until one of the smaller
/// merges completes.</para>
/// <para>If more than <see cref="MaxMergeCount"/> merges are
/// requested then this class will forcefully throttle the
/// incoming threads by pausing until one or more merges
/// complete.</para>
/// </summary>
public class ConcurrentMergeScheduler : MergeScheduler, IConcurrentMergeScheduler
// Base priority for merge threads. -1 means "not chosen yet": InitMergeThreadPriority
// replaces it with one step above the calling thread's priority (capped at
// ThreadPriority.Highest); SetMergeThreadPriority sets it explicitly.
private int mergeThreadPriority = -1;
/// <summary>
/// List of currently active <see cref="MergeThread"/>s. </summary>
protected internal IList<MergeThread> m_mergeThreads = new List<MergeThread>();
/// <summary>
/// Default <see cref="MaxThreadCount"/>.
/// We default to 1: tests on spinning-magnet drives showed slower
/// indexing performance if more than one merge thread runs at
/// once (though on an SSD it was faster).
/// </summary>
public const int DEFAULT_MAX_THREAD_COUNT = 1;
/// <summary>
/// Default <see cref="MaxMergeCount"/>. </summary>
public const int DEFAULT_MAX_MERGE_COUNT = 2;
// Max number of merge threads allowed to be running at
// once. When there are more merges than this, we
// forcefully pause the larger ones, letting the smaller
// ones run, up until maxMergeCount merges at which point
// we forcefully pause incoming threads (that presumably
// are the ones causing so much merging).
private int maxThreadCount = DEFAULT_MAX_THREAD_COUNT;
// Max number of merges we accept before forcefully
// throttling the incoming threads.
private int maxMergeCount = DEFAULT_MAX_MERGE_COUNT;
/// <summary>
/// <see cref="Directory"/> that holds the index; taken from the writer passed to
/// <see cref="Merge"/>. </summary>
protected internal Directory m_dir;
/// <summary>
/// <see cref="IndexWriter"/> that owns this instance; assigned on each call to
/// <see cref="Merge"/>. </summary>
protected internal IndexWriter m_writer;
/// <summary>
/// How many <see cref="MergeThread"/>s have kicked off (this is used
/// to name them).
/// </summary>
protected internal int m_mergeThreadCount;
/// <summary>
/// Sole constructor, with all settings set to default
/// values: <see cref="DEFAULT_MAX_THREAD_COUNT"/> merge threads,
/// <see cref="DEFAULT_MAX_MERGE_COUNT"/> simultaneous merges, and a merge
/// thread priority that is only chosen once merging starts.
/// </summary>
public ConcurrentMergeScheduler()
/// <summary>
/// Configures how many merges may be in flight and how many of them may run concurrently.
/// </summary>
/// <param name="maxMergeCount"> Upper bound on simultaneous merges. When another merge is
/// needed while this many merge threads already exist, the thread calling
/// add/updateDocument blocks until one of the merge threads finishes. Only the smallest
/// <paramref name="maxThreadCount"/> of those merges actually run at any one time. </param>
/// <param name="maxThreadCount"> Upper bound on merge threads running at once. It must be at
/// least 1 and no greater than <paramref name="maxMergeCount"/>. </param>
/// <exception cref="ArgumentException"> Either value is below 1, or
/// <paramref name="maxThreadCount"/> exceeds <paramref name="maxMergeCount"/>. </exception>
public virtual void SetMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
{
    // The checks are ordered so that the first violated rule determines the message.
    string error = null;
    if (maxThreadCount < 1)
    {
        error = "maxThreadCount should be at least 1";
    }
    else if (maxMergeCount < 1)
    {
        error = "maxMergeCount should be at least 1";
    }
    else if (maxThreadCount > maxMergeCount)
    {
        error = $"maxThreadCount should be <= maxMergeCount (= {maxMergeCount})";
    }

    if (error != null)
    {
        throw new ArgumentException(error);
    }

    this.maxThreadCount = maxThreadCount;
    this.maxMergeCount = maxMergeCount;
}
/// <summary>
/// Gets the maximum number of merge threads that may run at the same time.
/// </summary>
/// <seealso cref="SetMaxMergesAndThreads(int, int)"/>
public virtual int MaxThreadCount
{
    get { return maxThreadCount; }
}
/// <summary>
/// Gets the maximum number of simultaneous merges. Once that many merge threads are
/// active, further merge requests stall the calling thread.
/// </summary>
/// <seealso cref="SetMaxMergesAndThreads(int, int)"/>
public virtual int MaxMergeCount
{
    get { return maxMergeCount; }
}
/// <summary>
/// Gets the base priority merge threads run at. Unless set explicitly, it defaults to
/// one step above (ie, slightly higher than) the priority of the first thread that calls
/// merge; before that default is chosen the value reads as -1.
/// </summary>
public virtual int MergeThreadPriority
{
    get
    {
        int priority;
        lock (this)
        {
            priority = mergeThreadPriority;
        }
        return priority;
    }
}
/// <summary>
/// Set the base priority that merge threads run at.
/// Note that CMS may increase priority of some merge
/// threads beyond this base priority. It's best not to
/// set this any higher than
/// <see cref="ThreadPriority.Highest"/>(4)-maxThreadCount, so that CMS has
/// room to set relative priority among threads.
/// </summary>
/// <param name="priority"> New base priority, as the <see cref="int"/> value of a
/// <see cref="ThreadPriority"/>. It must lie between <see cref="ThreadPriority.Lowest"/> and
/// <see cref="ThreadPriority.Highest"/>, inclusive. </param>
/// <exception cref="ArgumentException"> <paramref name="priority"/> is outside that range. </exception>
public virtual void SetMergeThreadPriority(int priority)
{
    lock (this)
    {
        int lowest = (int)ThreadPriority.Lowest;
        int highest = (int)ThreadPriority.Highest;
        if (priority < lowest || priority > highest)
        {
            // List the bounds low-to-high so the message reads as a range (e.g. "0 .. 4").
            throw new ArgumentException("priority must be in range " + lowest + " .. " + highest + " inclusive");
        }
        // Stored as the starting value UpdateMergeThreads uses when it assigns priorities.
        mergeThreadPriority = priority;
    }
}
/// <summary>
/// Orders <see cref="MergeThread"/>s by the total document count of their current merge,
/// largest first. A thread without a current merge is treated as having
/// <see cref="int.MaxValue"/> documents, i.e. as the largest.
/// </summary>
protected internal static readonly IComparer<MergeThread> compareByMergeDocCount = Comparer<MergeThread>.Create((t1, t2) =>
{
    MergePolicy.OneMerge first = t1.CurrentMerge;
    MergePolicy.OneMerge second = t2.CurrentMerge;
    int firstDocs = first != null ? first.TotalDocCount : int.MaxValue;
    int secondDocs = second != null ? second.TotalDocCount : int.MaxValue;
    // Descending order: positive when t2's merge is larger, placing t1 after t2.
    return secondDocs - firstDocs;
});
/// <summary>
/// Called whenever the running merges have changed, to pause &amp; unpause
/// threads. This method sorts the merge threads by their merge size in
/// descending order and then pauses/unpauses threads from first to last --
/// that way, smaller merges are guaranteed to run before larger ones.
/// </summary>
/// <remarks>
/// After sorting, the first <c>activeMergeCount - maxThreadCount</c> threads (the largest
/// merges) are marked to pause and the rest to run. Running threads are handed priorities
/// that start at the base merge thread priority and step up by one per thread, capped at
/// <see cref="ThreadPriority.Highest"/>. As a result, smaller merges end up with equal or
/// higher priority.
/// </remarks>
protected virtual void UpdateMergeThreads()
lock (this)
// Only look at threads that are alive & not in the
// process of stopping (ie have an active merge):
IList<MergeThread> activeMerges = new List<MergeThread>();
int threadIdx = 0;
while (threadIdx < m_mergeThreads.Count)
MergeThread mergeThread = m_mergeThreads[threadIdx];
if (!mergeThread.IsAlive)
// Prune any dead threads
if (mergeThread.CurrentMerge != null)
// Sort the merge threads in descending order.
CollectionUtil.TimSort(activeMerges, compareByMergeDocCount);
// Priority handed to the next running thread; starts at the configured base priority.
int pri = mergeThreadPriority;
int activeMergeCount = activeMerges.Count;
for (threadIdx = 0; threadIdx < activeMergeCount; threadIdx++)
MergeThread mergeThread = activeMerges[threadIdx];
MergePolicy.OneMerge merge = mergeThread.CurrentMerge;
// The thread may have finished since the list was built; CurrentMerge then returns null.
if (merge == null)
// pause the thread if maxThreadCount is smaller than the number of merge threads.
// Only indices below (activeMergeCount - maxThreadCount) pause, i.e. the largest merges.
bool doPause = threadIdx < activeMergeCount - maxThreadCount;
if (IsVerbose)
if (doPause != merge.IsPaused)
if (doPause)
Message("pause thread " + mergeThread.Name);
Message("unpause thread " + mergeThread.Name);
if (doPause != merge.IsPaused)
if (!doPause)
if (IsVerbose)
Message("set priority of merge thread " + mergeThread.Name + " to " + pri);
// The next (smaller) running merge gets one step more priority, never above Highest.
pri = Math.Min((int)ThreadPriority.Highest, 1 + pri);
/// <summary>
/// Gets whether diagnostic output is enabled. This is true when an <see cref="IndexWriter"/>
/// is attached and its info stream accepts the <c>"CMS"</c> component. Use it to guard calls
/// to <see cref="Message(String)"/>:
/// <code>
/// if (IsVerbose)
/// {
///     Message(&quot;your message&quot;);
/// }
/// </code>
/// </summary>
protected virtual bool IsVerbose
{
    get
    {
        if (m_writer == null)
        {
            return false;
        }
        return m_writer.infoStream.IsEnabled("CMS");
    }
}
/// <summary>
/// Writes <paramref name="message"/> to the owning writer's info stream under the
/// <c>"CMS"</c> component. This method does no checking of its own; callers are expected
/// to have confirmed <see cref="IsVerbose"/> first.
/// </summary>
/// <param name="message"> Text to log. </param>
protected internal virtual void Message(string message) => m_writer.infoStream.Message("CMS", message);
/// <summary>
/// Chooses the default merge thread priority if none has been chosen yet (the field still
/// holds -1). The default is one step above the calling thread's priority, but never above
/// <see cref="ThreadPriority.Highest"/>.
/// </summary>
private void InitMergeThreadPriority()
{
    lock (this)
    {
        if (mergeThreadPriority != -1)
        {
            // Already chosen, either earlier or via SetMergeThreadPriority.
            return;
        }

        int callerPriority = (int)ThreadJob.CurrentThread.Priority;
        mergeThreadPriority = Math.Min(callerPriority + 1, (int)ThreadPriority.Highest);
    }
}
/// <summary>
/// Disposes this scheduler. <see cref="Sync"/> is documented as being used by this method
/// to wait for running merge threads.
/// </summary>
protected override void Dispose(bool disposing)
/// <summary>
/// Waits for all running merge threads to finish. This call is not interruptible; it is used by <see cref="Dispose(bool)"/>. </summary>
/// <remarks>
/// Each pass finds a merge thread that is still alive (scanning under <c>lock (this)</c>) and
/// waits for it, repeating until no live thread remains. A
/// <see cref="ThreadInterruptedException"/> caught while waiting only sets a flag, so the loop
/// keeps retrying. The flag is checked after the loop to restore the interrupt status.
/// </remarks>
public virtual void Sync()
// Remembers whether an interrupt was swallowed while waiting.
bool interrupted = false;
while (true)
MergeThread toSync = null;
// Find a merge thread that is still alive.
lock (this)
foreach (MergeThread t in m_mergeThreads)
if (t != null && t.IsAlive)
toSync = t;
if (toSync != null)
#pragma warning disable 168
catch (ThreadInterruptedException ie)
#pragma warning restore 168
// ignore this Exception, we will retry until all threads are dead
interrupted = true;
// finally, restore interrupt status:
if (interrupted)
/// <summary>
/// Returns the number of merge threads that are alive and still report a current merge
/// (<see cref="MergeThread.CurrentMerge"/> is not <c>null</c>). Note that this number
/// is &lt;= <see cref="m_mergeThreads"/> size.
/// </summary>
protected virtual int MergeThreadCount
lock (this)
int count = 0;
foreach (MergeThread mt in m_mergeThreads)
// A thread that is alive but already done reports no current merge and is not counted.
if (mt.IsAlive && mt.CurrentMerge != null)
return count;
/// <summary>
/// Runs the pending merges of <paramref name="writer"/>, each on a <see cref="MergeThread"/>
/// obtained from <see cref="GetMergeThread"/>. The calling thread is stalled while the number
/// of active merge threads has reached <see cref="MaxMergeCount"/>.
/// </summary>
/// <param name="writer"> Writer whose pending merges are drained. It is also stored in
/// <see cref="m_writer"/>, and its directory in <see cref="m_dir"/>. </param>
/// <param name="trigger"> Reason for the merge request (not read by the code in this method as shown). </param>
/// <param name="newMergesFound"> Whether new merges were found (not read by the code in this method as shown). </param>
public override void Merge(IndexWriter writer, MergeTrigger trigger, bool newMergesFound)
lock (this)
// The caller must not hold the writer's monitor (checked only when asserts are enabled).
if (Debugging.AssertsEnabled) Debugging.Assert(!Monitor.IsEntered(writer));
this.m_writer = writer;
m_dir = writer.Directory;
// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
// involving segments already pending to be merged) to
// the queue. If we are way behind on merging, many of
// these newly proposed merges will likely already be
// registered.
// NOTE(review): the statements directly below only log; the queueing described above does
// not happen at this point — confirm whether this comment is stale.
if (IsVerbose)
Message("now merge");
Message(" index: " + writer.SegString());
// Iterate, pulling from the IndexWriter's queue of
// pending merges, until it's empty:
while (true)
long startStallTime = 0;
// Stall condition: merges are still pending and the number of live merge threads that
// still have a merge (MergeThreadCount) has reached maxMergeCount.
while (writer.HasPendingMerges() && MergeThreadCount >= maxMergeCount)
// this means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
// Note that only maxThreadCount of
// those created merge threads will actually be
// running; the rest will be paused (see
// UpdateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
startStallTime = Environment.TickCount;
if (IsVerbose)
Message(" too many merges; stalling...");
//catch (ThreadInterruptedException ie) // LUCENENET NOTE: Senseless to catch and rethrow the same exception type
// throw new ThreadInterruptedException(ie.ToString(), ie);
// NOTE(review): Environment.TickCount is a 32-bit millisecond counter that wraps around, and
// 0 doubles as the "did not stall" sentinel. The duration logged below can therefore be
// wrong across a wrap; a Stopwatch would avoid both issues.
if (IsVerbose)
if (startStallTime != 0)
Message(" stalled for " + (Environment.TickCount - startStallTime) + " msec");
// Take the next pending merge; null means nothing is left to schedule.
MergePolicy.OneMerge merge = writer.NextMerge();
if (merge == null)
if (IsVerbose)
Message(" no more merges pending; now return");
// Becomes true once the hand-off steps below complete without throwing; checked at the end.
bool success = false;
if (IsVerbose)
Message(" consider merge " + writer.SegString(merge.Segments));
// OK to spawn a new merge thread to handle this
// merge:
MergeThread merger = GetMergeThread(writer, merge);
if (IsVerbose)
Message(" launch new thread [" + merger.Name + "]");
// Must call this after starting the thread else
// the new thread is removed from mergeThreads
// (since it's not alive yet):
success = true;
if (!success)
/// <summary>
/// Does the actual merge, by calling <see cref="IndexWriter.Merge(MergePolicy.OneMerge)"/>. </summary>
/// <param name="merge"> The merge to execute. </param>
/// <remarks> This method is virtual, so subclasses can wrap or replace how a single merge is executed. </remarks>
protected virtual void DoMerge(MergePolicy.OneMerge merge)
/// <summary>
/// Create and return a new <see cref="MergeThread"/>. </summary>
/// <param name="writer"> Writer the new thread pulls further merges from. </param>
/// <param name="merge"> The first merge the new thread runs. </param>
/// <returns> A background thread named "Lucene Merge Thread #N". N is the current value of
/// <see cref="m_mergeThreadCount"/>, which is then incremented. </returns>
protected virtual MergeThread GetMergeThread(IndexWriter writer, MergePolicy.OneMerge merge)
lock (this)
MergeThread thread = new MergeThread(this, writer, merge);
// Background threads do not keep the process alive.
thread.IsBackground = true;
thread.Name = "Lucene Merge Thread #" + m_mergeThreadCount++;
return thread;
/// <summary>
/// A thread that runs one or more merges in sequence: first the merge it was created
/// with, then any further merges the writer hands out.
/// </summary>
protected internal class MergeThread : ThreadJob
// Scheduler that created this thread. Used for logging, as a lock, and for its
// suppressExceptions flag.
private readonly ConcurrentMergeScheduler outerInstance;
// Writer from which further merges are pulled after the first one.
internal IndexWriter tWriter;
// The merge this thread was created to run first.
internal MergePolicy.OneMerge startMerge;
// The merge currently being run; accessed through RunningMerge.
internal MergePolicy.OneMerge runningMerge;
// Set to true near the end of Run; CurrentMerge then returns null.
private volatile bool done;
/// <summary>
/// Creates a merge thread bound to its scheduler and writer.
/// </summary>
/// <param name="outerInstance"> Scheduler that owns this thread. </param>
/// <param name="writer"> Writer whose merges this thread executes. </param>
/// <param name="startMerge"> The merge to run first. </param>
public MergeThread(ConcurrentMergeScheduler outerInstance, IndexWriter writer, MergePolicy.OneMerge startMerge)
{
    this.startMerge = startMerge;
    tWriter = writer;
    this.outerInstance = outerInstance;
}
/// <summary>
/// Gets or sets the merge this thread is running right now. Both accessors lock on this
/// thread object.
/// </summary>
public virtual MergePolicy.OneMerge RunningMerge
{
    get
    {
        MergePolicy.OneMerge current;
        lock (this)
        {
            current = runningMerge;
        }
        return current;
    }
    set
    {
        lock (this)
        {
            runningMerge = value;
        }
    }
}
/// <summary>
/// Gets the merge this thread is working on. This is the running merge if one has been
/// recorded, otherwise the merge the thread was started with, and <c>null</c> once the
/// thread is done.
/// </summary>
public virtual MergePolicy.OneMerge CurrentMerge
{
    get
    {
        lock (this)
        {
            if (!done)
            {
                return runningMerge != null ? runningMerge : startMerge;
            }
            return null;
        }
    }
}
/// <summary>
/// Tries to change this thread's scheduling priority; a failure is tolerated.
/// </summary>
/// <param name="priority"> Requested priority. </param>
/// <remarks>
/// A <see cref="NullReferenceException"/> or <see cref="SecurityException"/> raised by the
/// priority change is caught and ignored, because the merge still runs correctly at normal
/// thread priority. The NullReferenceException guard comes from the Java original, where
/// Sun's JDK 1.5 on Linux sometimes threw NPE here.
/// </remarks>
public virtual void SetThreadPriority(ThreadPriority priority)
{
    try
    {
        Priority = priority;
    }
    catch (Exception ex) when (ex is NullReferenceException || ex is SecurityException)
    {
        // Deliberately ignored: a rejected priority change is harmless.
    }
}
/// <summary>
/// Merge thread body. It starts with <see cref="startMerge"/>, then keeps asking the writer
/// for the next merge (<c>tWriter.NextMerge()</c>) until none is returned.
/// </summary>
/// <remarks>
/// A caught exception is ignored when it is a <see cref="MergePolicy.MergeAbortedException"/>.
/// Otherwise the scheduler's <c>suppressExceptions</c> flag (normally only set during testing)
/// is consulted. <c>done</c> is then set to <c>true</c>, after which
/// <see cref="CurrentMerge"/> returns <c>null</c>.
/// </remarks>
public override void Run()
// First time through the while loop we do the merge
// that we were started with:
MergePolicy.OneMerge merge = this.startMerge;
if (outerInstance.IsVerbose)
outerInstance.Message(" merge thread: start");
while (true)
// Publish the merge being worked on so CurrentMerge/UpdateMergeThreads can see it.
RunningMerge = merge;
// Subsequent times through the loop we do any new
// merge that writer says is necessary:
merge = tWriter.NextMerge();
// Notify here in case any threads were stalled;
// they will notice that the pending merge has
// been pulled and possibly resume:
lock (outerInstance)
if (merge != null)
if (outerInstance.IsVerbose)
outerInstance.Message(" merge thread: do another merge " + tWriter.SegString(merge.Segments));
if (outerInstance.IsVerbose)
outerInstance.Message(" merge thread: done");
catch (Exception exc)
// Ignore the exception if it was due to abort:
if (!(exc is MergePolicy.MergeAbortedException))
//System.out.println(Thread.currentThread().getName() + ": CMS: exc");
if (!outerInstance.suppressExceptions)
// suppressExceptions is normally only set during
// testing.
done = true;
lock (outerInstance)
/// <summary>
/// Called when an exception is hit in a background merge
/// thread.
/// </summary>
/// <param name="exc"> The exception thrown by the merge. </param>
/// <exception cref="MergePolicy.MergeException"> Thrown at the end of the method, wrapping
/// <paramref name="exc"/> together with <see cref="m_dir"/>. </exception>
protected virtual void HandleMergeException(Exception exc)
// When an exception is hit during merge, IndexWriter
// removes any partial files and then allows another
// merge to run. If whatever caused the error is not
// transient then the exception will keep happening,
// so, we sleep here to avoid saturating CPU in such
// cases:
//catch (ThreadInterruptedException ie) // LUCENENET NOTE: Senseless to catch and rethrow the same exception type
// throw new ThreadInterruptedException("Thread Interrupted Exception", ie);
throw new MergePolicy.MergeException(exc, m_dir);
// Consulted by MergeThread.Run when a merge throws something other than an abort.
// Toggled by SetSuppressExceptions/ClearSuppressExceptions, which are meant for testing.
private bool suppressExceptions;
/// <summary>
/// Used for testing: turns on the flag that makes merge threads suppress merge exceptions. </summary>
public virtual void SetSuppressExceptions() => suppressExceptions = true;
/// <summary>
/// Used for testing: turns off the flag set by <see cref="SetSuppressExceptions"/>. </summary>
public virtual void ClearSuppressExceptions() => suppressExceptions = false;
/// <summary>
/// Describes this scheduler's configuration, e.g.
/// <c>ConcurrentMergeScheduler: maxThreadCount=1, maxMergeCount=2, mergeThreadPriority=-1</c>.
/// </summary>
/// <returns> The type name followed by the thread, merge and priority settings. </returns>
public override string ToString()
{
    StringBuilder sb = new StringBuilder(this.GetType().Name + ": ");
    sb.Append("maxThreadCount=").Append(maxThreadCount).Append(", ");
    sb.Append("maxMergeCount=").Append(maxMergeCount).Append(", ");
    // The previous entry already emitted a ", " separator; finish the list instead of
    // leaving it dangling.
    sb.Append("mergeThreadPriority=").Append(mergeThreadPriority);
    return sb.ToString();
}
/// <summary>
/// Returns a copy made by the base implementation. The copy is detached from any
/// <see cref="IndexWriter"/> and <see cref="Directory"/> and has its own, empty list of
/// merge threads.
/// </summary>
/// <returns> The detached copy. </returns>
public override object Clone()
{
    var copy = (ConcurrentMergeScheduler)base.Clone();
    copy.m_mergeThreads = new List<MergeThread>();
    copy.m_dir = null;
    copy.m_writer = null;
    return copy;
}