blob: b617d5cf5f8f300a4c13497886e680cecac44d7c [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Tests.Internal;
using Avro.Generic;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
public sealed class ConsumerTests : IDisposable
{
// Source of the cancellation token passed to the awaited calls in the tests below.
private readonly CancellationTokenSource _cts;
// Shared integration fixture: supplies topics, service/admin URLs and authentication.
private readonly IntegrationFixture _fixture;
// xUnit output helper; its Log method is wired in as state-change and exception handler.
private readonly ITestOutputHelper _testOutputHelper;

/// <summary>
/// Stores the injected fixture and output helper and creates a cancellation
/// token source that cancels itself after one minute.
/// </summary>
/// <param name="fixture">The shared integration fixture.</param>
/// <param name="testOutputHelper">The xUnit output helper for this test class instance.</param>
public ConsumerTests(IntegrationFixture fixture, ITestOutputHelper testOutputHelper)
{
    _testOutputHelper = testOutputHelper;
    _fixture = fixture;
    _cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
}
/// <summary>
/// Sends <c>numberOfMessages</c> messages to a non-partitioned topic and verifies that
/// <c>GetLastMessageIds</c> returns exactly the id of the final message sent.
/// </summary>
[Fact]
public async Task GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    const int numberOfMessages = 6;
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName);
    await using var producer = CreateProducer(client, topicName);
    var expected = new List<MessageId>();
    for (var i = 0; i < numberOfMessages; i++)
    {
        var messageId = await producer.Send("test-message", _cts.Token);
        // Only the final send is expected; derive the index from the constant
        // so the test stays correct if numberOfMessages is changed.
        if (i == numberOfMessages - 1)
        {
            expected.Add(messageId);
        }
    }

    //Act
    var actual = await consumer.GetLastMessageIds(_cts.Token);

    //Assert
    actual.ShouldBe(expected);
}
/// <summary>
/// Sends <c>numberOfMessages</c> messages to a topic with <c>partitions</c> partitions and
/// verifies that <c>GetLastMessageIds</c> returns the ids of the last <c>partitions</c> sends.
/// </summary>
[Fact]
public async Task GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdFromAllPartitions()
{
    //Arrange
    const int numberOfMessages = 6;
    const int partitions = 3;
    var topicName = await _fixture.CreatePartitionedTopic(partitions, _cts.Token);
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName);
    await using var producer = CreateProducer(client, topicName);
    var expected = new List<MessageId>();
    for (var i = 0; i < numberOfMessages; i++)
    {
        var messageId = await producer.Send("test-message", _cts.Token);
        // The final `partitions` sends are expected to be the last message of each partition.
        // NOTE(review): presumably relies on the producer spreading consecutive messages
        // evenly over the partitions - confirm against the default message router.
        if (i >= numberOfMessages - partitions)
        {
            expected.Add(messageId);
        }
    }

    //Act
    var actual = await consumer.GetLastMessageIds(_cts.Token);

    //Assert
    actual.ShouldBeEquivalentTo(expected);
}
/// <summary>
/// Verifies that <c>GetLastMessageIds</c> on a freshly created topic, to which nothing
/// has been sent, yields a single <c>MessageId.Earliest</c>.
/// </summary>
[Fact]
public async Task GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
{
    //Arrange
    await using var client = CreateClient();
    var emptyTopic = await _fixture.CreateTopic(_cts.Token);
    await using var consumer = CreateConsumer(client, emptyTopic);
    var earliestOnly = new List<MessageId> { MessageId.Earliest };

    //Act
    var lastMessageIds = await consumer.GetLastMessageIds(_cts.Token);

    //Assert
    lastMessageIds.ShouldBe(earliestOnly);
}
/// <summary>
/// Produces 1000 messages to a non-partitioned topic and verifies that the consumer
/// receives the same message ids in the same order.
/// </summary>
[Fact]
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
    //Arrange
    const int numberOfMessages = 1000;
    var topicName = await _fixture.CreateTopic(_cts.Token);
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName);
    await using var producer = CreateProducer(client, topicName);

    //Act
    var sentIds = await ProduceMessages(producer, numberOfMessages, "test-message", _cts.Token);
    var receivedIds = await ConsumeMessages(consumer, numberOfMessages, _cts.Token);

    //Assert
    receivedIds.ShouldBe(sentIds);
}
/// <summary>
/// Produces 1000 messages to a three-partition topic and verifies that the consumer
/// receives the same set of message ids, ignoring order.
/// </summary>
[Fact]
public async Task Receive_GivenPartitionedTopic_ShouldReceiveAll()
{
    //Arrange
    const int partitions = 3;
    const int numberOfMessages = 1000;
    var topicName = await _fixture.CreatePartitionedTopic(partitions, _cts.Token);
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName);
    await using var producer = CreateProducer(client, topicName);

    //Act
    var sentIds = await ProduceMessages(producer, numberOfMessages, "test-message", _cts.Token);
    var receivedIds = await ConsumeMessages(consumer, numberOfMessages, _cts.Token);

    //Assert
    // The second argument makes the comparison order-insensitive.
    receivedIds.ShouldBe(sentIds, true);
}
/// <summary>
/// Subscribes one consumer to both a non-partitioned and a partitioned topic, produces
/// 100 messages to each, and verifies that all message ids are received (order ignored).
/// </summary>
[Fact]
public async Task Receive_GivenMultipleTopics_ShouldReceiveAll()
{
    //Arrange
    const int partitions = 3;
    const int numberOfMessages = 100;
    var topic = await _fixture.CreateTopic(_cts.Token);
    var partitionedTopic = await _fixture.CreatePartitionedTopic(partitions, _cts.Token);
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, [topic, partitionedTopic]);
    await using var producer = CreateProducer(client, topic);
    await using var partitionedProducer = CreateProducer(client, partitionedTopic);

    //Act
    var sentIds = new List<MessageId>();
    var sentToTopic = await ProduceMessages(producer, numberOfMessages, "test-message", _cts.Token);
    sentIds.AddRange(sentToTopic);
    var sentToPartitionedTopic = await ProduceMessages(partitionedProducer, numberOfMessages, "test-message", _cts.Token);
    sentIds.AddRange(sentToPartitionedTopic);
    var receivedIds = await ConsumeMessages(consumer, sentIds.Count, _cts.Token);

    //Assert
    receivedIds.ShouldBe(sentIds, true);
}
/// <summary>
/// Creates two topics that match the consumer's topics pattern and two that do not,
/// produces ten messages to each, and verifies that the consumer receives exactly the
/// ids produced to the two matching topics (order ignored).
/// </summary>
[Fact]
public async Task Receive_GivenTopicsPattern_ShouldReceiveAll()
{
    //Arrange
    const int messagesPerTopic = 10;
    var matchingTopicA = $"persistent://public/default/match-{Guid.NewGuid():N}";
    var matchingTopicB = $"persistent://public/default/match-{Guid.NewGuid():N}";
    var nonPersistentTopic = $"non-persistent://public/default/match-{Guid.NewGuid():N}";
    const string differentlyNamedTopic = "persistent://public/default/nomatch";
    await _fixture.CreateTopics([matchingTopicA, matchingTopicB, nonPersistentTopic, differentlyNamedTopic], _cts.Token);
    await using var client = CreateClient();
    var topicsPattern = new Regex(@"persistent://public/default/match.*");
    await using var consumer = CreateConsumer(client, topicsPattern);
    await using var matchingProducerA = CreateProducer(client, matchingTopicA);
    await using var matchingProducerB = CreateProducer(client, matchingTopicB);
    await using var nonPersistentProducer = CreateProducer(client, nonPersistentTopic);
    await using var differentlyNamedProducer = CreateProducer(client, differentlyNamedTopic);

    //Act
    var expectedIds = new List<MessageId>();
    expectedIds.AddRange(await ProduceMessages(matchingProducerA, messagesPerTopic, "test message", _cts.Token));
    expectedIds.AddRange(await ProduceMessages(matchingProducerB, messagesPerTopic, "test message", _cts.Token));
    // Messages sent to the non-matching topics are deliberately not part of the expectation.
    _ = await ProduceMessages(nonPersistentProducer, messagesPerTopic, "test message", _cts.Token);
    _ = await ProduceMessages(differentlyNamedProducer, messagesPerTopic, "test message", _cts.Token);
    var receivedIds = await ConsumeMessages(consumer, expectedIds.Count, _cts.Token);

    //Assert
    receivedIds.ShouldBe(expectedIds, true);
}
/// <summary>
/// Verifies that receiving from a consumer created with the pattern
/// <c>persistent://public/default/nosuchtopics.*</c> fails with a <c>ConsumerFaultedException</c>.
/// </summary>
[Fact]
public async Task Receive_GivenTopicsPatternWithNoMatches_ShouldFaultConsumer()
{
    //Arrange
    var patternWithoutMatches = new Regex(@"persistent://public/default/nosuchtopics.*");
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, patternWithoutMatches);

    //Act
    var pendingReceive = consumer.Receive(_cts.Token);
    var exception = await Record.ExceptionAsync(pendingReceive.AsTask);

    //Assert
    exception.ShouldBeOfType<ConsumerFaultedException>();
}
/// <summary>
/// Verifies that receiving from a consumer whose topics pattern starts with the
/// <c>invalid://</c> scheme fails with a <c>ConsumerFaultedException</c>.
/// </summary>
[Fact]
public async Task Receive_GivenInvalidTopicsPattern_ShouldFaultConsumer()
{
    //Arrange
    var invalidPattern = new Regex(@"invalid://public/default/match.*");
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, invalidPattern);

    //Act
    var pendingReceive = consumer.Receive(_cts.Token);
    var exception = await Record.ExceptionAsync(pendingReceive.AsTask);

    //Assert
    exception.ShouldBeOfType<ConsumerFaultedException>();
}
/// <summary>
/// Verifies that a <c>Receive</c> call that is already in flight fails with a
/// <c>ConsumerFaultedException</c> when the consumer faults afterwards.
/// The client points at an unresolvable host and the exception handler requests a rethrow.
/// </summary>
[Fact]
public async Task Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
{
    //Arrange
    // Gate that is opened only after Receive has been invoked. The handler awaits it, so the
    // rethrow decision (and hence the fault) cannot happen before the receive call is pending.
    // A TaskCompletionSource stays open for every later handler invocation, unlike a semaphore.
    var receiveInvoked = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    // Typed explicitly so the asynchronous handler overload is selected; an async lambda
    // bound to an Action would return before the context has been updated.
    Func<ExceptionContext, ValueTask> faultOnceReceiveIsPending = async context =>
    {
        await receiveInvoked.Task.ConfigureAwait(false);
        context.Result = FaultAction.Rethrow;
        context.ExceptionHandled = true;
    };

    await using var client = PulsarClient.Builder()
        .ExceptionHandler(faultOnceReceiveIsPending)
        .ServiceUrl(new Uri("pulsar://nosuchhost"))
        .Build();
    await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
    var receiveTask = consumer.Receive(_cts.Token);
    receiveInvoked.SetResult();

    //Act
    var exception = await Record.ExceptionAsync(receiveTask.AsTask);

    //Assert
    exception.ShouldBeOfType<ConsumerFaultedException>();
}
/// <summary>
/// Waits until the consumer has reached the <c>Faulted</c> state (the client points at an
/// unresolvable host and the exception handler requests a rethrow) and then verifies that
/// <c>Receive</c> fails with a <c>ConsumerFaultedException</c>.
/// </summary>
[Fact]
public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
{
    //Arrange
    var clientBuilder = PulsarClient.Builder()
        .ExceptionHandler(context =>
        {
            context.Result = FaultAction.Rethrow;
            context.ExceptionHandled = true;
        })
        .ServiceUrl(new Uri("pulsar://nosuchhost"));
    await using var client = clientBuilder.Build();
    var topicName = await _fixture.CreateTopic(_cts.Token);
    await using var consumer = CreateConsumer(client, topicName);
    await consumer.State.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);

    //Act
    var pendingReceive = consumer.Receive(_cts.Token);
    var exception = await Record.ExceptionAsync(pendingReceive.AsTask);

    //Assert
    exception.ShouldBeOfType<ConsumerFaultedException>();
}
/// <summary>
/// Adds an Avro <c>ISpecificRecord</c> schema to a topic, consumes it with the
/// <c>String</c> schema, and verifies that <c>Receive</c> fails with an
/// <c>IncompatibleSchemaException</c>.
/// </summary>
[Fact]
public async Task Receive_WhenReceivingFromTopicWithSchemaAndReceiverHasWrongAvroISpecificRecordSchema_ShouldThrowException()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    var pulsarSchema = Schema.AvroISpecificRecord<ValidModel>();
    await _fixture.AddSchemaToExistingTopic(topicName, pulsarSchema.SchemaInfo, _cts.Token);
    // Dispose the client like every other test does; it was previously leaked.
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName, Schema.String);
    await using var producer = CreateProducer(client, topicName, pulsarSchema);
    await producer.Send(new ValidModel(), _cts.Token);

    //Act
    // Pass the timeout token so a missing exception fails the test instead of hanging it.
    var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);

    //Assert
    exception.ShouldBeOfType<IncompatibleSchemaException>();
}
/// <summary>
/// Produces a <c>ValidModel</c> with the Avro <c>ISpecificRecord</c> schema, consumes it
/// with the same schema, and verifies that the received value has the same
/// <c>Name</c>, <c>Surname</c> and <c>Age</c>.
/// </summary>
[Fact]
public async Task Receive_WhenReceivingFromTopicWithSchemaAndReceiverHasRightAvroISpecificRecordSchema_ShouldBeAbleToRecieve()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    var pulsarSchema = Schema.AvroISpecificRecord<ValidModel>();
    await _fixture.AddSchemaToExistingTopic(topicName, pulsarSchema.SchemaInfo, _cts.Token);
    // Dispose the client like every other test does; it was previously leaked.
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName, pulsarSchema);
    await using var producer = CreateProducer(client, topicName, pulsarSchema);
    var expected = new ValidModel();
    await producer.Send(expected, _cts.Token);

    //Act
    var message = await consumer.Receive(_cts.Token);
    var actual = message.Value();

    //Assert
    actual.Name.ShouldBe(expected.Name);
    actual.Surname.ShouldBe(expected.Surname);
    actual.Age.ShouldBe(expected.Age);
}
/// <summary>
/// Produces a <c>ValidModel</c> with the Avro <c>ISpecificRecord</c> schema, consumes it
/// as an Avro <c>GenericRecord</c> built from the same schema text, and verifies that the
/// <c>Name</c>, <c>Surname</c> and <c>Age</c> fields match what was sent.
/// </summary>
[Fact]
public async Task Receive_WhenReceivingFromTopicWithSchemaAndReceiverHasRightAvroGenericRecordSchema_ShouldBeAbleToRecieve()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    var pulsarISpecificSchema = Schema.AvroISpecificRecord<ValidModel>();
    var pulsarGenericRecordSchema = Schema.AvroGenericRecord<GenericRecord>(ValidModel._SCHEMA.ToString());
    await _fixture.AddSchemaToExistingTopic(topicName, pulsarISpecificSchema.SchemaInfo, _cts.Token);
    // Dispose the client like every other test does; it was previously leaked.
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName, pulsarGenericRecordSchema);
    await using var producer = CreateProducer(client, topicName, pulsarISpecificSchema);
    var expected = new ValidModel
    {
        Name = "Shukri",
        Surname = "Klinaku",
        Age = 57
    };
    await producer.Send(expected, _cts.Token);

    //Act
    var message = await consumer.Receive(_cts.Token);
    var actual = message.Value();

    //Assert
    actual["Name"].ShouldBe(expected.Name);
    actual["Surname"].ShouldBe(expected.Surname);
    actual["Age"].ShouldBe(expected.Age);
}
/// <summary>
/// Sends one message with the <c>ByteSequence</c> schema to a topic created without a
/// schema, then verifies that a <c>ByteSequence</c> consumer can receive without an exception.
/// </summary>
[Fact]
public async Task Subscription_WhenSubscribingToAnExistingTopicWithNoSchema_ShouldNotSubscribeWithSchemaTypeNone()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    await using var client = CreateClient();
    await using var producer = CreateProducer(client, topicName, Schema.ByteSequence);
    var payload = Encoding.UTF8.GetBytes("Test");
    await producer.Send(payload, _cts.Token);
    await using var consumer = CreateConsumer(client, topicName, Schema.ByteSequence);
    var pendingReceive = consumer.Receive(_cts.Token);

    //Act
    var exception = await Record.ExceptionAsync(pendingReceive.AsTask);

    //Assert
    exception.ShouldBeNull();
}
/// <summary>
/// Starts a <c>Receive</c> on an active consumer with no messages available, takes the
/// connection down until the consumer reports <c>Disconnected</c>, restores it, waits for
/// <c>Active</c>, produces one message and verifies the pending receive completes without
/// an exception.
/// </summary>
[Fact]
public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    await using var client = CreateClient();
    await using var consumer = CreateConsumer(client, topicName);
    await using var producer = CreateProducer(client, topicName);
    await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
    var pendingReceive = consumer.Receive(_cts.Token);

    // The connection is restored when the disposable returned by the fixture is disposed.
    await using (var outage = await _fixture.DisableThePulsarConnection())
    {
        await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
    }

    await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
    _ = await ProduceMessages(producer, 1, "test-message", _cts.Token);

    //Act
    var exception = await Record.ExceptionAsync(pendingReceive.AsTask);

    //Assert
    exception.ShouldBeNull();
}
/// <summary>
/// Produces one message, disables the connection, creates a consumer while it is down,
/// restores the connection, waits for the consumer to become <c>Active</c> and verifies
/// that <c>Receive</c> completes without an exception.
/// </summary>
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeAbleToReceive()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    await using var client = CreateClient();
    await using var producer = CreateProducer(client, topicName);
    _ = await ProduceMessages(producer, 1, "test-message", _cts.Token);
    var outage = await _fixture.DisableThePulsarConnection();
    await using var consumer = CreateConsumer(client, topicName);

    //Act
    // Disposing the outage handle is what brings the connection back up.
    await outage.DisposeAsync();
    await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
    var pendingReceive = consumer.Receive(_cts.Token);
    var exception = await Record.ExceptionAsync(pendingReceive.AsTask);

    //Assert
    exception.ShouldBeNull();
}
/// <summary>
/// Creates a consumer while the connection is disabled and verifies that disposing it
/// does not throw.
/// </summary>
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispose()
{
    //Arrange
    await using var outage = await _fixture.DisableThePulsarConnection();
    await using var client = CreateClient();
    var topicName = await _fixture.CreateTopic(_cts.Token);
    await using var consumer = CreateConsumer(client, topicName);

    //Act
    var pendingDispose = consumer.DisposeAsync();
    var exception = await Record.ExceptionAsync(pendingDispose.AsTask);

    //Assert
    exception.ShouldBeNull();
}
/// <summary>
/// Waits for the consumer to become <c>Active</c>, disables the connection, waits for
/// <c>Disconnected</c> and verifies that disposing the consumer does not throw.
/// </summary>
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToDispose()
{
    //Arrange
    await using var client = CreateClient();
    // 'await using' guarantees cleanup even if one of the state waits below throws.
    // The explicit DisposeAsync in the Act step is still what the test measures; the
    // sibling test for an initially-down connection uses the same double-dispose pattern.
    await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
    await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);

    //Act
    await using var connectionDown = await _fixture.DisableThePulsarConnection();
    await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
    var exception = await Record.ExceptionAsync(consumer.DisposeAsync().AsTask);

    //Assert
    exception.ShouldBeNull();
}
/// <summary>
/// Produces one message, waits for the consumer to become <c>Active</c>, takes the
/// connection down until the consumer reports <c>Disconnected</c>, restores it, waits for
/// <c>Active</c> again and verifies that <c>Receive</c> completes without an exception.
/// </summary>
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBeAbleToReceive()
{
    //Arrange
    var topicName = await _fixture.CreateTopic(_cts.Token);
    await using var client = CreateClient();
    await using var producer = CreateProducer(client, topicName);
    await using var consumer = CreateConsumer(client, topicName);
    _ = await ProduceMessages(producer, 1, "test-message", _cts.Token);
    await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);

    //Act
    // The connection is restored when the disposable returned by the fixture is disposed.
    await using (var outage = await _fixture.DisableThePulsarConnection())
    {
        await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
    }

    await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
    var pendingReceive = consumer.Receive(_cts.Token);
    var exception = await Record.ExceptionAsync(pendingReceive.AsTask);

    //Assert
    exception.ShouldBeNull();
}
/// <summary>
/// After receiving and acknowledging a single message, repeatedly calls <c>TryReceive</c>
/// (asserting each call returns false) and polls the admin stats, then verifies the
/// highest observed <c>availablePermits</c> never exceeded the configured prefetch count.
/// </summary>
[Fact]
public async Task TryReceive_WhenBufferIsEmpty_ShouldNotIncreasePermits()
{
    //Arrange
    const int maxPrefetch = 2;
    var topicName = await _fixture.CreateTopic(_cts.Token);
    var subscription = CreateSubscriptionName();
    using var httpClient = CreateAdminClient();
    await using var pulsarClient = CreateClient();
    await using var consumer = CreateConsumer(pulsarClient, topicName, subscription, Schema.ByteSequence, (uint) maxPrefetch);
    await using var producer = CreateProducer(pulsarClient, topicName, Schema.ByteSequence);
    await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);

    // Wait until we get our first message
    await producer.Send([1], _cts.Token);
    var firstMessage = await consumer.Receive(_cts.Token);
    await consumer.Acknowledge(firstMessage, _cts.Token);

    //Act
    var highestPermits = 0L;
    var attempts = maxPrefetch * 5;
    for (var attempt = 0; attempt < attempts; attempt++)
    {
        var received = consumer.TryReceive(out _);
        received.ShouldBe(false);
        await Task.Delay(50, _cts.Token);
        var currentPermits = await GetPermits(httpClient, topicName, subscription, _cts.Token);
        highestPermits = Math.Max(highestPermits, currentPermits);
    }

    //Assert
    Assert.True(highestPermits <= maxPrefetch, $"availablePermits increased above the threshold of {maxPrefetch} to {highestPermits}");
}
/// <summary>
/// Sends <paramref name="content"/> <paramref name="numberOfMessages"/> times, one send
/// at a time, and returns the resulting message ids in send order.
/// </summary>
/// <param name="producer">The producer to send with.</param>
/// <param name="numberOfMessages">How many messages to send.</param>
/// <param name="content">The payload of every message.</param>
/// <param name="ct">Token passed to every send.</param>
/// <returns>An array holding one message id per send.</returns>
private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer<string> producer, int numberOfMessages, string content, CancellationToken ct)
{
    var sentIds = new MessageId[numberOfMessages];
    var index = 0;
    while (index < sentIds.Length)
    {
        var messageId = await producer.Send(content, ct);
        sentIds[index] = messageId;
        index++;
    }

    return sentIds;
}
/// <summary>
/// Reads from the consumer's message stream until <paramref name="numberOfMessages"/>
/// ids have been collected, then cumulatively acknowledges the last message and stops.
/// </summary>
/// <param name="consumer">The consumer to read from.</param>
/// <param name="numberOfMessages">How many messages to collect before stopping.</param>
/// <param name="ct">Token passed to the stream and to the acknowledgement.</param>
/// <returns>The collected message ids in the order they were received.</returns>
private static async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer<string> consumer, int numberOfMessages, CancellationToken ct)
{
    var receivedIds = new List<MessageId>(numberOfMessages);
    await foreach (var message in consumer.Messages(ct))
    {
        receivedIds.Add(message.MessageId);
        if (receivedIds.Count == numberOfMessages)
        {
            // One cumulative acknowledgement on the final message, then stop reading.
            await consumer.AcknowledgeCumulative(message, ct);
            break;
        }
    }

    return receivedIds;
}
/// <summary>
/// Builds a subscription name: the prefix <c>subscription-</c> followed by a new GUID
/// formatted with the "N" specifier.
/// </summary>
private static string CreateSubscriptionName()
{
    var uniqueSuffix = Guid.NewGuid().ToString("N");
    return "subscription-" + uniqueSuffix;
}
/// <summary>
/// Creates a producer for <paramref name="topicName"/> with the given schema whose state
/// changes are logged through the test output helper.
/// </summary>
private IProducer<T> CreateProducer<T>(
    IPulsarClient pulsarClient,
    string topicName,
    ISchema<T> schema)
{
    var producerBuilder = pulsarClient.NewProducer(schema)
        .Topic(topicName)
        .StateChangedHandler(_testOutputHelper.Log);
    return producerBuilder.Create();
}

