blob: f154d02fd30ba81fd0617f2ffdcac66b90369be0 [file] [log] [blame]
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace Lucene.Net.Index
{
using Lucene.Net.Support;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// <summary>
/// Controls the health status of a <see cref="DocumentsWriter"/> session. This class is
/// used to block incoming indexing threads if flushing is significantly slower than
/// indexing, to ensure the <see cref="DocumentsWriter"/>'s healthiness. If flushing is
/// significantly slower than indexing, the net memory used within an
/// <see cref="IndexWriter"/> session can increase very quickly and easily exceed the
/// memory available to the process.
/// <para>
/// This class only holds the stalled flag and parks/releases threads; the decision
/// whether indexing is stalled is made by the caller and handed to
/// <see cref="UpdateStalled(bool)"/>. The upstream Lucene documentation describes the
/// intended policy as: threads are blocked once 2 x the number of available thread
/// states in <see cref="DocumentsWriterPerThreadPool"/> is exceeded by flushing
/// writers, and released again once flushing catches up.
/// </para>
/// </summary>
public sealed class DocumentsWriterStallControl
{
    // NOTE(review): every lock below is taken on `this`, as in the original port.
    // A private gate object would normally be preferable, but external code may
    // synchronize on this instance - confirm before changing the lock target.

    // Read without the lock on the fast paths (WaitIfStalled, Healthy), hence volatile.
    private volatile bool _stalled;

    // Bookkeeping for the test hooks (HasBlocked / IsThreadQueued / WasStalled).
    // It is maintained in every build configuration because IncWaiters / DecrWaiters
    // are invoked outside of Debug.Assert. All three are guarded by lock (this).
    private int _numWaiting;
    private bool _wasStalled;
    private readonly IDictionary<ThreadClass, bool?> _waiting = new IdentityHashMap<ThreadClass, bool?>();

    /// <summary>
    /// Sets the stalled flag to <paramref name="stalled"/> and wakes up every thread
    /// currently waiting in <see cref="WaitIfStalled()"/>. Waiters are pulsed on every
    /// call, regardless of the new value.
    /// </summary>
    /// <param name="stalled"><c>true</c> to mark document writing as stalled,
    /// <c>false</c> to mark it as healthy again.</param>
    public void UpdateStalled(bool stalled)
    {
        lock (this)
        {
            _stalled = stalled;
            if (stalled)
            {
                // Sticky: once set it is never cleared, so tests can see that a stall happened.
                _wasStalled = true;
            }
            Monitor.PulseAll(this);
        }
    }

    /// <summary>
    /// Blocks the calling thread if document writing is currently in a stalled state.
    /// The thread resumes on the first wake-up from <see cref="UpdateStalled(bool)"/>;
    /// the stalled flag is intentionally not re-checked afterwards.
    /// </summary>
    /// <exception cref="ThreadInterruptedException">The thread was interrupted while
    /// waiting. The original exception is attached as the inner exception.</exception>
    public void WaitIfStalled()
    {
        // Cheap volatile check first so that healthy sessions never take the lock.
        if (!_stalled)
        {
            return;
        }

        lock (this)
        {
            if (!_stalled) // the stall may have been lifted while we acquired the lock
            {
                return;
            }

            // make sure not to run IncWaiters / DecrWaiters in Debug.Assert as that gets
            // removed at compile time if built in Release mode
            bool registered = IncWaiters();
            Debug.Assert(registered);
            try
            {
                // don't loop here, higher level logic will re-stall!
                Monitor.Wait(this);
            }
            catch (ThreadInterruptedException e)
            {
                throw new ThreadInterruptedException("Thread Interrupted Exception", e);
            }
            finally
            {
                // Always undo the registration, including when the wait is interrupted;
                // otherwise HasBlocked / IsThreadQueued would keep reporting a thread
                // that is no longer waiting. Monitor.Wait reacquires the lock before it
                // returns or throws, so this still runs under the monitor.
                bool unregistered = DecrWaiters();
                Debug.Assert(unregistered);
            }
        }
    }

    /// <summary>
    /// Returns the current value of the stalled flag.
    /// </summary>
    /// <returns><c>true</c> if the flag is currently set.</returns>
    public bool AnyStalledThreads()
    {
        return _stalled;
    }

    /// <summary>
    /// Registers the current thread as a waiter. Must be called while holding the lock.
    /// </summary>
    /// <returns><c>true</c> if the waiter count is positive after the increment.</returns>
    private bool IncWaiters()
    {
        _numWaiting++;
        ThreadClass current = ThreadClass.Current();
        bool existed = _waiting.ContainsKey(current);
        Debug.Assert(!existed);
        _waiting[current] = true;
        return _numWaiting > 0;
    }

    /// <summary>
    /// Removes the current thread from the set of waiters. Must be called while holding
    /// the lock.
    /// </summary>
    /// <returns><c>true</c> if the waiter count is non-negative after the decrement.</returns>
    private bool DecrWaiters()
    {
        _numWaiting--;
        bool removed = _waiting.Remove(ThreadClass.Current());
        Debug.Assert(removed);
        return _numWaiting >= 0;
    }

    /// <summary>
    /// Returns <c>true</c> if at least one thread is currently registered as waiting
    /// in <see cref="WaitIfStalled()"/>. Intended for tests.
    /// </summary>
    public bool HasBlocked() // for tests
    {
        lock (this)
        {
            return _numWaiting > 0;
        }
    }

    /// <summary>
    /// Gets a value indicating whether the stalled flag is currently cleared.
    /// </summary>
    public bool Healthy
    {
        get
        {
            return !_stalled; // volatile read!
        }
    }

    /// <summary>
    /// Returns <c>true</c> if <paramref name="t"/> is currently registered as waiting
    /// in <see cref="WaitIfStalled()"/>. Intended for tests.
    /// </summary>
    /// <param name="t">The thread to look up.</param>
    public bool IsThreadQueued(ThreadClass t) // for tests
    {
        lock (this)
        {
            return _waiting.ContainsKey(t);
        }
    }

    /// <summary>
    /// Returns <c>true</c> if <see cref="UpdateStalled(bool)"/> has been called with
    /// <c>true</c> at least once on this instance. Intended for tests.
    /// </summary>
    public bool WasStalled() // for tests
    {
        lock (this)
        {
            return _wasStalled;
        }
    }
}
}