blob: 77b924a46897b815a3df417d4cb54504fbc4bee2 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.Tests;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
public sealed class PulsarClientTests : IDisposable
{
private readonly CancellationTokenSource _cts;
private readonly IntegrationFixture _fixture;
private readonly ITestOutputHelper _testOutputHelper;
public PulsarClientTests(IntegrationFixture fixture, ITestOutputHelper outputHelper)
{
_cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
_fixture = fixture;
_testOutputHelper = outputHelper;
}
[Fact]
public async Task TokenSupplier_WhenTokenSupplierInitiallyThrowsAnException_ShouldFaultProducer()
{
// Arrange
await using var client = CreateClient(ct => throw new Exception());
await using var producer = await CreateProducer(client);
// Act
var exception = await Record.ExceptionAsync(() => producer.Send("Test", _cts.Token).AsTask());
var state = await producer.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
// Assert
exception.Should().BeOfType<ProducerFaultedException>();
state.Should().Be(ProducerState.Faulted);
}
[Fact]
public async Task TokenSupplier_WhenTokenSupplierThrowsAnExceptionOnAuthChallenge_ShouldFaultProducer()
{
// Arrange
var throwException = false;
await using var client = CreateClient(async ct =>
{
if (throwException)
throw new Exception();
var token = await _fixture.CreateToken(TimeSpan.FromSeconds(10), _cts.Token);
_testOutputHelper.Log($"Received token: '{token}'");
return token;
});
await using var producer = await CreateProducer(client);
// Act
_ = await producer.Send("Test", _cts.Token); // Make sure we have a working connection
throwException = true;
var state = await producer.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
// Assert
state.Should().Be(ProducerState.Faulted);
}
[Fact]
public async Task TokenSupplier_WhenTokenSupplierReturnsToLate_ShouldFaultProducer()
{
// Arrange
await using var client = CreateClient(async ct =>
{
await Task.Delay(TimeSpan.FromSeconds(10), ct);
return string.Empty;
});
await using var producer = await CreateProducer(client);
// Act
var exception = await Record.ExceptionAsync(() => producer.Send("Test", _cts.Token).AsTask());
var state = await producer.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
// Assert
exception.Should().BeOfType<ProducerFaultedException>();
exception.InnerException.Should().BeOfType<AuthenticationException>();
state.Should().Be(ProducerState.Faulted);
}
[Fact]
public async Task TokenSupplier_WhenTokenSupplierReturnValidToken_ShouldStayConnected()
{
// Arrange
var refreshCount = 0;
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using var client = CreateClient(async ct =>
{
++refreshCount;
if (refreshCount == 3)
tcs.SetResult();
var token = await _fixture.CreateToken(TimeSpan.FromSeconds(10), _cts.Token);
_testOutputHelper.Log($"Received token: '{token}'");
return token;
});
await using var producer = await CreateProducer(client);
// Act
_ = await producer.Send("Test", _cts.Token); // Make sure we have a working connection
await tcs.Task;
var state = await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
// Assert
state.Should().Be(ProducerState.Connected);
}
private IPulsarClient CreateClient(Func<CancellationToken, ValueTask<string>> tokenSupplier)
=> PulsarClient
.Builder()
.Authentication(AuthenticationFactory.Token(tokenSupplier))
.ExceptionHandler(_testOutputHelper.Log)
.ServiceUrl(_fixture.ServiceUrl)
.Build();
private async Task<IProducer<string>> CreateProducer(IPulsarClient client)
=> client
.NewProducer(Schema.String)
.Topic(await _fixture.CreateTopic(_cts.Token))
.StateChangedHandler(_testOutputHelper.Log)
.Create();
public void Dispose() => _cts.Dispose();
}