/// <summary>
/// Creates a producer for <paramref name="topicName"/> using <c>Schema.String</c>.
/// </summary>
private IProducer<string> CreateProducer(
    IPulsarClient pulsarClient,
    string topicName)
{
    return CreateProducer(pulsarClient, topicName, Schema.String);
}
/// <summary>
/// Creates a <c>Schema.String</c> consumer for a single topic.
/// </summary>
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, string topicName)
{
    return CreateConsumer(pulsarClient, topicName, Schema.String);
}

/// <summary>
/// Creates a consumer for a single topic with the given schema, a freshly generated
/// subscription name and the initial position set to <c>Earliest</c>.
/// </summary>
private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string topicName, ISchema<T> schema)
{
    var consumerBuilder = pulsarClient.NewConsumer(schema)
        .InitialPosition(SubscriptionInitialPosition.Earliest)
        .SubscriptionName(CreateSubscriptionName())
        .Topic(topicName)
        .StateChangedHandler(_testOutputHelper.Log);
    return consumerBuilder.Create();
}

/// <summary>
/// Creates a <c>Schema.String</c> consumer subscribed to every topic in
/// <paramref name="topics"/>, with a freshly generated subscription name.
/// </summary>
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, IEnumerable<string> topics)
{
    var consumerBuilder = pulsarClient.NewConsumer(Schema.String)
        .InitialPosition(SubscriptionInitialPosition.Earliest)
        .SubscriptionName(CreateSubscriptionName())
        .Topics(topics)
        .StateChangedHandler(_testOutputHelper.Log);
    return consumerBuilder.Create();
}

