| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading; |
| using System.Threading.Tasks; |
| using Apache.NMS.AMQP.Util.Synchronization; |
| using NUnit.Framework; |
| |
| namespace NMS.AMQP.Test.Utils.Synchronization |
| { |
| [TestFixture] |
| public class NmsSynchronizationMonitorTest |
| { |
| public class EventList |
| { |
| public List<string> list = new List<string>(); |
| public Dictionary<string, List<string>> listsByPrefix = new Dictionary<string, List<string>>(); |
| |
| public void Add(string ev) |
| { |
| lock (this) |
| { |
| list.Add(ev); |
| string prefix = new string(new[] {ev[0]}); |
| if (!listsByPrefix.ContainsKey(prefix)) |
| { |
| listsByPrefix[prefix] = new List<string>(); |
| } |
| listsByPrefix[prefix].Add(ev); |
| } |
| } |
| |
| public string ToString(string prefix) |
| { |
| return listsByPrefix.ContainsKey(prefix) ? string.Join("",listsByPrefix[prefix]) : string.Empty; |
| } |
| |
| public override string ToString() |
| { |
| return string.Join("", list); |
| } |
| } |
| |
| |
| [Test] |
| public void TestNestedLockAndWait() |
| { |
| NmsSynchronizationMonitor syncRoot = new NmsSynchronizationMonitor(); |
| ManualResetEvent lockedEvent = new ManualResetEvent(false); |
| |
| EventList evList = new EventList(); |
| |
| var task = Task.Run(() => |
| { |
| Thread.Sleep(5); |
| using (syncRoot.Lock()) |
| { |
| evList.Add("A1"); |
| Thread.Sleep(1); |
| evList.Add("A1"); |
| Thread.Sleep(1); |
| evList.Add("A1"); |
| Thread.Sleep(1); |
| |
| using (syncRoot.Lock()) |
| { |
| evList.Add("A1"); |
| Thread.Sleep(1); |
| evList.Add("A1"); |
| Thread.Sleep(1); |
| evList.Add("A1"); |
| Thread.Sleep(1); |
| |
| lockedEvent.Set(); |
| syncRoot.Wait(); |
| |
| evList.Add("A2"); |
| Thread.Sleep(1); |
| evList.Add("A2"); |
| Thread.Sleep(1); |
| evList.Add("A2"); |
| Thread.Sleep(1); |
| } |
| |
| evList.Add("A2"); |
| Thread.Sleep(1); |
| evList.Add("A2"); |
| Thread.Sleep(1); |
| evList.Add("A2"); |
| Thread.Sleep(1); |
| } |
| }); |
| |
| var taskB = Task.Run(() => |
| { |
| while (!task.IsCompleted) |
| { |
| using (syncRoot.Lock()) |
| { |
| evList.Add("B"); |
| Thread.Sleep(1); |
| } |
| } |
| }); |
| |
| lockedEvent.WaitOne(); |
| Thread.Sleep(10); // to give Task A Time to go to sleep and B to work |
| |
| |
| Task.Run(() => { syncRoot.Pulse(); }); |
| |
| task.Wait(); |
| taskB.Wait(); |
| |
| |
| // Now Asses that block A1 and A2 are not intersected by B, however A1 And A2 should have some B between (during sleep period) |
| var events = evList.list.Select((a, i) => new Tuple<string, int>(a, i)); |
| var a1 = events.Where(a => a.Item1 == "A1").ToList(); |
| var a2 = events.Where(a => a.Item1 == "A2").ToList(); |
| Assert.AreEqual(6, a1.Count()); |
| Assert.AreEqual(5, a1.Last().Item2 - a1.First().Item2); // not intersected by anything |
| |
| Assert.AreEqual(6, a2.Count()); |
| Assert.AreEqual(5, a2.Last().Item2 - a2.First().Item2); // not intersected by anything |
| |
| Assert.Greater(Math.Abs(a1.Last().Item2 - a2.First().Item2), 1, "A1 and A2, should be intersected by B, and not happening one right after another"); |
| } |
| |
| [Test] |
| public void TestNestedLockAndWaitAsync() |
| { |
| NmsSynchronizationMonitor syncRoot = new NmsSynchronizationMonitor(); |
| ManualResetEvent lockedEvent = new ManualResetEvent(false); |
| |
| EventList evList = new EventList(); |
| |
| var task = Task.Run(async () => |
| { |
| Thread.Sleep(5); |
| using (await syncRoot.LockAsync()) |
| { |
| evList.Add("A1"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A1"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A1"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| |
| using (await syncRoot.LockAsync()) |
| { |
| evList.Add("A1"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A1"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A1"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| |
| lockedEvent.Set(); |
| await syncRoot.WaitAsync(); |
| |
| evList.Add("A2"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A2"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A2"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| } |
| |
| evList.Add("A2"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A2"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| evList.Add("A2"); |
| await Task.Delay(1); |
| await Task.Yield(); |
| } |
| }); |
| |
| var taskB = Task.Run(async () => |
| { |
| while (!task.IsCompleted) |
| { |
| using (await syncRoot.LockAsync()) |
| { |
| evList.Add("B"); |
| await Task.Delay(1); |
| } |
| } |
| }); |
| |
| lockedEvent.WaitOne(); |
| Thread.Sleep(10); // to give Task A Time to go to sleep and B to work |
| |
| |
| Task.Run(() => { syncRoot.Pulse(); }); |
| |
| task.Wait(); |
| taskB.Wait(); |
| |
| |
| // Now Asses that block A1 and A2 are not intersected by B, however A1 And A2 should have some B between (during sleep period) |
| var events = evList.list.Select((a, i) => new Tuple<string, int>(a, i)); |
| var a1 = events.Where(a => a.Item1 == "A1").ToList(); |
| var a2 = events.Where(a => a.Item1 == "A2").ToList(); |
| Assert.AreEqual(6, a1.Count()); |
| Assert.AreEqual(5, a1.Last().Item2 - a1.First().Item2); // not intersected by anything |
| |
| Assert.AreEqual(6, a2.Count()); |
| Assert.AreEqual(5, a2.Last().Item2 - a2.First().Item2); // not intersected by anything |
| |
| Assert.Greater(Math.Abs(a1.Last().Item2 - a2.First().Item2), 1, "A1 and A2, should be intersected by B, and not happening one right after another"); |
| } |
| |
| |
| [TestCase(1,3500,6)] |
| [TestCase(0,2000,200)] |
| [Timeout(20_000)] |
| public void TestConcurrentProducersSyncAndAsync(int sleepTimeMs, int testTimeMs, int minimumOccurences) |
| { |
| EventList evListCommon = new EventList(); |
| |
| NmsSynchronizationMonitor syncRootA = new NmsSynchronizationMonitor(); |
| NmsSynchronizationMonitor syncRootB = new NmsSynchronizationMonitor(); |
| bool runTest = true; |
| |
| // int sleepTimeMs = 1; |
| |
| var task1 = Task.Run(async () => |
| { |
| int counter = 0; |
| while (runTest) |
| { |
| var lockA = (counter % 2 == 0) ? syncRootA.Lock() : await syncRootA.LockAsync(); |
| using (lockA) |
| { |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("A1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("A1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("A1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| |
| var lockB = (counter % 2 == 0) ? syncRootB.Lock() : await syncRootB.LockAsync(); |
| using (lockB) |
| { |
| evListCommon.Add("B1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("B1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("B1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| } |
| |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("A1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("A1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add("A1"); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| } |
| |
| counter++; |
| } |
| }); |
| |
| var task2 = Task.Run(async () => |
| { |
| int counter = 0; |
| |
| // Also test the reentrancy of lock and mix of async and non async |
| async Task ResourceAccess(string symbol, int level, NmsSynchronizationMonitor syncRoot) |
| { |
| var locked = (counter % 2 == 0) ? syncRoot.Lock() : await syncRoot.LockAsync(); |
| using (locked) |
| { |
| int depth = counter % 4; |
| if (level == depth) |
| { |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add(symbol); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add(symbol); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| evListCommon.Add(symbol); |
| await Task.Delay(sleepTimeMs); |
| await Task.Yield(); |
| } |
| else |
| { |
| await ResourceAccess(symbol, level + 1, syncRoot); |
| } |
| } |
| } |
| |
| while (runTest) |
| { |
| await ResourceAccess("A2",0, syncRootA); |
| await ResourceAccess("B2",0,syncRootB); |
| |
| counter++; |
| } |
| }); |
| |
| var task3 = Task.Run(() => |
| { |
| int counter = 0; |
| while (runTest) |
| { |
| var lockA = (counter % 2 == 0) ? syncRootA.Lock() : syncRootA.LockAsync().GetAsyncResult(); |
| using (lockA) |
| { |
| Thread.Sleep(sleepTimeMs); |
| evListCommon.Add("A3"); |
| Thread.Sleep(sleepTimeMs); |
| evListCommon.Add("A3"); |
| Thread.Sleep(sleepTimeMs); |
| evListCommon.Add("A3"); |
| Thread.Sleep(sleepTimeMs); |
| } |
| |
| var lockB = (counter % 2 == 0) ? syncRootB.Lock() : syncRootB.LockAsync().GetAsyncResult(); |
| using (lockB) |
| { |
| evListCommon.Add("B3"); |
| Thread.Sleep(sleepTimeMs); |
| evListCommon.Add("B3"); |
| Thread.Sleep(sleepTimeMs); |
| evListCommon.Add("B3"); |
| Thread.Sleep(sleepTimeMs); |
| } |
| |
| |
| counter++; |
| } |
| }); |
| |
| // Let it run for one sec |
| Thread.Sleep(testTimeMs); |
| |
| runTest = false; |
| task1.Wait(); |
| task2.Wait(); |
| task3.Wait(); |
| |
| var sequenceCommon = evListCommon.ToString(); |
| var sequenceA = evListCommon.ToString("A"); |
| var sequenceB = evListCommon.ToString("B"); |
| |
| // remove B locks interfering with A sequence of rvalidating task1 |
| |
| Enumerable.Range(0,10).ToList().ForEach( (i) => sequenceCommon = sequenceCommon |
| .Replace("A1B2", "A1") |
| .Replace("A1B3", "A1") |
| .Replace("B2A1","A1") |
| .Replace("B3A1","A1") |
| ); |
| sequenceCommon = sequenceCommon.Replace("A1A1A1B1B1B1A1A1A1", ""); |
| // The only allowed sequence in common for task1 is nested sequence |
| Assert.IsFalse(sequenceCommon.Contains("A1"), "Sequence should only contain task 1 resource in right order:"+sequenceCommon); |
| Assert.IsFalse(sequenceCommon.Contains("B1"), "Sequence should only contain task 1 resource in right order:"+sequenceCommon); |
| |
| int countA1 = sequenceA.Where(a => a == '1').Count(); |
| int countA2 = sequenceA.Where(a => a == '2').Count(); |
| int countA3 = sequenceA.Where(a => a == '3').Count(); |
| |
| int countB1 = sequenceB.Where(a => a == '1').Count(); |
| int countB2 = sequenceB.Where(a => a == '2').Count(); |
| int countB3 = sequenceB.Where(a => a == '3').Count(); |
| |
| // Assert that all of the threads had their fair share of action |
| Assert.GreaterOrEqual(countA1, minimumOccurences); |
| Assert.GreaterOrEqual(countA2, minimumOccurences); |
| Assert.GreaterOrEqual(countA3, minimumOccurences); |
| Assert.GreaterOrEqual(countB1, minimumOccurences); |
| Assert.GreaterOrEqual(countB2, minimumOccurences); |
| Assert.GreaterOrEqual(countB3, minimumOccurences); |
| |
| |
| sequenceA = sequenceA.Replace("A1A1A1", ""); |
| sequenceA = sequenceA.Replace("A2A2A2", ""); |
| sequenceA = sequenceA.Replace("A3A3A3", ""); |
| |
| sequenceB = sequenceB.Replace("B1B1B1", ""); |
| sequenceB = sequenceB.Replace("B2B2B2", ""); |
| sequenceB = sequenceB.Replace("B3B3B3", ""); |
| |
| |
| Assert.AreEqual(0,sequenceA.Length, "There were illegal sequences of execution for resource A: "+sequenceA); |
| Assert.AreEqual(0,sequenceB.Length, "There were illegal sequences of execution for resource B: "+sequenceB); |
| |
| } |
| } |
| } |