blob: e244cab78fe9bdd2064bd2468f5fdb6f36ef39e2 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace DotPulsar.IntegrationTests;
using Abstraction;
using Abstractions;
using Extensions;
using Fixtures;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
[Collection(nameof(StandaloneTokenClusterTests))]
public class TokenRefreshTests
{
public enum TokenTestRefreshType
{
Standard,
FailAtStartup,
FailOnRefresh,
TimeoutOnRefresh
}
private const string MyTopic = "persistent://public/default/mytopic";
private readonly ITestOutputHelper _testOutputHelper;
private readonly IPulsarService _pulsarService;
public TokenRefreshTests(ITestOutputHelper outputHelper, StandaloneTokenClusterFixture fixture)
{
_testOutputHelper = outputHelper;
Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != null");
_pulsarService = fixture.PulsarService;
}
[InlineData(TokenTestRefreshType.Standard, 0)] // Standard happy path with no token refresh failures
[InlineData(TokenTestRefreshType.FailAtStartup, 1)] // 1 Failure at startup, not on refresh
[InlineData(TokenTestRefreshType.FailOnRefresh, 2)] // Fails on refresh which will force a reconnection and fail once more on new connection
[InlineData(TokenTestRefreshType.TimeoutOnRefresh, 0)] // Connection will be disconnected by server due to slow response to auth challenge
[Theory]
public async Task TestExpiryRefresh(TokenTestRefreshType refreshType, int timesToFail)
{
var publishingStarted = false;
var delayedNames = new HashSet<string>();
ValueTask<string> GetToken(string name, ref int count)
{
if (refreshType is TokenTestRefreshType.Standard)
{
return GetAuthToken(name);
}
if (refreshType is TokenTestRefreshType.FailAtStartup && !publishingStarted && ++count <= timesToFail)
{
return ValueTask.FromException<string>(new Exception("Initial Token Failed"));
}
if (refreshType is TokenTestRefreshType.FailOnRefresh && publishingStarted && ++count <= timesToFail)
{
return ValueTask.FromException<string>(count == 1 ? new Exception("Refresh Failed") : new Exception("Initial Token Failed"));
}
if (refreshType is TokenTestRefreshType.TimeoutOnRefresh && publishingStarted && !delayedNames.Contains(name))
{
delayedNames.Add(name);
Task.Delay(6000);
}
return GetAuthToken(name);
}
var producerTokenCount = 0;
await using var producerClient = GetPulsarClient("Producer", (ct) => GetToken("Producer", ref producerTokenCount));
var consumerTokenCount = 0;
await using var consumerClient = GetPulsarClient("Consumer", (ct) => GetToken("Consumer", ref consumerTokenCount));
await using var producer = producerClient.NewProducer(Schema.String)
.Topic(MyTopic)
.StateChangedHandler(Monitor)
.Create();
await using var consumer = consumerClient.NewConsumer(Schema.String)
.Topic(MyTopic)
.StateChangedHandler(Monitor)
.SubscriptionName("test-sub")
.InitialPosition(SubscriptionInitialPosition.Earliest)
.Create();
const int messageCount = 20;
var received = new List<string>(messageCount);
var publisherTask = Task.Run(async () =>
{
for (var i = 0; i < messageCount; i++)
{
_testOutputHelper.WriteLine("Trying to publish message for index {0}", i);
var messageId = await producer.Send(i.ToString());
publishingStarted = true;
_testOutputHelper.WriteLine("Published message {0} for index {1}", messageId, i);
await Task.Delay(1000);
}
});
var consumerTask = Task.Run(async () =>
{
for (var i = 0; i < messageCount; i++)
{
var message = await consumer.Receive();
received.Add(message.Value());
}
});
var timeoutTask = Task.Delay(60_000);
await Task.WhenAny(Task.WhenAll(consumerTask, publisherTask), timeoutTask);
Assert.False(timeoutTask.IsCompleted);
var expected = Enumerable.Range(0, messageCount).Select(i => i.ToString()).ToList();
var missing = expected.Except(received).ToList();
if (missing.Count > 0)
{
Assert.True(false, $"Missing values: {string.Join(",", missing)}");
}
}
private IPulsarClient GetPulsarClient(string name, Func<CancellationToken, ValueTask<string>> tokenFactory)
=> PulsarClient.Builder()
.Authentication(AuthenticationFactory.Token(tokenFactory))
.RetryInterval(TimeSpan.FromSeconds(1))
.ExceptionHandler(ec =>
{
_testOutputHelper.WriteLine("Error (handled={0}) occurred in {1} client: {2}", ec.ExceptionHandled, name, ec.Exception);
})
.ServiceUrl(_pulsarService.GetBrokerUri()).Build();
private async ValueTask<string> GetAuthToken(string name)
{
var result = await StandaloneTokenClusterFixture.GetAuthToken(true);
_testOutputHelper.WriteLine("{0} received token {1}", name, result);
return result;
}
private void Monitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
{
var stateMessage = stateChanged.ProducerState switch
{
ProducerState.Connected => "is connected",
ProducerState.Disconnected => "is disconnected",
ProducerState.PartiallyConnected => "is partially connected",
ProducerState.Closed => "has closed",
ProducerState.Faulted => "has faulted",
_ => $"has an unknown state '{stateChanged.ProducerState}'"
};
var topic = stateChanged.Producer.Topic;
_testOutputHelper.WriteLine($"The producer for topic '{topic}' " + stateMessage);
}
private void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
{
var stateMessage = stateChanged.ConsumerState switch
{
ConsumerState.Active => "is active",
ConsumerState.Inactive => "is inactive",
ConsumerState.Disconnected => "is disconnected",
ConsumerState.Closed => "has closed",
ConsumerState.ReachedEndOfTopic => "has reached end of topic",
ConsumerState.Faulted => "has faulted",
_ => $"has an unknown state '{stateChanged.ConsumerState}'"
};
var topic = stateChanged.Consumer.Topic;
_testOutputHelper.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
}
}