/// <summary>
/// Creates a <c>Schema.String</c> consumer configured with
/// <paramref name="topicsPattern"/>, with a freshly generated subscription name.
/// </summary>
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, Regex topicsPattern)
{
    var consumerBuilder = pulsarClient.NewConsumer(Schema.String)
        .InitialPosition(SubscriptionInitialPosition.Earliest)
        .SubscriptionName(CreateSubscriptionName())
        .TopicsPattern(topicsPattern)
        .StateChangedHandler(_testOutputHelper.Log);
    return consumerBuilder.Create();
}

/// <summary>
/// Creates a consumer for a single topic with a caller-chosen subscription name and
/// message prefetch count.
/// </summary>
private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string topicName, string subscription, ISchema<T> schema, uint maxPrefetch)
{
    var consumerBuilder = pulsarClient.NewConsumer(schema)
        .InitialPosition(SubscriptionInitialPosition.Earliest)
        .SubscriptionName(subscription)
        .Topic(topicName)
        .StateChangedHandler(_testOutputHelper.Log)
        .MessagePrefetchCount(maxPrefetch);
    return consumerBuilder.Create();
}
/// <summary>
/// Builds a Pulsar client from the fixture's authentication and service URL, logging
/// exceptions through the test output helper.
/// </summary>
private IPulsarClient CreateClient()
{
    var clientBuilder = PulsarClient
        .Builder()
        .Authentication(_fixture.Authentication)
        .ExceptionHandler(_testOutputHelper.Log)
        .ServiceUrl(_fixture.ServiceUrl);
    return clientBuilder.Build();
}
/// <summary>
/// Creates an <c>HttpClient</c> whose base address is the fixture's admin URL and whose
/// default Authorization header is the fixture's authorization header.
/// The caller owns the returned client and is responsible for disposing it.
/// </summary>
private HttpClient CreateAdminClient()
{
    var adminClient = new HttpClient { BaseAddress = _fixture.AdminUrl };
    adminClient.DefaultRequestHeaders.Authorization = _fixture.AuthorizationHeader;
    return adminClient;
}
/// <summary>
/// Requests <c>/admin/v2/persistent/{topic}/stats</c> and returns the first positive
/// <c>availablePermits</c> value found among the consumers of the given subscription.
/// </summary>
/// <param name="httpClient">Client used to issue the stats request.</param>
/// <param name="topic">Topic name; a <c>persistent://</c> prefix is stripped before use.</param>
/// <param name="subscription">Subscription whose consumers are inspected.</param>
/// <param name="cancellationToken">Token passed to the request and to the JSON parsing.</param>
/// <returns>
/// The first positive <c>availablePermits</c>, or 0 when the response is not successful,
/// the expected JSON properties are missing, or no consumer reports a positive value.
/// </returns>
private static async ValueTask<long> GetPermits(HttpClient httpClient, string topic, string subscription, CancellationToken cancellationToken)
{
    topic = topic.Replace("persistent://", string.Empty);
    using var response = await httpClient.GetAsync($"/admin/v2/persistent/{topic}/stats", cancellationToken).ConfigureAwait(false);
    if (!response.IsSuccessStatusCode)
    {
        // Unsuccessful responses are reported as "no permits" rather than as an error.
        return 0;
    }

    await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
    // JsonDocument is IDisposable; dispose it once the elements are no longer needed.
    using var json = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken).ConfigureAwait(false);

    if (!json.RootElement.TryGetProperty("subscriptions", out var subscriptionsProperty)
        || !subscriptionsProperty.TryGetProperty(subscription, out var subscriptionProperty)
        || !subscriptionProperty.TryGetProperty("consumers", out var consumersProperty))
    {
        return 0;
    }

    foreach (var consumer in consumersProperty.EnumerateArray())
    {
        if (!consumer.TryGetProperty("availablePermits", out var permitsProperty))
        {
            continue;
        }

        var permits = permitsProperty.GetInt64();
        if (permits > 0)
        {
            return permits;
        }
    }

    return 0;
}
/// <summary>
/// Disposes the cancellation token source created in the constructor.
/// </summary>
public void Dispose()
{
    _cts.Dispose();
}
}