blob: 274e60011b7aed51149ade50df41778c8dd5ef42 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
public sealed class ConsumerTests : IDisposable
{
private readonly CancellationTokenSource _cts;
private readonly IntegrationFixture _fixture;
private readonly ITestOutputHelper _testOutputHelper;
public ConsumerTests(IntegrationFixture fixture, ITestOutputHelper testOutputHelper)
{
_cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
_fixture = fixture;
_testOutputHelper = testOutputHelper;
}
[Fact]
public async Task GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
const int numberOfMessages = 6;
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 5)
{
expected.Add(messageId);
}
}
//Act
var actual = await consumer.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
}
[Fact]
public async Task GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdFromAllPartitions()
{
//Arrange
const int numberOfMessages = 6;
const int partitions = 3;
var topicName = await _fixture.CreatePartitionedTopic(partitions, _cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
var expected = new List<MessageId>();
for (var i = 0; i < numberOfMessages; i++)
{
var messageId = await producer.Send("test-message", _cts.Token);
if (i >= 3)
{
expected.Add(messageId);
}
}
//Act
var actual = await consumer.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
}
[Fact]
public async Task GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
{
//Arrange
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
var expected = new List<MessageId> { MessageId.Earliest };
//Act
var actual = await consumer.GetLastMessageIds(_cts.Token);
//Assert
actual.Should().BeEquivalentTo(expected);
}
[Fact]
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
const int numberOfMessages = 1000;
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
//Act
var produced = await ProduceMessages(producer, numberOfMessages, "test-message", _cts.Token);
var consumed = await ConsumeMessages(consumer, numberOfMessages, _cts.Token);
//Assert
consumed.Should().BeEquivalentTo(produced);
}
[Fact]
public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
{
//Arrange
const int numberOfMessages = 1000;
const int partitions = 3;
var topicName = await _fixture.CreatePartitionedTopic(partitions, _cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
//Act
var produced = await ProduceMessages(producer, numberOfMessages, "test-message", _cts.Token);
var consumed = await ConsumeMessages(consumer, numberOfMessages, _cts.Token);
//Assert
consumed.Should().BeEquivalentTo(produced);
}
[Fact]
public async Task Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
{
//Arrange
var semaphoreSlim = new SemaphoreSlim(1);
await using var client = PulsarClient.Builder().ExceptionHandler(context =>
{
semaphoreSlim.WaitAsync();
context.Result = FaultAction.Rethrow;
context.ExceptionHandled = true;
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
var receiveTask = consumer.Receive(_cts.Token);
semaphoreSlim.Release();
//Act
var exception = await Record.ExceptionAsync(receiveTask.AsTask);
//Assert
exception.Should().BeOfType<ConsumerFaultedException>();
}
[Fact]
public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
{
//Arrange
await using var client = PulsarClient.Builder().ExceptionHandler(context =>
{
context.Result = FaultAction.Rethrow;
context.ExceptionHandled = true;
})
.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
await consumer.State.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
//Act
var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeOfType<ConsumerFaultedException>();
}
[Fact]
public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);
await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
var receiveTask = consumer.Receive(_cts.Token);
await using (await _fixture.DisableThePulsarConnection())
{
await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
}
await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
await ProduceMessages(producer, 1, "test-message", _cts.Token);
//Act
var exception = await Record.ExceptionAsync(receiveTask.AsTask);
//Assert
exception.Should().BeNull();
}
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
await ProduceMessages(producer, 1, "test-message", _cts.Token);
var connectionDown = await _fixture.DisableThePulsarConnection();
await using var consumer = CreateConsumer(client, topicName);
//Act
await connectionDown.DisposeAsync();
await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeNull();
}
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
{
//Arrange
await using var connectionDown = await _fixture.DisableThePulsarConnection();
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
//Act
var exception = await Record.ExceptionAsync(consumer.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
}
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
{
//Arrange
await using var client = CreateClient();
var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
//Act
await using var connectionDown = await _fixture.DisableThePulsarConnection();
await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
var exception = await Record.ExceptionAsync(consumer.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
}
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToReceive()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
await using var consumer = CreateConsumer(client, topicName);
await ProduceMessages(producer, 1, "test-message", _cts.Token);
await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
//Act
await using (await _fixture.DisableThePulsarConnection())
{
await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
}
await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeNull();
}
private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer<string> producer, int numberOfMessages, string content, CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
for (var i = 0; i < numberOfMessages; ++i)
{
messageIds[i] = await producer.Send(content, ct);
}
return messageIds;
}
private static async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer<string> consumer, int numberOfMessages, CancellationToken ct)
{
var messageIds = new List<MessageId>(numberOfMessages);
await foreach (var message in consumer.Messages(ct))
{
messageIds.Add(message.MessageId);
if (messageIds.Count != numberOfMessages)
continue;
await consumer.AcknowledgeCumulative(message, ct);
break;
}
return messageIds;
}
private static string CreateSubscriptionName() => $"subscription-{Guid.NewGuid():N}";
private IProducer<string> CreateProducer(IPulsarClient pulsarClient, string topicName)
=> pulsarClient.NewProducer(Schema.String)
.Topic(topicName)
.StateChangedHandler(_testOutputHelper.Log)
.Create();
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, string topicName)
=> pulsarClient.NewConsumer(Schema.String)
.InitialPosition(SubscriptionInitialPosition.Earliest)
.SubscriptionName(CreateSubscriptionName())
.Topic(topicName)
.StateChangedHandler(_testOutputHelper.Log)
.Create();
private IPulsarClient CreateClient()
=> PulsarClient
.Builder()
.Authentication(_fixture.Authentication)
.ExceptionHandler(_testOutputHelper.Log)
.ServiceUrl(_fixture.ServiceUrl)
.Build();
public void Dispose() => _cts.Dispose();
}