blob: 53a3e8feb0e85bc664be493854cf4e3167396761 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Tests.Internal;
using DotPulsar.Internal;
using DotPulsar.Internal.Exceptions;
using System;
[Trait("Category", "Unit")]
public class AsyncQueueWithCursorTests
{
[Fact]
public async Task TryPeek_GivenQueueIsEmpty_ShouldReturnFalse()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(10);
//Act
var actual = sut.TryPeek(out _);
//Assert
actual.Should().BeFalse();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task TryPeek_GivenQueueHasItem_ShouldReturnTrueAndItem()
{
//Arrange
var expected = new QueueItem(1);
var sut = new AsyncQueueWithCursor<QueueItem>(10);
await sut.Enqueue(expected, CancellationToken.None);
//Act
var actual = sut.TryPeek(out var result);
//Assert
actual.Should().BeTrue();
result.Should().NotBeNull();
result!.Should().Be(expected);
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task Dequeue_GivenQueueHasItem_ShouldRemoveItemFromQueue()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(10);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
//Act
sut.Dequeue();
var actual = sut.TryPeek(out _);
//Assert
actual.Should().BeFalse();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task Dequeue_GivenQueueIsEmpty_ShouldThrowException()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(10);
//Act
var act = () => sut.Dequeue();
//Assert
act.Should().Throw<InvalidOperationException>();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task Enqueue_GivenQueueIsEmpty_ShouldAddItem()
{
//Arrange
var expected = new QueueItem(1);
var sut = new AsyncQueueWithCursor<QueueItem>(10);
//Act
await sut.Enqueue(expected, CancellationToken.None);
var actual = sut.TryPeek(out var result);
//Assert
actual.Should().BeTrue();
result.Should().NotBeNull();
result!.Should().Be(expected);
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task Enqueue_GivenQueueIsFull_ShouldBlockInsertion()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
//Act
var actual = sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
//Assert
actual.IsCompleted.Should().BeFalse();
actual.IsFaulted.Should().BeFalse();
actual.IsCanceled.Should().BeFalse();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task Dequeue_GivenQueueIsFullAndEnqueuePending_ShouldDequeueAndEnqueue()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
var cts = new CancellationTokenSource();
cts.CancelAfter(100);
var pendingEnqueue = sut.Enqueue(Substitute.For<IDisposable>(), cts.Token);
//Act
sut.Dequeue();
await pendingEnqueue;
//Assert
pendingEnqueue.IsCompleted.Should().BeTrue();
pendingEnqueue.IsFaulted.Should().BeFalse();
pendingEnqueue.IsCanceled.Should().BeFalse();
//Annihilate
cts.Dispose();
await sut.DisposeAsync();
}
[Fact]
public async Task Enqueue_GivenQueueIsFullAndTokenCanceled_ShouldCancelTask()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
var cts = new CancellationTokenSource();
//Act
var pendingEnqueue = sut.Enqueue(Substitute.For<IDisposable>(), cts.Token).AsTask();
cts.Cancel();
var exception = await Record.ExceptionAsync(() => pendingEnqueue);
//Assert
exception.Should().BeOfType<TaskCanceledException>();
pendingEnqueue.IsCompleted.Should().BeTrue();
pendingEnqueue.IsFaulted.Should().BeFalse();
pendingEnqueue.IsCanceled.Should().BeTrue();
//Annihilate
cts.Dispose();
await sut.DisposeAsync();
}
[Fact]
public async Task NextItem_GivenQueueHasItems_ShouldReturnItems()
{
//Arrange
var expected1 = new QueueItem(1);
var expected2 = new QueueItem(2);
var sut = new AsyncQueueWithCursor<QueueItem>(2);
await sut.Enqueue(expected1, CancellationToken.None);
await sut.Enqueue(expected2, CancellationToken.None);
//Act
var actual1 = await sut.NextItem(CancellationToken.None);
var actual2 = await sut.NextItem(CancellationToken.None);
//Assert
actual1.Should().Be(expected1);
actual2.Should().Be(expected2);
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task NextItem_GivenQueueIfFull_ShouldBlock()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
await sut.NextItem(CancellationToken.None);
//Act
var actual = sut.NextItem(CancellationToken.None);
//Assert
actual.IsCompleted.Should().BeFalse();
actual.IsFaulted.Should().BeFalse();
actual.IsCanceled.Should().BeFalse();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task NextItem_GivenItemInserted_ShouldReturnInsertedItem()
{
//Arrange
var expected = new QueueItem(1);
var sut = new AsyncQueueWithCursor<QueueItem>(1);
//Act
var task = sut.NextItem(CancellationToken.None);
await sut.Enqueue(expected, CancellationToken.None);
var actual = await task;
//Assert
actual.Should().Be(expected);
task.IsCompleted.Should().BeTrue();
task.IsFaulted.Should().BeFalse();
task.IsCanceled.Should().BeFalse();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task NextItem_WhenCanceled_ShouldThrowException()
{
//Arrange
var uut = new AsyncQueueWithCursor<QueueItem>(1);
var cts = new CancellationTokenSource();
//Act
var task = uut.NextItem(cts.Token);
cts.Cancel();
var exception = await Record.ExceptionAsync(() => task.AsTask());
//Assert
exception.Should().BeOfType<TaskCanceledException>();
}
[Fact]
public async Task ResetCursor_GivenCursorNotAtFirstElement_ShouldMoveToFirst()
{
//Arrange
var expected1 = new QueueItem(1);
var expected2 = new QueueItem(2);
var sut = new AsyncQueueWithCursor<QueueItem>(2);
await sut.Enqueue(expected1, CancellationToken.None);
await sut.Enqueue(expected2, CancellationToken.None);
await sut.NextItem(CancellationToken.None);
//Act
var before = await sut.NextItem(CancellationToken.None);
sut.ResetCursor();
var after = await sut.NextItem(CancellationToken.None);
//Assert
before.Should().Be(expected2);
after.Should().Be(expected1);
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task RemoveCurrentItem_GivenQueueEmpty_ShouldThrow()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(2);
//Act
var exception = Record.Exception(() => sut.RemoveCurrentItem());
//Assert
exception.Should().BeOfType<AsyncQueueWithCursorNoItemException>();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task RemoveCurrentItem_GivenQueueWithItemButCursorNotMoved_ShouldThrow()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(2);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
//Act
var exception = Record.Exception(() => sut.RemoveCurrentItem());
//Assert
exception.Should().BeOfType<AsyncQueueWithCursorNoItemException>();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task RemoveCurrentItem_GivenQueueWithCursorOnItem_ShouldRemoveItem()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(2);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
await sut.NextItem(CancellationToken.None);
//Act
sut.RemoveCurrentItem();
//Assert
sut.TryPeek(out _).Should().BeFalse();
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task RemoveCurrentItem_GivenQueueWithMultipleItems_ShouldMoveCursor()
{
//Arrange
var expected = new QueueItem(3);
var sut = new AsyncQueueWithCursor<QueueItem>(3);
await sut.Enqueue(new QueueItem(1), CancellationToken.None);
await sut.Enqueue(new QueueItem(2), CancellationToken.None);
await sut.Enqueue(expected, CancellationToken.None);
await sut.NextItem(CancellationToken.None);
await sut.NextItem(CancellationToken.None);
//Act
sut.RemoveCurrentItem();
var result = await sut.NextItem(CancellationToken.None);
//Assert
result.Should().Be(expected);
//Annihilate
await sut.DisposeAsync();
}
[Fact]
public async Task Dispose_GivenEnqueueWaiting_ShouldCancelTask()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
var enqueueTask = sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
//Act
await sut.DisposeAsync();
var act = async () => await enqueueTask;
//Assert
await act.Should().ThrowAsync<TaskCanceledException>();
enqueueTask.IsCompleted.Should().BeTrue();
enqueueTask.IsCanceled.Should().BeTrue();
}
[Fact]
public async Task Enqueue_GivenDisposed_ShouldThrow()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.DisposeAsync();
//Act
var act = async () => await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
//Assert
await act.Should().ThrowAsync<AsyncLockDisposedException>();
}
[Fact]
public async Task Dispose_GivenNextItemAwaiting_ShouldCancelTask()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(2);
var nextItemTask = sut.NextItem(CancellationToken.None);
//Act
await sut.DisposeAsync();
var act = async () => await nextItemTask;
//Assert
await act.Should().ThrowAsync<TaskCanceledException>();
nextItemTask.IsCompleted.Should().BeTrue();
nextItemTask.IsCanceled.Should().BeTrue();
}
[Fact]
public async Task Dispose_GivenQueueEmptyAwaiting_ShouldCancelTask()
{
//Arrange
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.Enqueue(Substitute.For<IDisposable>(), CancellationToken.None);
var waitForEmptyTask = sut.WaitForEmpty(CancellationToken.None);
//Act
await sut.DisposeAsync();
var act = async () => await waitForEmptyTask;
//Assert
await act.Should().ThrowAsync<TaskCanceledException>();
waitForEmptyTask.IsCompleted.Should().BeTrue();
waitForEmptyTask.IsCanceled.Should().BeTrue();
}
[Fact]
public async Task Dispose_GivenQueuedItem_ShouldDisposeItem()
{
//Arrange
var item = Substitute.For<IDisposable>();
var sut = new AsyncQueueWithCursor<IDisposable>(1);
await sut.Enqueue(item, CancellationToken.None);
//Act
await sut.DisposeAsync();
//Assert
item.Received().Dispose();
}
private class QueueItem : IDisposable, IEquatable<QueueItem>
{
private int Number { get; }
public QueueItem(int number) => Number = number;
public void Dispose() { }
public bool Equals(QueueItem? other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Number == other.Number;
}
public override bool Equals(object? obj) => obj is QueueItem qi && Equals(qi);
public override int GetHashCode() => Number;
}
}