// blob: eaea6640268e4e7b04201a01e6196b8cbba98831 [file] [log] [blame]
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal
{
/// <summary>
/// Holds the pending <see cref="StateTask{TState}"/> entries created through
/// <see cref="CreateTaskFor"/> and completes them when a matching state is reported.
/// All access to the underlying linked list is serialized with a private lock.
/// </summary>
/// <typeparam name="TState">The type used to represent a state.</typeparam>
public sealed class StateTaskCollection<TState>
{
    private readonly object _lock;
    private readonly LinkedList<StateTask<TState>> _awaitors;

    /// <summary>
    /// Creates an empty collection.
    /// </summary>
    public StateTaskCollection()
    {
        _lock = new object();
        _awaitors = new LinkedList<StateTask<TState>>();
    }

    /// <summary>
    /// Registers a new pending entry for the given state/change pair and returns the task
    /// exposed by its completion source.
    /// </summary>
    /// <param name="state">The state the caller is interested in.</param>
    /// <param name="changed">Passed to the <see cref="StateTask{TState}"/> together with <paramref name="state"/>.</param>
    /// <param name="cancellationToken">Token handed to the completion source's cancellation setup.</param>
    /// <returns>The task of the newly created entry's completion source.</returns>
    public Task<TState> CreateTaskFor(TState state, StateChanged changed, CancellationToken cancellationToken)
    {
        LinkedListNode<StateTask<TState>> node;

        lock (_lock)
        {
            node = _awaitors.AddFirst(new StateTask<TState>(state, changed));
        }

        // Done outside the lock: the callback re-enters this class through TaskWasCanceled, which takes the lock itself.
        // NOTE(review): presumably SetupCancellation invokes the callback when the token is cancelled - confirm in CancelableCompletionSource.
        node.Value.CancelableCompletionSource.SetupCancellation(() => TaskWasCanceled(node), cancellationToken);
        return node.Value.CancelableCompletionSource.Task;
    }

    /// <summary>
    /// Completes, with <paramref name="state"/> as result, every pending entry whose
    /// <c>IsAwaiting(state)</c> returns true, and removes those entries from the collection.
    /// </summary>
    /// <param name="state">The state to test the entries against and to use as the result.</param>
    public void CompleteTasksAwaiting(TState state)
    {
        lock (_lock)
        {
            var awaitor = _awaitors.First;

            while (awaitor != null)
            {
                // Capture the successor first: removing the node detaches it and clears its Next link.
                var next = awaitor.Next;

                if (awaitor.Value.IsAwaiting(state))
                {
                    _awaitors.Remove(awaitor);
                    awaitor.Value.CancelableCompletionSource.SetResult(state);
                    awaitor.Value.CancelableCompletionSource.Dispose();
                }

                awaitor = next;
            }
        }
    }

    /// <summary>
    /// Completes every pending entry with <paramref name="state"/> as result and empties the collection.
    /// </summary>
    /// <param name="state">The result handed to every pending entry.</param>
    public void CompleteAllTasks(TState state)
    {
        lock (_lock)
        {
            foreach (var awaitor in _awaitors)
            {
                awaitor.CancelableCompletionSource.SetResult(state);
                awaitor.CancelableCompletionSource.Dispose();
            }

            _awaitors.Clear();
        }
    }

    /// <summary>
    /// Cancellation callback: removes the entry from the collection and disposes it,
    /// unless it has already been completed and removed.
    /// </summary>
    /// <param name="node">The list node that was created for the cancelled entry.</param>
    private void TaskWasCanceled(LinkedListNode<StateTask<TState>> node)
    {
        lock (_lock)
        {
            // A node that was removed by CompleteTasksAwaiting, or dropped by Clear() in CompleteAllTasks,
            // is no longer attached to the list. Removing it again would throw InvalidOperationException,
            // so this expected race is checked explicitly instead of being handled through a swallowed exception.
            if (node.List != _awaitors)
            {
                return;
            }

            _awaitors.Remove(node);

            try
            {
                node.Value.Dispose();
            }
            catch
            {
                // Best effort: a failing Dispose must not propagate into whoever triggered the cancellation.
            }
        }
    }
}
}