blob: 6faf12d15ddaaef496116f64803e505af78c90cc [file] [log] [blame]
using J2N.Threading;
using J2N.Threading.Atomic;
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Threading;
using Assert = Lucene.Net.TestFramework.Assert;
using Console = Lucene.Net.Util.SystemConsole;
namespace Lucene.Net.Index
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using LuceneTestCase = Lucene.Net.Util.LuceneTestCase;
/// <summary>
/// Tests for <seealso cref="DocumentsWriterStallControl"/>
/// </summary>
[TestFixture]
[Timeout(900000)]
public class TestDocumentsWriterStallControl : LuceneTestCase
{
[Test]
public virtual void TestSimpleStall()
{
DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
ctrl.UpdateStalled(false);
ThreadJob[] waitThreads = WaitThreads(AtLeast(1), ctrl);
Start(waitThreads);
Assert.IsFalse(ctrl.HasBlocked);
Assert.IsFalse(ctrl.AnyStalledThreads());
Join(waitThreads);
// now stall threads and wake them up again
ctrl.UpdateStalled(true);
waitThreads = WaitThreads(AtLeast(1), ctrl);
Start(waitThreads);
AwaitState(ThreadState.WaitSleepJoin, waitThreads);
Assert.IsTrue(ctrl.HasBlocked);
Assert.IsTrue(ctrl.AnyStalledThreads());
ctrl.UpdateStalled(false);
Assert.IsFalse(ctrl.AnyStalledThreads());
Join(waitThreads);
}
[Test]
public virtual void TestRandom()
{
DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
ctrl.UpdateStalled(false);
ThreadJob[] stallThreads = new ThreadJob[AtLeast(3)];
for (int i = 0; i < stallThreads.Length; i++)
{
int stallProbability = 1 + Random.Next(10);
stallThreads[i] = new ThreadAnonymousInnerClassHelper(ctrl, stallProbability);
}
Start(stallThreads);
long time = Environment.TickCount;
/*
* use a 100 sec timeout to make sure we not hang forever. join will fail in
* that case
*/
while ((Environment.TickCount - time) < 100 * 1000 && !Terminated(stallThreads))
{
ctrl.UpdateStalled(false);
if (Random.NextBoolean())
{
Thread.Sleep(0);
}
else
{
Thread.Sleep(1);
}
}
Join(stallThreads);
}
private class ThreadAnonymousInnerClassHelper : ThreadJob
{
private readonly DocumentsWriterStallControl ctrl;
private readonly int stallProbability;
public ThreadAnonymousInnerClassHelper(DocumentsWriterStallControl ctrl, int stallProbability)
{
this.ctrl = ctrl;
this.stallProbability = stallProbability;
}
public override void Run()
{
int iters = AtLeast(1000);
for (int j = 0; j < iters; j++)
{
ctrl.UpdateStalled(Random.Next(stallProbability) == 0);
if (Random.Next(5) == 0) // thread 0 only updates
{
ctrl.WaitIfStalled();
}
}
}
}
[Test]
public virtual void TestAccquireReleaseRace()
{
DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
ctrl.UpdateStalled(false);
AtomicBoolean stop = new AtomicBoolean(false);
AtomicBoolean checkPoint = new AtomicBoolean(true);
int numStallers = AtLeast(1);
int numReleasers = AtLeast(1);
int numWaiters = AtLeast(1);
var sync = new Synchronizer(numStallers + numReleasers, numStallers + numReleasers + numWaiters);
var threads = new ThreadJob[numReleasers + numStallers + numWaiters];
IList<Exception> exceptions = new SynchronizedList<Exception>();
for (int i = 0; i < numReleasers; i++)
{
threads[i] = new Updater(stop, checkPoint, ctrl, sync, true, exceptions);
}
for (int i = numReleasers; i < numReleasers + numStallers; i++)
{
threads[i] = new Updater(stop, checkPoint, ctrl, sync, false, exceptions);
}
for (int i = numReleasers + numStallers; i < numReleasers + numStallers + numWaiters; i++)
{
threads[i] = new Waiter(stop, checkPoint, ctrl, sync, exceptions);
}
Start(threads);
int iters = AtLeast(10000);
//float checkPointProbability = TestNightly ? 0.5f : 0.1f;
// LUCENENET specific - reduced probabliltiy on x86 to prevent it from timing out.
float checkPointProbability = TestNightly ? (Lucene.Net.Util.Constants.RUNTIME_IS_64BIT ? 0.5f : 0.25f) : 0.1f;
for (int i = 0; i < iters; i++)
{
if (checkPoint)
{
Assert.IsTrue(sync.updateJoin.Wait(TimeSpan.FromSeconds(10)), "timed out waiting for update threads - deadlock?");
if (exceptions.Count > 0)
{
foreach (Exception throwable in exceptions)
{
Console.WriteLine(throwable.ToString());
Console.Write(throwable.StackTrace);
}
Assert.Fail("got exceptions in threads");
}
if (ctrl.HasBlocked && ctrl.IsHealthy)
{
AssertState(numReleasers, numStallers, numWaiters, threads, ctrl);
}
checkPoint.Value = (false);
sync.waiter.Signal();
sync.leftCheckpoint.Wait();
}
Assert.IsFalse(checkPoint);
Assert.AreEqual(0, sync.waiter.CurrentCount);
if (checkPointProbability >= (float)Random.NextDouble())
{
sync.Reset(numStallers + numReleasers, numStallers + numReleasers + numWaiters);
checkPoint.Value = (true);
}
}
if (!checkPoint)
{
sync.Reset(numStallers + numReleasers, numStallers + numReleasers + numWaiters);
checkPoint.Value = (true);
}
Assert.IsTrue(sync.updateJoin.Wait(new TimeSpan(0, 0, 0, 10)));
AssertState(numReleasers, numStallers, numWaiters, threads, ctrl);
checkPoint.Value = (false);
stop.Value = (true);
sync.waiter.Signal();
sync.leftCheckpoint.Wait();
for (int i = 0; i < threads.Length; i++)
{
ctrl.UpdateStalled(false);
threads[i].Join(2000);
if (threads[i].IsAlive && threads[i] is Waiter)
{
if (threads[i].State == ThreadState.WaitSleepJoin)
{
Assert.Fail("waiter is not released - anyThreadsStalled: " + ctrl.AnyStalledThreads());
}
}
}
}
private void AssertState(int numReleasers, int numStallers, int numWaiters, ThreadJob[] threads, DocumentsWriterStallControl ctrl)
{
int millisToSleep = 100;
while (true)
{
if (ctrl.HasBlocked && ctrl.IsHealthy)
{
for (int n = numReleasers + numStallers; n < numReleasers + numStallers + numWaiters; n++)
{
if (ctrl.IsThreadQueued(threads[n]))
{
if (millisToSleep < 60000)
{
Thread.Sleep(millisToSleep);
millisToSleep *= 2;
break;
}
else
{
Assert.Fail("control claims no stalled threads but waiter seems to be blocked ");
}
}
}
break;
}
else
{
break;
}
}
}
internal class Waiter : ThreadJob
{
internal Synchronizer sync;
internal DocumentsWriterStallControl ctrl;
internal AtomicBoolean checkPoint;
internal AtomicBoolean stop;
internal IList<Exception> exceptions;
public Waiter(AtomicBoolean stop, AtomicBoolean checkPoint, DocumentsWriterStallControl ctrl, Synchronizer sync, IList<Exception> exceptions)
: base("waiter")
{
this.stop = stop;
this.checkPoint = checkPoint;
this.ctrl = ctrl;
this.sync = sync;
this.exceptions = exceptions;
}
public override void Run()
{
try
{
while (!stop)
{
ctrl.WaitIfStalled();
if (checkPoint)
{
#if FEATURE_THREAD_INTERRUPT
try
{
#endif
Assert.IsTrue(sync.await());
#if FEATURE_THREAD_INTERRUPT
}
catch (ThreadInterruptedException /*e*/)
{
Console.WriteLine("[Waiter] got interrupted - wait count: " + sync.waiter.CurrentCount);
//throw new ThreadInterruptedException("Thread Interrupted Exception", e);
throw; // LUCENENET: CA2200: Rethrow to preserve stack details (https://docs.microsoft.com/en-us/visualstudio/code-quality/ca2200-rethrow-to-preserve-stack-details)
}
#endif
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
Console.Write(e.StackTrace);
exceptions.Add(e);
}
}
}
internal class Updater : ThreadJob
{
internal Synchronizer sync;
internal DocumentsWriterStallControl ctrl;
internal AtomicBoolean checkPoint;
internal AtomicBoolean stop;
internal bool release;
internal IList<Exception> exceptions;
public Updater(AtomicBoolean stop, AtomicBoolean checkPoint, DocumentsWriterStallControl ctrl, Synchronizer sync, bool release, IList<Exception> exceptions)
: base("updater")
{
this.stop = stop;
this.checkPoint = checkPoint;
this.ctrl = ctrl;
this.sync = sync;
this.release = release;
this.exceptions = exceptions;
}
public override void Run()
{
try
{
while (!stop)
{
int internalIters = release && Random.NextBoolean() ? AtLeast(5) : 1;
for (int i = 0; i < internalIters; i++)
{
ctrl.UpdateStalled(Random.NextBoolean());
}
if (checkPoint)
{
sync.updateJoin.Signal();
try
{
Assert.IsTrue(sync.await());
}
#if FEATURE_THREAD_INTERRUPT
catch (ThreadInterruptedException /*e*/)
{
Console.WriteLine("[Updater] got interrupted - wait count: " + sync.waiter.CurrentCount);
//throw new ThreadInterruptedException("Thread Interrupted Exception", e);
throw; // LUCENENET: CA2200: Rethrow to preserve stack details (https://docs.microsoft.com/en-us/visualstudio/code-quality/ca2200-rethrow-to-preserve-stack-details)
}
#endif
catch (Exception e)
{
Console.Write("signal failed with : " + e);
throw; // LUCENENET: CA2200: Rethrow to preserve stack details (https://docs.microsoft.com/en-us/visualstudio/code-quality/ca2200-rethrow-to-preserve-stack-details)
}
sync.leftCheckpoint.Signal();
}
if (Random.NextBoolean())
{
Thread.Sleep(0);
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
Console.Write(e.StackTrace);
exceptions.Add(e);
}
if (!sync.updateJoin.IsSet)
{
sync.updateJoin.Signal();
}
}
}
public static bool Terminated(ThreadJob[] threads)
{
foreach (ThreadJob thread in threads)
{
if (ThreadState.Stopped != thread.State)
{
return false;
}
}
return true;
}
public static void Start(ThreadJob[] tostart)
{
foreach (ThreadJob thread in tostart)
{
thread.Start();
}
Thread.Sleep(1); // let them start
}
public static void Join(ThreadJob[] toJoin)
{
foreach (ThreadJob thread in toJoin)
{
thread.Join();
}
}
internal static ThreadJob[] WaitThreads(int num, DocumentsWriterStallControl ctrl)
{
ThreadJob[] array = new ThreadJob[num];
for (int i = 0; i < array.Length; i++)
{
array[i] = new ThreadAnonymousInnerClassHelper2(ctrl);
}
return array;
}
private class ThreadAnonymousInnerClassHelper2 : ThreadJob
{
private readonly DocumentsWriterStallControl ctrl;
public ThreadAnonymousInnerClassHelper2(DocumentsWriterStallControl ctrl)
{
this.ctrl = ctrl;
}
public override void Run()
{
ctrl.WaitIfStalled();
}
}
/// <summary>
/// Waits for all incoming threads to be in wait()
/// methods.
/// </summary>
public static void AwaitState(ThreadState state, params ThreadJob[] threads)
{
while (true)
{
bool done = true;
foreach (ThreadJob thread in threads)
{
if (thread.State != state)
{
done = false;
break;
}
}
if (done)
{
return;
}
if (Random.NextBoolean())
{
Thread.Sleep(0);
}
else
{
Thread.Sleep(1);
}
}
}
public sealed class Synchronizer
{
internal volatile CountdownEvent waiter;
internal volatile CountdownEvent updateJoin;
internal volatile CountdownEvent leftCheckpoint;
public Synchronizer(int numUpdater, int numThreads)
{
Reset(numUpdater, numThreads);
}
public void Reset(int numUpdaters, int numThreads)
{
this.waiter = new CountdownEvent(1);
this.updateJoin = new CountdownEvent(numUpdaters);
this.leftCheckpoint = new CountdownEvent(numUpdaters);
}
public bool @await()
{
return waiter.Wait(new TimeSpan(0, 0, 0, 10));
}
}
}
}