| #if FEATURE_CONCURRENTMERGESCHEDULER |
| using J2N.Threading; |
| using Lucene.Net.Diagnostics; |
| using System; |
| using System.Collections.Generic; |
| using System.Runtime.CompilerServices; |
| using System.Security; |
| using System.Text; |
| using System.Threading; |
| |
| namespace Lucene.Net.Index |
| { |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using CollectionUtil = Lucene.Net.Util.CollectionUtil; |
| using Directory = Lucene.Net.Store.Directory; |
| |
| /// <summary> |
| /// A <see cref="MergeScheduler"/> that runs each merge using a |
| /// separate thread. |
| /// |
| /// <para>Specify the max number of threads that may run at |
| /// once, and the maximum number of simultaneous merges |
| /// with <see cref="SetMaxMergesAndThreads"/>.</para> |
| /// |
| /// <para>If the number of merges exceeds the max number of threads |
| /// then the largest merges are paused until one of the smaller |
| /// merges completes.</para> |
| /// |
| /// <para>If more than <see cref="MaxMergeCount"/> merges are |
| /// requested then this class will forcefully throttle the |
| /// incoming threads by pausing until one or more merges |
| /// complete.</para> |
| /// </summary> |
| public class ConcurrentMergeScheduler : MergeScheduler, IConcurrentMergeScheduler |
| { |
| private int mergeThreadPriority = -1; |
| |
| /// <summary> |
| /// List of currently active <see cref="MergeThread"/>s. </summary> |
| protected internal IList<MergeThread> m_mergeThreads = new List<MergeThread>(); |
| |
| /// <summary> |
| /// Default <see cref="MaxThreadCount"/>. |
| /// We default to 1: tests on spinning-magnet drives showed slower |
| /// indexing performance if more than one merge thread runs at |
| /// once (though on an SSD it was faster) |
| /// </summary> |
| public const int DEFAULT_MAX_THREAD_COUNT = 1; |
| |
| /// <summary> |
| /// Default <see cref="MaxMergeCount"/>. </summary> |
| public const int DEFAULT_MAX_MERGE_COUNT = 2; |
| |
| // Max number of merge threads allowed to be running at |
| // once. When there are more merges than this, we |
| // forcefully pause the larger ones, letting the smaller |
| // ones run, up until maxMergeCount merges at which point |
| // we forcefully pause incoming threads (that presumably |
| // are the ones causing so much merging). |
| private int maxThreadCount = DEFAULT_MAX_THREAD_COUNT; |
| |
| // Max number of merges we accept before forcefully |
| // throttling the incoming threads |
| private int maxMergeCount = DEFAULT_MAX_MERGE_COUNT; |
| |
| /// <summary> |
| /// <see cref="Directory"/> that holds the index. </summary> |
| protected internal Directory m_dir; |
| |
| /// <summary> |
| /// <see cref="IndexWriter"/> that owns this instance. </summary> |
| protected internal IndexWriter m_writer; |
| |
| /// <summary> |
| /// How many <see cref="MergeThread"/>s have kicked off (this is used |
| /// to name them). |
| /// </summary> |
| protected internal int m_mergeThreadCount; |
| |
| /// <summary> |
| /// Creates a scheduler whose thread and merge limits start at |
| /// <see cref="DEFAULT_MAX_THREAD_COUNT"/> and <see cref="DEFAULT_MAX_MERGE_COUNT"/>. |
| /// This is the only constructor. |
| /// </summary> |
| public ConcurrentMergeScheduler() |
| { |
| } |
| |
| /// <summary> |
| /// Configures both limits at once: how many merges may be outstanding and how |
| /// many merge threads may actually run concurrently. |
| /// </summary> |
| /// <param name="maxMergeCount"> Upper bound on simultaneous merges. Once this many |
| /// merge threads exist and another merge is pending, the thread that calls |
| /// <see cref="Merge(IndexWriter, MergeTrigger, bool)"/> blocks until a merge thread |
| /// finishes. Only the smallest <paramref name="maxThreadCount"/> merges run at a time. </param> |
| /// <param name="maxThreadCount"> Upper bound on merge threads running at once. |
| /// Must be at least 1 and no greater than <paramref name="maxMergeCount"/>. </param> |
| /// <exception cref="ArgumentException"> Either value is below 1, or |
| /// <paramref name="maxThreadCount"/> exceeds <paramref name="maxMergeCount"/>. </exception> |
| public virtual void SetMaxMergesAndThreads(int maxMergeCount, int maxThreadCount) |
| { |
|     // The checks are evaluated in a fixed order; the first one that fails |
|     // decides the message that is reported. |
|     string problem = null; |
|     if (maxThreadCount < 1) |
|     { |
|         problem = "maxThreadCount should be at least 1"; |
|     } |
|     else if (maxMergeCount < 1) |
|     { |
|         problem = "maxMergeCount should be at least 1"; |
|     } |
|     else if (maxThreadCount > maxMergeCount) |
|     { |
|         problem = $"maxThreadCount should be <= maxMergeCount (= {maxMergeCount})"; |
|     } |
| |
|     if (problem != null) |
|     { |
|         throw new ArgumentException(problem); |
|     } |
| |
|     this.maxThreadCount = maxThreadCount; |
|     this.maxMergeCount = maxMergeCount; |
| } |
| |
| /// <summary> |
| /// Gets the current limit on concurrently running merge threads. |
| /// </summary> |
| /// <seealso cref="SetMaxMergesAndThreads(int, int)"/> |
| public virtual int MaxThreadCount |
| { |
|     get { return maxThreadCount; } |
| } |
| |
| /// <summary> |
| /// Gets the current limit on simultaneous merges; |
| /// see <see cref="SetMaxMergesAndThreads(int, int)"/>. </summary> |
| public virtual int MaxMergeCount |
| { |
|     get { return maxMergeCount; } |
| } |
| |
| /// <summary> |
| /// Gets the base priority that merge threads run at. If no priority has been |
| /// chosen yet, reading this property initializes it to one above the priority |
| /// of the calling thread, capped at <see cref="ThreadPriority.Highest"/>. |
| /// </summary> |
| public virtual int MergeThreadPriority |
| { |
|     get |
|     { |
|         int current; |
|         lock (this) |
|         { |
|             // Lazily resolves the -1 "not yet set" placeholder. |
|             InitMergeThreadPriority(); |
|             current = mergeThreadPriority; |
|         } |
|         return current; |
|     } |
| } |
| |
| /// <summary> |
| /// Sets the base priority that merge threads run at and immediately re-applies |
| /// priorities to the current merge threads. |
| /// Note that CMS may raise the priority of some merge threads above this base |
| /// value, so it is best not to set it any higher than |
| /// <see cref="ThreadPriority.Highest"/>(4)-maxThreadCount; that leaves CMS |
| /// room to set relative priority among threads. |
| /// </summary> |
| /// <param name="priority"> The new base priority, expressed as the integer value |
| /// of a <see cref="ThreadPriority"/> member. </param> |
| /// <exception cref="ArgumentException"> <paramref name="priority"/> lies outside |
| /// <see cref="ThreadPriority.Lowest"/> .. <see cref="ThreadPriority.Highest"/>. </exception> |
| public virtual void SetMergeThreadPriority(int priority) |
| { |
|     lock (this) |
|     { |
|         if (priority > (int)ThreadPriority.Highest || priority < (int)ThreadPriority.Lowest) |
|         { |
|             // Report the range low-to-high; it was previously printed reversed ("4 .. 0"). |
|             throw new ArgumentException("priority must be in range " + (int)ThreadPriority.Lowest + " .. " + (int)ThreadPriority.Highest + " inclusive"); |
|         } |
|         mergeThreadPriority = priority; |
|         UpdateMergeThreads(); |
|     } |
| } |
| |
| /// <summary> |
| /// Orders <see cref="MergeThread"/>s by the total document count of their current |
| /// merge, largest first. A thread without a current merge is treated as having |
| /// <see cref="int.MaxValue"/> documents. |
| /// </summary> |
| protected internal static readonly IComparer<MergeThread> compareByMergeDocCount = Comparer<MergeThread>.Create((t1, t2) => |
| { |
|     MergePolicy.OneMerge m1 = t1.CurrentMerge; |
|     MergePolicy.OneMerge m2 = t2.CurrentMerge; |
| |
|     int c1 = m1 == null ? int.MaxValue : m1.TotalDocCount; |
|     int c2 = m2 == null ? int.MaxValue : m2.TotalDocCount; |
| |
|     // Compare instead of subtracting: "c2 - c1" can overflow (for example |
|     // int.MaxValue minus a negative count) and flip the sign of the result. |
|     return c2.CompareTo(c1); |
| }); |
| |
| /// <summary> |
| /// Called whenever the set of running merges has changed, to pause and unpause |
| /// threads. Dead threads are pruned from <see cref="m_mergeThreads"/>, the |
| /// remaining threads that still have a merge are sorted largest-first, and the |
| /// leading (largest) ones are paused so that at most the configured number of |
| /// threads keeps running. Each thread left running gets a priority, starting at |
| /// the base priority and rising by one per thread up to |
| /// <see cref="ThreadPriority.Highest"/>. |
| /// </summary> |
| protected virtual void UpdateMergeThreads() |
| { |
|     lock (this) |
|     { |
|         // Threads that are alive and still own a merge, kept in list order. |
|         IList<MergeThread> running = new List<MergeThread>(); |
| |
|         for (int i = 0; i < m_mergeThreads.Count;) |
|         { |
|             MergeThread candidate = m_mergeThreads[i]; |
|             if (candidate.IsAlive) |
|             { |
|                 if (candidate.CurrentMerge != null) |
|                 { |
|                     running.Add(candidate); |
|                 } |
|                 i++; |
|             } |
|             else |
|             { |
|                 // Prune the dead thread; the next element slides into slot i, |
|                 // so the index must not advance. |
|                 m_mergeThreads.RemoveAt(i); |
|             } |
|         } |
| |
|         // Largest merges first. |
|         CollectionUtil.TimSort(running, compareByMergeDocCount); |
| |
|         int nextPriority = mergeThreadPriority; |
|         int runningCount = running.Count; |
|         for (int position = 0; position < runningCount; position++) |
|         { |
|             MergeThread worker = running[position]; |
|             MergePolicy.OneMerge current = worker.CurrentMerge; |
|             if (current != null) |
|             { |
|                 // Everything ahead of the last maxThreadCount entries is paused. |
|                 bool shouldPause = position < runningCount - maxThreadCount; |
| |
|                 if (IsVerbose && shouldPause != current.IsPaused) |
|                 { |
|                     Message((shouldPause ? "pause thread " : "unpause thread ") + worker.Name); |
|                 } |
|                 if (shouldPause != current.IsPaused) |
|                 { |
|                     current.SetPause(shouldPause); |
|                 } |
| |
|                 if (!shouldPause) |
|                 { |
|                     if (IsVerbose) |
|                     { |
|                         Message("set priority of merge thread " + worker.Name + " to " + nextPriority); |
|                     } |
|                     worker.SetThreadPriority((ThreadPriority)nextPriority); |
|                     nextPriority = Math.Min((int)ThreadPriority.Highest, 1 + nextPriority); |
|                 } |
|             } |
|         } |
|     } |
| } |
| |
| /// <summary> |
| /// Gets whether "CMS" diagnostics are enabled on the owning writer's info |
| /// stream; <c>false</c> when no writer has been attached yet. Typically |
| /// checked before calling <see cref="Message(string)"/>: |
| /// |
| /// <code> |
| /// if (IsVerbose) |
| /// { |
| ///     Message("your message"); |
| /// } |
| /// </code> |
| /// </summary> |
| protected virtual bool IsVerbose |
| { |
|     get |
|     { |
|         if (m_writer == null) |
|         { |
|             return false; |
|         } |
|         return m_writer.infoStream.IsEnabled("CMS"); |
|     } |
| } |
| |
| /// <summary> |
| /// Writes <paramref name="message"/> to the writer's info stream under the |
| /// "CMS" component. No null check is made on the writer, so callers are |
| /// expected to have seen <see cref="IsVerbose"/> return <c>true</c> first. |
| /// </summary> |
| protected internal virtual void Message(string message) => m_writer.infoStream.Message("CMS", message); |
| |
| /// <summary> |
| /// Resolves the base merge-thread priority the first time it is needed: |
| /// one above the calling thread's priority, never above |
| /// <see cref="ThreadPriority.Highest"/>. Does nothing once a priority is set. |
| /// </summary> |
| private void InitMergeThreadPriority() |
| { |
|     lock (this) |
|     { |
|         // -1 is the "not chosen yet" placeholder. |
|         if (mergeThreadPriority != -1) |
|         { |
|             return; |
|         } |
| |
|         int oneAboveCaller = 1 + (int)ThreadJob.CurrentThread.Priority; |
|         mergeThreadPriority = Math.Min(oneAboveCaller, (int)ThreadPriority.Highest); |
|     } |
| } |
| |
| /// <summary> |
| /// Releases this scheduler by waiting, via <see cref="Sync"/>, for all running |
| /// merge threads to finish. |
| /// </summary> |
| /// <param name="disposing"> <c>true</c> when called from an explicit dispose. |
| /// When <c>false</c> nothing is done, because blocking on other managed threads |
| /// is not appropriate outside an explicit dispose. </param> |
| protected override void Dispose(bool disposing) |
| { |
|     if (disposing) |
|     { |
|         Sync(); |
|     } |
| } |
| |
| /// <summary> |
| /// Blocks until no merge thread in <see cref="m_mergeThreads"/> is alive. |
| /// An interrupt received while waiting does not abort the wait: it is |
| /// remembered and re-raised on the calling thread once every merge thread has |
| /// finished. <see cref="Dispose(bool)"/> relies on this. |
| /// </summary> |
| public virtual void Sync() |
| { |
|     bool wasInterrupted = false; |
|     try |
|     { |
|         for (;;) |
|         { |
|             // Pick any live thread while holding the lock, then join it outside the lock. |
|             MergeThread pending = null; |
|             lock (this) |
|             { |
|                 foreach (MergeThread candidate in m_mergeThreads) |
|                 { |
|                     if (candidate != null && candidate.IsAlive) |
|                     { |
|                         pending = candidate; |
|                         break; |
|                     } |
|                 } |
|             } |
| |
|             if (pending == null) |
|             { |
|                 break; |
|             } |
| |
|             try |
|             { |
|                 pending.Join(); |
|             } |
|             catch (ThreadInterruptedException) |
|             { |
|                 // Keep waiting; we retry until all threads are dead. |
|                 wasInterrupted = true; |
|             } |
|         } |
|     } |
|     finally |
|     { |
|         // Restore the interrupt status that was swallowed above. |
|         if (wasInterrupted) |
|         { |
|             Thread.CurrentThread.Interrupt(); |
|         } |
|     } |
| } |
| |
| /// <summary> |
| /// Gets the number of merge threads that are alive and still have a current |
| /// merge. This is never more than the size of <see cref="m_mergeThreads"/>. |
| /// </summary> |
| protected virtual int MergeThreadCount |
| { |
|     get |
|     { |
|         lock (this) |
|         { |
|             int active = 0; |
|             for (int i = 0; i < m_mergeThreads.Count; i++) |
|             { |
|                 MergeThread candidate = m_mergeThreads[i]; |
|                 if (!candidate.IsAlive || candidate.CurrentMerge == null) |
|                 { |
|                     continue; |
|                 } |
|                 active++; |
|             } |
|             return active; |
|         } |
|     } |
| } |
| |
| /// <summary> |
| /// Drains the writer's queue of pending merges, starting one |
| /// <see cref="MergeThread"/> per merge. While merges are pending and the number |
| /// of active merge threads has reached the max merge count, the calling thread |
| /// is stalled (waiting on this instance's monitor) until a merge thread signals |
| /// progress. If launching a thread for a merge fails, the merge is handed back |
| /// to the writer via <c>MergeFinish</c>. |
| /// </summary> |
| /// <param name="writer"> The <see cref="IndexWriter"/> whose pending merges are run; |
| /// it is recorded as the owner of this scheduler. </param> |
| /// <param name="trigger"> Not used by this implementation. </param> |
| /// <param name="newMergesFound"> Not used by this implementation. </param> |
| [MethodImpl(MethodImplOptions.NoInlining)] |
| public override void Merge(IndexWriter writer, MergeTrigger trigger, bool newMergesFound) |
| { |
|     lock (this) |
|     { |
|         if (Debugging.AssertsEnabled) Debugging.Assert(!Monitor.IsEntered(writer)); |
| |
|         this.m_writer = writer; |
| |
|         InitMergeThreadPriority(); |
| |
|         m_dir = writer.Directory; |
| |
|         // First, quickly run through the newly proposed merges |
|         // and add any orthogonal merges (ie a merge not |
|         // involving segments already pending to be merged) to |
|         // the queue. If we are way behind on merging, many of |
|         // these newly proposed merges will likely already be |
|         // registered. |
| |
|         if (IsVerbose) |
|         { |
|             Message("now merge"); |
|             Message(" index: " + writer.SegString()); |
|         } |
| |
|         // Iterate, pulling from the IndexWriter's queue of |
|         // pending merges, until it's empty: |
|         while (true) |
|         { |
|             // An explicit flag marks "we stalled", so a tick count that happens |
|             // to be 0 cannot be mistaken for "never stalled". |
|             bool stalled = false; |
|             int stallStartTicks = 0; |
|             while (writer.HasPendingMerges() && MergeThreadCount >= maxMergeCount) |
|             { |
|                 // this means merging has fallen too far behind: we |
|                 // have already created maxMergeCount threads, and |
|                 // now there's at least one more merge pending. |
|                 // Note that only maxThreadCount of |
|                 // those created merge threads will actually be |
|                 // running; the rest will be paused (see |
|                 // UpdateMergeThreads). We stall this producer |
|                 // thread to prevent creation of new segments, |
|                 // until merging has caught up: |
|                 if (!stalled) |
|                 { |
|                     // Capture the start once so the reported time covers the |
|                     // whole stall rather than only the final wait. |
|                     stalled = true; |
|                     stallStartTicks = Environment.TickCount; |
|                 } |
|                 if (IsVerbose) |
|                 { |
|                     Message(" too many merges; stalling..."); |
|                 } |
|                 // An interrupt simply propagates to the caller. |
|                 Monitor.Wait(this); |
|             } |
| |
|             if (IsVerbose) |
|             { |
|                 if (stalled) |
|                 { |
|                     // Unchecked int subtraction stays correct when TickCount wraps around. |
|                     int stalledMsec = unchecked(Environment.TickCount - stallStartTicks); |
|                     Message(" stalled for " + stalledMsec + " msec"); |
|                 } |
|             } |
| |
|             MergePolicy.OneMerge merge = writer.NextMerge(); |
|             if (merge == null) |
|             { |
|                 if (IsVerbose) |
|                 { |
|                     Message(" no more merges pending; now return"); |
|                 } |
|                 return; |
|             } |
| |
|             bool success = false; |
|             try |
|             { |
|                 if (IsVerbose) |
|                 { |
|                     Message(" consider merge " + writer.SegString(merge.Segments)); |
|                 } |
| |
|                 // OK to spawn a new merge thread to handle this |
|                 // merge: |
|                 MergeThread merger = GetMergeThread(writer, merge); |
|                 m_mergeThreads.Add(merger); |
|                 if (IsVerbose) |
|                 { |
|                     Message(" launch new thread [" + merger.Name + "]"); |
|                 } |
| |
|                 merger.Start(); |
| |
|                 // Must call this after starting the thread else |
|                 // the new thread is removed from mergeThreads |
|                 // (since it's not alive yet): |
|                 UpdateMergeThreads(); |
| |
|                 success = true; |
|             } |
|             finally |
|             { |
|                 if (!success) |
|                 { |
|                     // The merge never got a thread; give it back to the writer. |
|                     writer.MergeFinish(merge); |
|                 } |
|             } |
|         } |
|     } |
| } |
| |
| /// <summary> |
| /// Performs one merge by delegating to <see cref="IndexWriter.Merge(MergePolicy.OneMerge)"/> |
| /// on the writer that owns this scheduler. </summary> |
| /// <param name="merge"> The merge to run. </param> |
| [MethodImpl(MethodImplOptions.NoInlining)] |
| protected virtual void DoMerge(MergePolicy.OneMerge merge) => m_writer.Merge(merge); |
| |
| /// <summary> |
| /// Builds, but does not start, a new <see cref="MergeThread"/> for |
| /// <paramref name="merge"/>. The thread is given the current base priority, is |
| /// marked as a background thread, and is named with the next sequence number |
| /// from <see cref="m_mergeThreadCount"/>. </summary> |
| /// <param name="writer"> The writer the new thread pulls follow-up merges from. </param> |
| /// <param name="merge"> The first merge the new thread will run. </param> |
| /// <returns> The configured, unstarted thread. </returns> |
| protected virtual MergeThread GetMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) |
| { |
|     lock (this) |
|     { |
|         MergeThread merger = new MergeThread(this, writer, merge); |
|         merger.SetThreadPriority((ThreadPriority)mergeThreadPriority); |
|         merger.IsBackground = true; |
|         merger.Name = $"Lucene Merge Thread #{m_mergeThreadCount++}"; |
|         return merger; |
|     } |
| } |
| |
| /// <summary> |
| /// Runs a merge thread, which may run one or more merges |
| /// in sequence. |
| /// </summary> |
| protected internal class MergeThread : ThreadJob |
| { |
| private readonly ConcurrentMergeScheduler outerInstance; |
| |
| internal IndexWriter tWriter; |
| internal MergePolicy.OneMerge startMerge; |
| internal MergePolicy.OneMerge runningMerge; |
| private volatile bool done; |
| |
| /// <summary> |
| /// Sole constructor; stores the owning scheduler, the writer and the first |
| /// merge this thread will run. </summary> |
| /// <param name="outerInstance"> The scheduler that owns this thread. </param> |
| /// <param name="writer"> The writer that follow-up merges are pulled from. </param> |
| /// <param name="startMerge"> The merge performed first when the thread runs. </param> |
| public MergeThread(ConcurrentMergeScheduler outerInstance, IndexWriter writer, MergePolicy.OneMerge startMerge) |
| { |
|     this.outerInstance = outerInstance; |
|     this.startMerge = startMerge; |
|     tWriter = writer; |
| } |
| |
| /// <summary> |
| /// Gets or sets the merge this thread has recorded as currently running. |
| /// Both accessors synchronize on this thread object. </summary> |
| public virtual MergePolicy.OneMerge RunningMerge |
| { |
|     get |
|     { |
|         MergePolicy.OneMerge snapshot; |
|         lock (this) |
|         { |
|             snapshot = runningMerge; |
|         } |
|         return snapshot; |
|     } |
|     set |
|     { |
|         lock (this) |
|         { |
|             runningMerge = value; |
|         } |
|     } |
| } |
| |
| /// <summary> |
| /// Gets the merge this thread is working on: the recorded running merge if |
| /// there is one, otherwise the merge the thread was created with. Returns |
| /// <c>null</c> once this <see cref="MergeThread"/> is done. |
| /// </summary> |
| public virtual MergePolicy.OneMerge CurrentMerge |
| { |
|     get |
|     { |
|         lock (this) |
|         { |
|             if (done) |
|             { |
|                 return null; |
|             } |
|             return runningMerge ?? startMerge; |
|         } |
|     } |
| } |
| |
| /// <summary> |
| /// Applies <paramref name="priority"/> to this thread on a best-effort basis: |
| /// a <see cref="NullReferenceException"/> or <see cref="SecurityException"/> |
| /// raised while setting it is silently ignored. </summary> |
| /// <param name="priority"> The priority to apply. </param> |
| public virtual void SetThreadPriority(ThreadPriority priority) |
| { |
|     try |
|     { |
|         Priority = priority; |
|     } |
|     catch (NullReferenceException) |
|     { |
|         // Carried over from the Java original: Sun's JDK 1.5 on Linux |
|         // sometimes threw an NPE out of here. |
|     } |
|     catch (SecurityException) |
|     { |
|         // Ignore this because we will still run fine with |
|         // normal thread priority. |
|     } |
| } |
| |
| /// <summary> |
| /// Thread body. Runs the merge this thread was created with, then keeps |
| /// pulling further merges from the writer until none is left. After every |
| /// merge, threads waiting on the scheduler are woken. Exceptions other than |
| /// <see cref="MergePolicy.MergeAbortedException"/> are passed to the |
| /// scheduler's merge-exception handler unless exception suppression is on. |
| /// On exit the thread marks itself done, has the scheduler refresh its |
| /// thread list, and wakes any waiters. |
| /// </summary> |
| public override void Run() |
| { |
|     // The first pass runs the merge we were started with. |
|     MergePolicy.OneMerge next = this.startMerge; |
| |
|     try |
|     { |
|         if (outerInstance.IsVerbose) |
|         { |
|             outerInstance.Message(" merge thread: start"); |
|         } |
| |
|         do |
|         { |
|             RunningMerge = next; |
|             outerInstance.DoMerge(next); |
| |
|             // Later passes run whatever merge the writer says is necessary next. |
|             next = tWriter.NextMerge(); |
| |
|             // Wake any stalled threads; they will notice that the pending |
|             // merge has been pulled and possibly resume. |
|             lock (outerInstance) |
|             { |
|                 Monitor.PulseAll(outerInstance); |
|             } |
| |
|             if (next != null) |
|             { |
|                 outerInstance.UpdateMergeThreads(); |
|                 if (outerInstance.IsVerbose) |
|                 { |
|                     outerInstance.Message(" merge thread: do another merge " + tWriter.SegString(next.Segments)); |
|                 } |
|             } |
|         } |
|         while (next != null); |
| |
|         if (outerInstance.IsVerbose) |
|         { |
|             outerInstance.Message(" merge thread: done"); |
|         } |
|     } |
|     catch (Exception exc) |
|     { |
|         // An abort is expected and ignored. suppressExceptions is normally |
|         // only set during testing. |
|         bool aborted = exc is MergePolicy.MergeAbortedException; |
|         if (!aborted && !outerInstance.suppressExceptions) |
|         { |
|             outerInstance.HandleMergeException(exc); |
|         } |
|     } |
|     finally |
|     { |
|         done = true; |
|         lock (outerInstance) |
|         { |
|             outerInstance.UpdateMergeThreads(); |
|             Monitor.PulseAll(outerInstance); |
|         } |
|     } |
| } |
| } |
| |
| /// <summary> |
| /// Called when a background merge thread hits an exception. Sleeps briefly and |
| /// then throws a <see cref="MergePolicy.MergeException"/> that wraps |
| /// <paramref name="exc"/> together with the index directory. |
| /// </summary> |
| /// <param name="exc"> The exception raised by the merge. </param> |
| protected virtual void HandleMergeException(Exception exc) |
| { |
|     // When an exception is hit during merge, IndexWriter |
|     // removes any partial files and then allows another |
|     // merge to run. If whatever caused the error is not |
|     // transient then the exception will keep happening, |
|     // so, we sleep here to avoid saturating CPU in such |
|     // cases. An interrupt during the sleep propagates as-is. |
|     const int backoffMillis = 250; |
|     Thread.Sleep(backoffMillis); |
| |
|     throw new MergePolicy.MergeException(exc, m_dir); |
| } |
| |
| private bool suppressExceptions; |
| |
| /// <summary> |
| /// Used for testing: turns on suppression, so merge threads stop passing |
| /// merge exceptions to <see cref="HandleMergeException(Exception)"/>. </summary> |
| public virtual void SetSuppressExceptions() => suppressExceptions = true; |
| |
| /// <summary> |
| /// Used for testing: turns suppression back off, so merge threads again pass |
| /// merge exceptions to <see cref="HandleMergeException(Exception)"/>. </summary> |
| public virtual void ClearSuppressExceptions() => suppressExceptions = false; |
| |
| /// <summary> |
| /// Returns the runtime type name followed by the current thread limit, merge |
| /// limit and base merge-thread priority. |
| /// </summary> |
| public override string ToString() |
| { |
|     return $"{GetType().Name}: maxThreadCount={maxThreadCount}, maxMergeCount={maxMergeCount}, mergeThreadPriority={mergeThreadPriority}"; |
| } |
| |
| /// <summary> |
| /// Returns a copy produced by the base <c>Clone</c>, then detached from this |
| /// instance's runtime state: the copy has no writer, no directory and an |
| /// empty list of merge threads. |
| /// </summary> |
| public override object Clone() |
| { |
|     ConcurrentMergeScheduler copy = (ConcurrentMergeScheduler)base.Clone(); |
|     // Reset per-writer state so the copy starts without any of it. |
|     copy.m_mergeThreads = new List<MergeThread>(); |
|     copy.m_dir = null; |
|     copy.m_writer = null; |
|     return copy; |
| } |
| } |
| } |
| #endif |