| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading; |
| using System.Threading.Tasks; |
| using Org.Apache.REEF.Utilities.Diagnostics; |
| using Org.Apache.REEF.Utilities.Logging; |
| |
| namespace Org.Apache.REEF.Wake.Util |
| { |
/// <summary>
/// A <see cref="TaskScheduler"/> that executes its tasks on the thread pool while keeping
/// the number of worker delegates that drain its queue at or below a fixed maximum.
/// </summary>
internal sealed class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    private static readonly Logger LOGGER = Logger.GetLogger(typeof(LimitedConcurrencyLevelTaskScheduler));

    /// <summary>Whether the current thread is processing work items.</summary>
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;

    /// <summary>The list of tasks to be executed.</summary>
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)

    /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
    private readonly int _maxDegreeOfParallelism;

    /// <summary>
    /// The number of worker delegates that have been handed to the thread pool and have
    /// not yet finished draining the queue.
    /// </summary>
    private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

    /// <summary>
    /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
    /// specified degree of parallelism.
    /// </summary>
    /// <param name="maxDegreeOfParallelism">
    /// The maximum degree of parallelism provided by this scheduler. Values below 1 are
    /// reported as an <see cref="ArgumentOutOfRangeException"/> through Exceptions.Throw.
    /// </param>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
        {
            Exceptions.Throw(new ArgumentOutOfRangeException("maxDegreeOfParallelism"), LOGGER);
        }
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public sealed override int MaximumConcurrencyLevel
    {
        get
        {
            return _maxDegreeOfParallelism;
        }
    }

    /// <summary>Queues a task to the scheduler.</summary>
    /// <param name="task">The task to be queued.</param>
    protected sealed override void QueueTask(Task task)
    {
        // Add the task to the list of tasks to be processed. If there aren't enough
        // delegates currently queued or running to process tasks, schedule another.
        lock (_tasks)
        {
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                ++_delegatesQueuedOrRunning;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }

    /// <summary>Attempts to execute the specified task on the current thread.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">
    /// Whether the task may already have been handed to <see cref="QueueTask"/>. When true,
    /// the task is only run inline if it can first be removed from this scheduler's queue.
    /// </param>
    /// <returns>Whether the task could be executed on the current thread.</returns>
    protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Inlining is only supported on threads that are already draining this
        // scheduler's queue; otherwise the concurrency limit could be exceeded.
        if (!_currentThreadIsProcessingItems)
        {
            return false;
        }

        // A previously queued task may only be inlined if this call is the one that takes
        // it out of the queue. If it is no longer there, it has already been removed
        // (for example by a worker delegate that pulled it for execution), so decline.
        if (taskWasPreviouslyQueued && !TryDequeue(task))
        {
            return false;
        }

        // Try to run the task.
        return TryExecuteTask(task);
    }

    /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
    /// <param name="task">The task to be removed.</param>
    /// <returns>Whether the task could be found and removed.</returns>
    protected sealed override bool TryDequeue(Task task)
    {
        lock (_tasks)
        {
            return _tasks.Remove(task);
        }
    }

    /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
    /// <returns>A snapshot (array copy) of the tasks currently waiting in the queue.</returns>
    /// <exception cref="NotSupportedException">
    /// The queue lock could not be acquired immediately; this method never blocks on it.
    /// </exception>
    protected sealed override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            // Non-blocking attempt: lockTaken reports whether the monitor was entered.
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (lockTaken)
            {
                return _tasks.ToArray();
            }
            else
            {
                throw new NotSupportedException();
            }
        }
        finally
        {
            // Only release the monitor if it was actually acquired above.
            if (lockTaken)
            {
                Monitor.Exit(_tasks);
            }
        }
    }

    /// <summary>
    /// Informs the ThreadPool that there's work to be executed for this scheduler.
    /// The queued delegate keeps pulling tasks from the queue until it is empty.
    /// </summary>
    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items.
            // This is necessary to enable inlining of tasks into this thread.
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue.
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // When there are no more items to be processed,
                        // note that we're done processing, and get out.
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;
                            break;
                        }

                        // Get the next item from the queue
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }

                    // Execute the task we pulled out of the queue (outside the lock,
                    // so other threads can queue or dequeue while it runs).
                    base.TryExecuteTask(item);
                }
            }

            // We're done processing items on the current thread
            finally
            {
                _currentThreadIsProcessingItems = false;
            }
        }, null);
    }
}
| } |