blob: 6ac8e6611a74738ca941fd55acf066308a3c061c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Util.Synchronization;
using NUnit.Framework;
namespace NMS.AMQP.Test.Utils.Synchronization
{
[TestFixture]
public class NmsSynchronizationMonitorTest
{
public class EventList
{
public List<string> list = new List<string>();
public Dictionary<string, List<string>> listsByPrefix = new Dictionary<string, List<string>>();
public void Add(string ev)
{
lock (this)
{
list.Add(ev);
string prefix = new string(new[] {ev[0]});
if (!listsByPrefix.ContainsKey(prefix))
{
listsByPrefix[prefix] = new List<string>();
}
listsByPrefix[prefix].Add(ev);
}
}
public string ToString(string prefix)
{
return listsByPrefix.ContainsKey(prefix) ? string.Join("",listsByPrefix[prefix]) : string.Empty;
}
public override string ToString()
{
return string.Join("", list);
}
}
[Test]
public void TestNestedLockAndWait()
{
NmsSynchronizationMonitor syncRoot = new NmsSynchronizationMonitor();
ManualResetEvent lockedEvent = new ManualResetEvent(false);
EventList evList = new EventList();
var task = Task.Run(() =>
{
Thread.Sleep(5);
using (syncRoot.Lock())
{
evList.Add("A1");
Thread.Sleep(1);
evList.Add("A1");
Thread.Sleep(1);
evList.Add("A1");
Thread.Sleep(1);
using (syncRoot.Lock())
{
evList.Add("A1");
Thread.Sleep(1);
evList.Add("A1");
Thread.Sleep(1);
evList.Add("A1");
Thread.Sleep(1);
lockedEvent.Set();
syncRoot.Wait();
evList.Add("A2");
Thread.Sleep(1);
evList.Add("A2");
Thread.Sleep(1);
evList.Add("A2");
Thread.Sleep(1);
}
evList.Add("A2");
Thread.Sleep(1);
evList.Add("A2");
Thread.Sleep(1);
evList.Add("A2");
Thread.Sleep(1);
}
});
var taskB = Task.Run(() =>
{
while (!task.IsCompleted)
{
using (syncRoot.Lock())
{
evList.Add("B");
Thread.Sleep(1);
}
}
});
lockedEvent.WaitOne();
Thread.Sleep(10); // to give Task A Time to go to sleep and B to work
Task.Run(() => { syncRoot.Pulse(); });
task.Wait();
taskB.Wait();
// Now Asses that block A1 and A2 are not intersected by B, however A1 And A2 should have some B between (during sleep period)
var events = evList.list.Select((a, i) => new Tuple<string, int>(a, i));
var a1 = events.Where(a => a.Item1 == "A1").ToList();
var a2 = events.Where(a => a.Item1 == "A2").ToList();
Assert.AreEqual(6, a1.Count());
Assert.AreEqual(5, a1.Last().Item2 - a1.First().Item2); // not intersected by anything
Assert.AreEqual(6, a2.Count());
Assert.AreEqual(5, a2.Last().Item2 - a2.First().Item2); // not intersected by anything
Assert.Greater(Math.Abs(a1.Last().Item2 - a2.First().Item2), 1, "A1 and A2, should be intersected by B, and not happening one right after another");
}
[Test]
public void TestNestedLockAndWaitAsync()
{
NmsSynchronizationMonitor syncRoot = new NmsSynchronizationMonitor();
ManualResetEvent lockedEvent = new ManualResetEvent(false);
EventList evList = new EventList();
var task = Task.Run(async () =>
{
Thread.Sleep(5);
using (await syncRoot.LockAsync())
{
evList.Add("A1");
await Task.Delay(1);
await Task.Yield();
evList.Add("A1");
await Task.Delay(1);
await Task.Yield();
evList.Add("A1");
await Task.Delay(1);
await Task.Yield();
using (await syncRoot.LockAsync())
{
evList.Add("A1");
await Task.Delay(1);
await Task.Yield();
evList.Add("A1");
await Task.Delay(1);
await Task.Yield();
evList.Add("A1");
await Task.Delay(1);
await Task.Yield();
lockedEvent.Set();
await syncRoot.WaitAsync();
evList.Add("A2");
await Task.Delay(1);
await Task.Yield();
evList.Add("A2");
await Task.Delay(1);
await Task.Yield();
evList.Add("A2");
await Task.Delay(1);
await Task.Yield();
}
evList.Add("A2");
await Task.Delay(1);
await Task.Yield();
evList.Add("A2");
await Task.Delay(1);
await Task.Yield();
evList.Add("A2");
await Task.Delay(1);
await Task.Yield();
}
});
var taskB = Task.Run(async () =>
{
while (!task.IsCompleted)
{
using (await syncRoot.LockAsync())
{
evList.Add("B");
await Task.Delay(1);
}
}
});
lockedEvent.WaitOne();
Thread.Sleep(10); // to give Task A Time to go to sleep and B to work
Task.Run(() => { syncRoot.Pulse(); });
task.Wait();
taskB.Wait();
// Now Asses that block A1 and A2 are not intersected by B, however A1 And A2 should have some B between (during sleep period)
var events = evList.list.Select((a, i) => new Tuple<string, int>(a, i));
var a1 = events.Where(a => a.Item1 == "A1").ToList();
var a2 = events.Where(a => a.Item1 == "A2").ToList();
Assert.AreEqual(6, a1.Count());
Assert.AreEqual(5, a1.Last().Item2 - a1.First().Item2); // not intersected by anything
Assert.AreEqual(6, a2.Count());
Assert.AreEqual(5, a2.Last().Item2 - a2.First().Item2); // not intersected by anything
Assert.Greater(Math.Abs(a1.Last().Item2 - a2.First().Item2), 1, "A1 and A2, should be intersected by B, and not happening one right after another");
}
[TestCase(1,3500,6)]
[TestCase(0,2000,200)]
[Timeout(20_000)]
public void TestConcurrentProducersSyncAndAsync(int sleepTimeMs, int testTimeMs, int minimumOccurences)
{
EventList evListCommon = new EventList();
NmsSynchronizationMonitor syncRootA = new NmsSynchronizationMonitor();
NmsSynchronizationMonitor syncRootB = new NmsSynchronizationMonitor();
bool runTest = true;
// int sleepTimeMs = 1;
var task1 = Task.Run(async () =>
{
int counter = 0;
while (runTest)
{
var lockA = (counter % 2 == 0) ? syncRootA.Lock() : await syncRootA.LockAsync();
using (lockA)
{
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("A1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("A1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("A1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
var lockB = (counter % 2 == 0) ? syncRootB.Lock() : await syncRootB.LockAsync();
using (lockB)
{
evListCommon.Add("B1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("B1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("B1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
}
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("A1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("A1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add("A1");
await Task.Delay(sleepTimeMs);
await Task.Yield();
}
counter++;
}
});
var task2 = Task.Run(async () =>
{
int counter = 0;
// Also test the reentrancy of lock and mix of async and non async
async Task ResourceAccess(string symbol, int level, NmsSynchronizationMonitor syncRoot)
{
var locked = (counter % 2 == 0) ? syncRoot.Lock() : await syncRoot.LockAsync();
using (locked)
{
int depth = counter % 4;
if (level == depth)
{
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add(symbol);
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add(symbol);
await Task.Delay(sleepTimeMs);
await Task.Yield();
evListCommon.Add(symbol);
await Task.Delay(sleepTimeMs);
await Task.Yield();
}
else
{
await ResourceAccess(symbol, level + 1, syncRoot);
}
}
}
while (runTest)
{
await ResourceAccess("A2",0, syncRootA);
await ResourceAccess("B2",0,syncRootB);
counter++;
}
});
var task3 = Task.Run(() =>
{
int counter = 0;
while (runTest)
{
var lockA = (counter % 2 == 0) ? syncRootA.Lock() : syncRootA.LockAsync().GetAsyncResult();
using (lockA)
{
Thread.Sleep(sleepTimeMs);
evListCommon.Add("A3");
Thread.Sleep(sleepTimeMs);
evListCommon.Add("A3");
Thread.Sleep(sleepTimeMs);
evListCommon.Add("A3");
Thread.Sleep(sleepTimeMs);
}
var lockB = (counter % 2 == 0) ? syncRootB.Lock() : syncRootB.LockAsync().GetAsyncResult();
using (lockB)
{
evListCommon.Add("B3");
Thread.Sleep(sleepTimeMs);
evListCommon.Add("B3");
Thread.Sleep(sleepTimeMs);
evListCommon.Add("B3");
Thread.Sleep(sleepTimeMs);
}
counter++;
}
});
// Let it run for one sec
Thread.Sleep(testTimeMs);
runTest = false;
task1.Wait();
task2.Wait();
task3.Wait();
var sequenceCommon = evListCommon.ToString();
var sequenceA = evListCommon.ToString("A");
var sequenceB = evListCommon.ToString("B");
// remove B locks interfering with A sequence of rvalidating task1
Enumerable.Range(0,10).ToList().ForEach( (i) => sequenceCommon = sequenceCommon
.Replace("A1B2", "A1")
.Replace("A1B3", "A1")
.Replace("B2A1","A1")
.Replace("B3A1","A1")
);
sequenceCommon = sequenceCommon.Replace("A1A1A1B1B1B1A1A1A1", "");
// The only allowed sequence in common for task1 is nested sequence
Assert.IsFalse(sequenceCommon.Contains("A1"), "Sequence should only contain task 1 resource in right order:"+sequenceCommon);
Assert.IsFalse(sequenceCommon.Contains("B1"), "Sequence should only contain task 1 resource in right order:"+sequenceCommon);
int countA1 = sequenceA.Where(a => a == '1').Count();
int countA2 = sequenceA.Where(a => a == '2').Count();
int countA3 = sequenceA.Where(a => a == '3').Count();
int countB1 = sequenceB.Where(a => a == '1').Count();
int countB2 = sequenceB.Where(a => a == '2').Count();
int countB3 = sequenceB.Where(a => a == '3').Count();
// Assert that all of the threads had their fair share of action
Assert.GreaterOrEqual(countA1, minimumOccurences);
Assert.GreaterOrEqual(countA2, minimumOccurences);
Assert.GreaterOrEqual(countA3, minimumOccurences);
Assert.GreaterOrEqual(countB1, minimumOccurences);
Assert.GreaterOrEqual(countB2, minimumOccurences);
Assert.GreaterOrEqual(countB3, minimumOccurences);
sequenceA = sequenceA.Replace("A1A1A1", "");
sequenceA = sequenceA.Replace("A2A2A2", "");
sequenceA = sequenceA.Replace("A3A3A3", "");
sequenceB = sequenceB.Replace("B1B1B1", "");
sequenceB = sequenceB.Replace("B2B2B2", "");
sequenceB = sequenceB.Replace("B3B3B3", "");
Assert.AreEqual(0,sequenceA.Length, "There were illegal sequences of execution for resource A: "+sequenceA);
Assert.AreEqual(0,sequenceB.Length, "There were illegal sequences of execution for resource B: "+sequenceB);
}
}
}