Add token factory support, respond to server auth challenge on token refresh (#95)
* get pulsar manager running in docker compose
* finally have a test env
* extract separate test for token refresh
* rollback change
* add more test cases
* fix docker issues
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 7ef079e..ea2ab5b 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -16,6 +16,7 @@
using System;
using System.Security.Cryptography.X509Certificates;
+using System.Threading.Tasks;
/// <summary>
/// A pulsar client building abstraction.
@@ -33,6 +34,11 @@
IPulsarClientBuilder AuthenticateUsingToken(string token);
/// <summary>
+ /// Authenticate using a (JSON Web) token factory. This is optional.
+ /// </summary>
+ IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> tokenFactory);
+
+ /// <summary>
/// Set connection encryption policy. The default is 'EnforceUnencrypted' if the ServiceUrl scheme is 'pulsar' and 'EnforceEncrypted' if it's 'pulsar+ssl'.
/// </summary>
IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy);
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 1ee72e3..60c6433 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -20,6 +20,8 @@
using PulsarApi;
using System;
using System.Buffers;
+using System.Diagnostics;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -29,14 +31,16 @@
private readonly ChannelManager _channelManager;
private readonly PingPongHandler _pingPongHandler;
private readonly IPulsarStream _stream;
+ private readonly Func<Task<string>>? _accessTokenFactory;
private int _isDisposed;
- public Connection(IPulsarStream stream, TimeSpan keepAliveInterval)
+ public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, Func<Task<string>>? accessTokenFactory)
{
_lock = new AsyncLock();
_channelManager = new ChannelManager();
_pingPongHandler = new PingPongHandler(this, keepAliveInterval);
_stream = stream;
+ _accessTokenFactory = accessTokenFactory;
}
public async ValueTask<bool> HasChannels(CancellationToken cancellationToken)
@@ -81,6 +85,9 @@
return await responseTask.ConfigureAwait(false);
}
+ private Task Send(CommandAuthResponse authResponse, CancellationToken cancellationToken)
+ => Send(authResponse.AsBaseCommand(), cancellationToken);
+
public Task Send(CommandPing command, CancellationToken cancellationToken)
=> Send(command.AsBaseCommand(), cancellationToken);
@@ -268,13 +275,13 @@
}
}
- public async Task ProcessIncommingFrames()
+ public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
{
await Task.Yield();
try
{
- await foreach (var frame in _stream.Frames())
+ await foreach (var frame in _stream.Frames(cancellationToken))
{
var commandSize = frame.ReadUInt32(0, true);
var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
@@ -285,7 +292,18 @@
if (command.CommandType == BaseCommand.Type.Message)
_channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
else
- _channelManager.Incoming(command);
+ {
+ if (_accessTokenFactory != null && command.CommandType == BaseCommand.Type.AuthChallenge)
+ {
+ var token = await _accessTokenFactory.GetToken();
+ await Send(new CommandAuthResponse { Response = new AuthData { Data = Encoding.UTF8.GetBytes(token), AuthMethodName = "token" } }, cancellationToken);
+ DotPulsarEventSource.Log.TokenRefreshed();
+ }
+ else
+ {
+ _channelManager.Incoming(command);
+ }
+ }
}
}
catch
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 14ab85f..9e5dc2f 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -21,6 +21,7 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -36,6 +37,8 @@
private readonly Task _closeInactiveConnections;
private readonly string? _listenerName;
private readonly TimeSpan _keepAliveInterval;
+ private readonly IExecute _executor;
+ private readonly Func<Task<string>>? _accessTokenFactory;
public ConnectionPool(
CommandConnect commandConnect,
@@ -44,7 +47,9 @@
EncryptionPolicy encryptionPolicy,
TimeSpan closeInactiveConnectionsInterval,
string? listenerName,
- TimeSpan keepAliveInterval)
+ TimeSpan keepAliveInterval,
+ IExecute executor,
+ Func<Task<string>>? accessTokenFactory)
{
_lock = new AsyncLock();
_commandConnect = commandConnect;
@@ -56,6 +61,8 @@
_cancellationTokenSource = new CancellationTokenSource();
_closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
_keepAliveInterval = keepAliveInterval;
+ _executor = executor;
+ _accessTokenFactory = accessTokenFactory;
}
public async ValueTask DisposeAsync()
@@ -105,6 +112,7 @@
if (response.LookupTopicResponse.ProxyThroughServiceUrl)
{
+ await connection.DisposeAsync();
var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
return await GetConnection(url, cancellationToken).ConfigureAwait(false);
}
@@ -157,14 +165,21 @@
private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
{
var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
- var connection = new Connection(new PulsarStream(stream), _keepAliveInterval);
+ var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _accessTokenFactory);
DotPulsarEventSource.Log.ConnectionCreated();
_connections[url] = connection;
- _ = connection.ProcessIncommingFrames().ContinueWith(t => DisposeConnection(url));
+ _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(url));
var commandConnect = _commandConnect;
+ if (_accessTokenFactory != null)
+ {
+ var token = await _accessTokenFactory.GetToken(_executor);
+ commandConnect.AuthMethodName = "token";
+ commandConnect.AuthData = Encoding.UTF8.GetBytes(token);
+ }
+
if (url.ProxyThroughServiceUrl)
- commandConnect = WithProxyToBroker(_commandConnect, url.Logical);
+ commandConnect = WithProxyToBroker(commandConnect, url.Logical);
var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Connected);
@@ -184,15 +199,16 @@
{
return new CommandConnect
{
- AuthData = commandConnect.AuthData,
- AuthMethod = commandConnect.AuthMethod,
- AuthMethodName = commandConnect.AuthMethodName,
+ AuthData = commandConnect.ShouldSerializeAuthData() ? commandConnect.AuthData : null,
+ AuthMethod = commandConnect.ShouldSerializeAuthMethod() ? commandConnect.AuthMethod : AuthMethod.AuthMethodNone,
+ AuthMethodName = commandConnect.ShouldSerializeAuthMethodName() ? commandConnect.AuthMethodName : null,
ClientVersion = commandConnect.ClientVersion,
- OriginalPrincipal = commandConnect.OriginalPrincipal,
+ OriginalPrincipal = commandConnect.ShouldSerializeOriginalPrincipal() ? commandConnect.OriginalPrincipal : null,
ProtocolVersion = commandConnect.ProtocolVersion,
- OriginalAuthData = commandConnect.OriginalAuthData,
- OriginalAuthMethod = commandConnect.OriginalAuthMethod,
- ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}"
+ OriginalAuthData = commandConnect.ShouldSerializeOriginalAuthData() ? commandConnect.OriginalAuthData : null,
+ OriginalAuthMethod = commandConnect.ShouldSerializeOriginalAuthMethod() ? commandConnect.OriginalAuthMethod : null,
+ ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}",
+ FeatureFlags = commandConnect.FeatureFlags
};
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index a42eeef..019340b 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -62,6 +62,7 @@
SocketError.NetworkUnreachable => FaultAction.Rethrow,
_ => FaultAction.Retry
},
+ TokenFactoryFailedException => FaultAction.Retry,
_ => FaultAction.Rethrow
};
}
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index 5547b07..aaccbc4 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -38,6 +38,10 @@
public void ReaderCreated() { }
public void ReaderDisposed() { }
+
+ public void TokenRefreshed() { }
+
+ public long TokenRefreshCount => 0;
}
#else
@@ -76,11 +80,15 @@
private PollingCounter? _currentReadersCounter;
private long _currentReaders;
+
+ private PollingCounter? _tokenRefreshCounter;
+ private long _totalTokenRefreshes;
#pragma warning restore IDE0052 // Remove unread private members
public static readonly DotPulsarEventSource Log = new();
public DotPulsarEventSource() : base("DotPulsar") { }
+ public long TokenRefreshCount => _totalTokenRefreshes;
public void ClientCreated()
{
@@ -137,6 +145,11 @@
Interlocked.Decrement(ref _currentReaders);
}
+ public void TokenRefreshed()
+ {
+ Interlocked.Increment(ref _totalTokenRefreshes);
+ }
+
protected override void OnEventCommand(EventCommandEventArgs command)
{
if (command.Command != EventCommand.Enable)
@@ -191,6 +204,11 @@
{
DisplayName = "Current number of readers"
};
+
+ _tokenRefreshCounter ??= new PollingCounter("total-token-refreshes", this, () => Volatile.Read(ref _totalTokenRefreshes))
+ {
+ DisplayName = "Number of times token was refreshed"
+ };
}
}
#endif
diff --git a/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs b/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
new file mode 100644
index 0000000..7dee344
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Exceptions;
+
+using System;
+
+public class TokenFactoryFailedException : Exception
+{
+ public TokenFactoryFailedException(Exception innerException) : base("Exception when trying to fetch token from token factory", innerException)
+ {
+ }
+}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index a17e313..1853398 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -100,6 +100,13 @@
Ping = command
};
+ public static BaseCommand AsBaseCommand(this CommandAuthResponse command)
+ => new()
+ {
+ CommandType = BaseCommand.Type.AuthResponse,
+ AuthResponse = command
+ };
+
public static BaseCommand AsBaseCommand(this CommandPong command)
=> new()
{
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index fa06e04..ddd1a52 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -14,6 +14,7 @@
namespace DotPulsar.Internal;
+using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using PulsarApi;
@@ -21,6 +22,7 @@
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
+using System.Threading.Tasks;
public sealed class PulsarClientBuilder : IPulsarClientBuilder
{
@@ -36,6 +38,7 @@
private bool _verifyCertificateAuthority;
private bool _verifyCertificateName;
private TimeSpan _closeInactiveConnectionsInterval;
+ private Func<Task<string>>? _tokenFactory;
public PulsarClientBuilder()
{
@@ -69,6 +72,15 @@
return this;
}
+ public IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> tokenFactory)
+ {
+ _tokenFactory = tokenFactory;
+ var featureFlags = _commandConnect.FeatureFlags ?? new FeatureFlags();
+ featureFlags.SupportsAuthRefresh = true;
+ _commandConnect.FeatureFlags = featureFlags;
+ return this;
+ }
+
public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy)
{
_encryptionPolicy = encryptionPolicy;
@@ -152,13 +164,21 @@
else
throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
-
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
- var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval);
- var processManager = new ProcessManager(connectionPool);
+
var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
+ var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName,
+ _keepAliveInterval,
+ new Executor(Guid.Empty, new EmptyRegisterEvent(), exceptionHandlerPipeline),
+ _tokenFactory);
+ var processManager = new ProcessManager(connectionPool);
return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
}
}
+
+internal class EmptyRegisterEvent : IRegisterEvent
+{
+ public void Register(IEvent @event) { }
+}
diff --git a/src/DotPulsar/Internal/TokenFactoryExtensions.cs b/src/DotPulsar/Internal/TokenFactoryExtensions.cs
new file mode 100644
index 0000000..3aa26bf
--- /dev/null
+++ b/src/DotPulsar/Internal/TokenFactoryExtensions.cs
@@ -0,0 +1,26 @@
+namespace DotPulsar.Internal;
+
+using Abstractions;
+using Exceptions;
+using System;
+using System.Threading.Tasks;
+
+internal static class TokenFactoryExtensions
+{
+ public static async Task<string> GetToken(this Func<Task<string>> tokenFactory, IExecute executor)
+ {
+ return await executor.Execute(tokenFactory.GetToken);
+ }
+
+ public static async Task<string> GetToken(this Func<Task<string>> tokenFactory)
+ {
+ try
+ {
+ return await tokenFactory();
+ }
+ catch (Exception e)
+ {
+ throw new TokenFactoryFailedException(e);
+ }
+ }
+}
diff --git a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
index c65fb53..c1997b6 100644
--- a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
+++ b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
@@ -28,6 +28,12 @@
<None Update="docker-compose-standalone-tests.yml">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
+ <None Update="docker-compose-standalone-token-tests.yml">
+ <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+ </None>
+ <None Update="appdata\my-secret.key">
+ <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+ </None>
</ItemGroup>
</Project>
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
index c3e8d9d..eeb3090 100644
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
@@ -18,14 +18,22 @@
using Services;
using System.Threading.Tasks;
using Xunit;
+using Xunit.Abstractions;
public class StandaloneClusterFixture : IAsyncLifetime
{
+ private readonly IMessageSink _messageSink;
+
+ public StandaloneClusterFixture(IMessageSink messageSink)
+ {
+ _messageSink = messageSink;
+ }
+
public IPulsarService? PulsarService { private set; get; }
public async Task InitializeAsync()
{
- PulsarService = ServiceFactory.CreatePulsarService();
+ PulsarService = ServiceFactory.CreatePulsarService(_messageSink);
await PulsarService.InitializeAsync();
}
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
new file mode 100644
index 0000000..cb27b93
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
@@ -0,0 +1,6 @@
+namespace DotPulsar.IntegrationTests.Fixtures;
+
+using Xunit;
+
+[CollectionDefinition(nameof(StandaloneTokenClusterTest))]
+public class StandaloneTokenClusterTest : ICollectionFixture<TokenClusterFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
new file mode 100644
index 0000000..3c57911
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
@@ -0,0 +1,88 @@
+namespace DotPulsar.IntegrationTests.Fixtures;
+
+using Abstraction;
+using Services;
+using System;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
+
+public class TokenClusterFixture : PulsarServiceBase
+{
+ private readonly IMessageSink _messageSink;
+
+ public TokenClusterFixture(IMessageSink messageSink) : base(messageSink)
+ {
+ _messageSink = messageSink;
+ }
+
+ public IPulsarService PulsarService => this;
+
+ public override async Task InitializeAsync()
+ {
+ await TakeDownPulsar(); // clean-up if anything was left running from previous run
+
+ await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml up -d")
+ .ThrowOnFailure();
+
+ var waitTries = 10;
+
+ using var handler = new HttpClientHandler { AllowAutoRedirect = true };
+
+ using var client = new HttpClient(handler);
+
+ var token = await GetAuthToken(false);
+
+ client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+
+ while (waitTries > 0)
+ {
+ try
+ {
+ await client.GetAsync($"{PulsarService.GetWebServiceUri()}/metrics/").ConfigureAwait(false);
+ return;
+ }
+ catch(Exception e)
+ {
+ _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
+ waitTries--;
+ await Task.Delay(5000).ConfigureAwait(false);
+ }
+ }
+
+ throw new Exception("Unable to confirm Pulsar has initialized");
+ }
+
+ protected override async Task OnDispose()
+ => await TakeDownPulsar();
+
+ public override Uri GetBrokerUri() => new("pulsar://localhost:54547");
+
+ public override Uri GetWebServiceUri() => new("http://localhost:54548");
+
+ private Task TakeDownPulsar()
+ => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml down")
+ .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
+
+ public static async Task<string> GetAuthToken(bool includeExpiry)
+ {
+ var arguments = "exec pulsar-tokens bin/pulsar tokens create --secret-key file:///appdata/my-secret.key --subject test-user";
+
+ if (includeExpiry)
+ {
+ arguments += " --expiry-time 10s";
+ }
+
+ var result = await ProcessAsyncHelper.ExecuteShellCommand("docker",
+ arguments);
+
+ if (!result.Completed)
+ {
+ throw new InvalidOperationException($"Getting token from container failed{Environment.NewLine}{result.Output}");
+ }
+
+ return result.Output;
+ }
+}
diff --git a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
new file mode 100644
index 0000000..56c00b8
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
@@ -0,0 +1,127 @@
+using System;
+using System.Diagnostics;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace DotPulsar.IntegrationTests;
+
+public static class ProcessAsyncHelper
+{
+ public static async Task ThrowOnFailure(this Task<ProcessResult> resultTask)
+ {
+ var result = await resultTask;
+
+ if (!result.Completed)
+ {
+ throw new InvalidOperationException($"Process did not complete correctly, {Environment.NewLine}{result.Output}");
+ }
+ }
+
+ public static async Task LogFailure(this Task<ProcessResult> resultTask, Action<string> logAction)
+ {
+ var result = await resultTask;
+
+ if (!result.Completed)
+ {
+ logAction(result.Output);
+ }
+ }
+
+ public static async Task<ProcessResult> ExecuteShellCommand(string command, string arguments)
+ {
+ var result = new ProcessResult();
+
+ using var process = new Process();
+
+ process.StartInfo.FileName = command;
+ process.StartInfo.Arguments = arguments;
+ process.StartInfo.UseShellExecute = false;
+ process.StartInfo.RedirectStandardOutput = true;
+ process.StartInfo.RedirectStandardError = true;
+ process.StartInfo.CreateNoWindow = true;
+
+ var outputBuilder = new StringBuilder();
+ var outputCloseEvent = new TaskCompletionSource<bool>();
+
+ process.OutputDataReceived += (s, e) =>
+ {
+ // The output stream has been closed i.e. the process has terminated
+ if (e.Data == null)
+ {
+ outputCloseEvent.SetResult(true);
+ }
+ else
+ {
+ outputBuilder.Append(e.Data);
+ }
+ };
+
+ var errorBuilder = new StringBuilder();
+ var errorCloseEvent = new TaskCompletionSource<bool>();
+
+ process.ErrorDataReceived += (s, e) =>
+ {
+ // The error stream has been closed i.e. the process has terminated
+ if (e.Data == null)
+ {
+ errorCloseEvent.SetResult(true);
+ }
+ else
+ {
+ errorBuilder.Append(e.Data);
+ }
+ };
+
+ bool isStarted;
+
+ try
+ {
+ isStarted = process.Start();
+ }
+ catch (Exception error)
+ {
+ // Usually it occurs when an executable file is not found or is not executable
+
+ result.Completed = true;
+ result.ExitCode = -1;
+ result.Output = error.Message;
+
+ isStarted = false;
+ }
+
+ if (isStarted)
+ {
+ // Reads the output stream first and then waits because deadlocks are possible
+ process.BeginOutputReadLine();
+ process.BeginErrorReadLine();
+
+ // Creates task to wait for process exit using timeout
+ var waitForExit = WaitForExitAsync(process);
+
+ // Create task to wait for process exit and closing all output streams
+ await Task.WhenAll(waitForExit, outputCloseEvent.Task, errorCloseEvent.Task);
+
+ result.Completed = waitForExit.Result;
+ result.ExitCode = process.ExitCode;
+ // Adds process output if it was completed with error
+ result.Output = process.ExitCode != 0 ? $"{outputBuilder}{errorBuilder}" : outputBuilder.ToString();
+ }
+
+ return result;
+ }
+
+
+ private static async Task<bool> WaitForExitAsync(Process process)
+ {
+ await process.WaitForExitAsync();
+ return process.ExitCode == 0;
+ }
+
+
+ public struct ProcessResult
+ {
+ public bool Completed;
+ public int? ExitCode;
+ public string Output;
+ }
+}
diff --git a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
index 7f642d8..c614a84 100644
--- a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
@@ -20,33 +20,45 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
-public class PulsarServiceBase : IPulsarService
+public abstract class PulsarServiceBase : IPulsarService
{
+ protected readonly IMessageSink MessageSink;
private readonly CancellationTokenSource _cts;
private readonly HttpClient _adminClient;
- protected PulsarServiceBase()
+ protected PulsarServiceBase(IMessageSink messageSink)
{
+ MessageSink = messageSink;
_cts = new CancellationTokenSource();
_adminClient = new HttpClient();
}
- public virtual Task InitializeAsync()
- => Task.CompletedTask;
+ public abstract Task InitializeAsync();
- public virtual Task DisposeAsync()
+ public async Task DisposeAsync()
{
_adminClient.Dispose();
_cts.Dispose();
- return Task.CompletedTask;
+
+ try
+ {
+ await OnDispose();
+ }
+ catch (Exception e)
+ {
+ MessageSink.OnMessage(new DiagnosticMessage("Error disposing: {0}", e));
+ }
}
- public virtual Uri GetBrokerUri()
- => throw new NotImplementedException();
+ protected virtual Task OnDispose()
+ => Task.CompletedTask;
- public virtual Uri GetWebServiceUri()
- => throw new NotImplementedException();
+ public abstract Uri GetBrokerUri();
+
+ public abstract Uri GetWebServiceUri();
public async Task<HttpResponseMessage?> CreatePartitionedTopic(string restTopic, int numPartitions)
{
diff --git a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
index 3b9e0a1..90e10ec 100644
--- a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
@@ -15,21 +15,22 @@
namespace DotPulsar.IntegrationTests.Services;
using Abstraction;
+using Xunit.Abstractions;
public static class ServiceFactory
{
private const string _pulsarDeploymentType = "PULSAR_DEPLOYMENT_TYPE";
private const string _containerDeployment = "container";
- public static IPulsarService CreatePulsarService()
+ public static IPulsarService CreatePulsarService(IMessageSink messageSink)
{
var deploymentType = System.Environment.GetEnvironmentVariable(_pulsarDeploymentType);
if (deploymentType == _containerDeployment)
{
- return new StandaloneContainerService();
+ return new StandaloneContainerService(messageSink);
}
- return new StandaloneExternalService();
+ return new StandaloneExternalService(messageSink);
}
}
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
index e54b0fb..4d7d42e 100644
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
@@ -15,18 +15,21 @@
namespace DotPulsar.IntegrationTests.Services;
using System;
-using System.Diagnostics;
using System.Net.Http;
using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
public sealed class StandaloneContainerService : PulsarServiceBase
{
+ public StandaloneContainerService(IMessageSink messageSink) : base(messageSink) { }
+
public override async Task InitializeAsync()
{
- await base.InitializeAsync().ConfigureAwait(false);
- TakeDownPulsar(); // clean-up if anything was left running from previous run
+ await TakeDownPulsar(); // clean-up if anything was left running from previous run
- RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml up -d");
+ await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
+ .ThrowOnFailure();
var waitTries = 10;
@@ -51,33 +54,11 @@
throw new Exception("Unable to confirm Pulsar has initialized");
}
- public override async Task DisposeAsync()
- {
- await base.DisposeAsync().ConfigureAwait(false);
- TakeDownPulsar();
- }
+ protected override Task OnDispose() => TakeDownPulsar();
- private static void TakeDownPulsar()
- => RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml down");
-
- private static void RunProcess(string name, string arguments)
- {
- var processStartInfo = new ProcessStartInfo { FileName = name, Arguments = arguments };
-
- processStartInfo.Environment["TAG"] = "test";
- processStartInfo.Environment["CONFIGURATION"] = "Debug";
- processStartInfo.Environment["COMPUTERNAME"] = Environment.MachineName;
-
- var process = Process.Start(processStartInfo);
-
- if (process is null)
- throw new Exception("Process.Start returned null");
-
- process.WaitForExit();
-
- if (process.ExitCode != 0)
- throw new Exception($"Exit code {process.ExitCode} when running process {name} with arguments {arguments}");
- }
+ private Task TakeDownPulsar()
+ => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
+ .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
public override Uri GetBrokerUri()
=> new("pulsar://localhost:54545");
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
index ed7222b..c1ce2f0 100644
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
@@ -15,9 +15,14 @@
namespace DotPulsar.IntegrationTests.Services;
using System;
+using System.Threading.Tasks;
+using Xunit.Abstractions;
public sealed class StandaloneExternalService : PulsarServiceBase
{
+ public StandaloneExternalService(IMessageSink messageSink) : base(messageSink) { }
+ public override Task InitializeAsync() => Task.CompletedTask;
+
public override Uri GetBrokerUri()
=> new("pulsar://localhost:6650");
diff --git a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
new file mode 100644
index 0000000..5a0891a
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
@@ -0,0 +1,156 @@
+namespace DotPulsar.IntegrationTests;
+
+using Abstraction;
+using Abstractions;
+using Extensions;
+using Fixtures;
+using Internal;
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+[Collection(nameof(StandaloneTokenClusterTest))]
+public class TokenRefreshTests
+{
+ public enum TokenTestRefreshType
+ {
+ Standard,
+ FailAtStartup,
+ FailOnRefresh,
+ TimeoutOnRefresh
+ }
+
+ private const string MyTopic = "persistent://public/default/mytopic";
+ private readonly ITestOutputHelper _testOutputHelper;
+ private readonly IPulsarService _pulsarService;
+
+ public TokenRefreshTests(ITestOutputHelper outputHelper, TokenClusterFixture fixture)
+ {
+ _testOutputHelper = outputHelper;
+ Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != null");
+ _pulsarService = fixture.PulsarService;
+ }
+
+ [InlineData(TokenTestRefreshType.Standard, 0)] // Standard happy path with no token refresh failures
+ [InlineData(TokenTestRefreshType.FailAtStartup, 1)] // 1 Failure at startup, not on refresh
+ [InlineData(TokenTestRefreshType.FailOnRefresh, 2)] // Fails on refresh which will force a reconnection and fail once more on new connection
+ [InlineData(TokenTestRefreshType.TimeoutOnRefresh, 0)] // Connection will be disconnected by server due to slow response to auth challenge
+ [Theory]
+ public async Task TestExpiryRefresh(TokenTestRefreshType refreshType, int timesToFail)
+ {
+ var initialRefreshCount = DotPulsarEventSource.Log.TokenRefreshCount;
+
+ var publishingStarted = false;
+ var delayedNames = new HashSet<string>();
+ Task<string> GetToken(string name, ref int count)
+ {
+ if (refreshType is TokenTestRefreshType.Standard)
+ {
+ return GetAuthToken(name);
+ }
+
+ if (refreshType is TokenTestRefreshType.FailAtStartup && !publishingStarted && ++count <= timesToFail)
+ {
+ return Task.FromException<string>(new Exception("Initial Token Failed"));
+ }
+
+ if (refreshType is TokenTestRefreshType.FailOnRefresh && publishingStarted && ++count <= timesToFail)
+ {
+ return Task.FromException<string>(count == 1 ? new Exception("Refresh Failed") : new Exception("Initial Token Failed"));
+ }
+
+ if (refreshType is TokenTestRefreshType.TimeoutOnRefresh && publishingStarted && !delayedNames.Contains(name))
+ {
+ delayedNames.Add(name);
+ return Task.Delay(6000).ContinueWith(_ => GetAuthToken(name)).Unwrap();
+ }
+
+ return GetAuthToken(name);
+ }
+
+ var producerTokenCount = 0;
+ await using var producerClient = GetPulsarClient("Producer", ()
+ => GetToken("Producer", ref producerTokenCount));
+
+ var consumerTokenCount = 0;
+ await using var consumerClient = GetPulsarClient("Consumer", ()
+ => GetToken("Consumer", ref consumerTokenCount));
+
+ var producer = CreateProducer(producerClient);
+
+ var consumer = consumerClient.NewConsumer(Schema.String)
+ .Topic(MyTopic)
+ .SubscriptionName("test-sub")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .Create();
+
+ var received = new List<string>();
+ const int messageCount = 20;
+
+ var publisherTask = Task.Run(async () =>
+ {
+ for (var i = 0; i < messageCount; i++)
+ {
+ _testOutputHelper.WriteLine("Trying to publish message for index {0}", i);
+ var messageId = await producer.Send(Encoding.UTF8.GetBytes(i.ToString()));
+ publishingStarted = true;
+ _testOutputHelper.WriteLine("Published message {0} for index {1}", messageId, i);
+
+ await Task.Delay(1000);
+ }
+ });
+
+ var consumerTask = Task.Run(async () =>
+ {
+ await consumer.OnStateChangeTo(ConsumerState.Active);
+ for (int j = 0; j < messageCount; j++)
+ {
+ var message = await consumer.Receive();
+ received.Add(Encoding.UTF8.GetString(message.Data));
+ }
+ });
+
+ var all = Task.WhenAll(consumerTask, publisherTask);
+ var timeoutTask = Task.Delay(60_000);
+ var result = await Task.WhenAny(all, timeoutTask);
+ Assert.True(result != timeoutTask);
+
+ if (refreshType is TokenTestRefreshType.Standard)
+ {
+ Assert.True(DotPulsarEventSource.Log.TokenRefreshCount > initialRefreshCount);
+ }
+
+ var expected = Enumerable.Range(0, messageCount).Select(i => i.ToString()).ToList();
+ var missing = expected.Except(received).ToList();
+
+ if (missing.Count > 0)
+ {
+ Assert.True(false, $"Missing values: {string.Join(",", missing)}");
+ }
+ }
+
+ private static IProducer<ReadOnlySequence<byte>> CreateProducer(IPulsarClient producerClient)
+ => producerClient.NewProducer()
+ .Topic(MyTopic)
+ .Create();
+
+ private IPulsarClient GetPulsarClient(string name, Func<Task<string>> tokenFactory)
+ => PulsarClient.Builder()
+ .AuthenticateUsingToken(tokenFactory)
+ .RetryInterval(TimeSpan.FromSeconds(1))
+ .ExceptionHandler(ec => _testOutputHelper.WriteLine("Error (handled={0}) occurred in {1} client: {2}", ec.ExceptionHandled, name, ec.Exception))
+ .ServiceUrl(_pulsarService.GetBrokerUri()).Build();
+
+ private async Task<string> GetAuthToken(string name)
+ {
+ var result = await TokenClusterFixture.GetAuthToken(true);
+ _testOutputHelper.WriteLine("{0} received token {1}", name, result);
+ return result;
+ }
+}
diff --git a/tests/DotPulsar.IntegrationTests/appdata/my-secret.key b/tests/DotPulsar.IntegrationTests/appdata/my-secret.key
new file mode 100644
index 0000000..457a354
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/appdata/my-secret.key
@@ -0,0 +1,2 @@
+ûðO¡Ãõ
+¬¢ªm ¦ýÛm4Ãx##¨vn!÷å
\ No newline at end of file
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
index a3642ec..0e517e5 100644
--- a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
+++ b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
@@ -12,3 +12,10 @@
PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
command: |
/bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
+ networks:
+ - pulsar-standalone
+
+networks:
+ pulsar-standalone:
+ name: pulsar-standalone
+ driver: bridge
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
new file mode 100644
index 0000000..af76c5b
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
@@ -0,0 +1,34 @@
+version: '3.5'
+
+services:
+
+ pulsar-tokens:
+ container_name: pulsar-tokens
+ image: 'apachepulsar/pulsar:2.7.0'
+ ports:
+ - '54548:8081'
+ - '54547:6651'
+ volumes:
+ - ./appdata/:/appdata
+ environment:
+ - PULSAR_PREFIX_tokenSecretKey=file:///appdata/my-secret.key
+ - authenticationEnabled=true
+ - authorizationEnabled=true
+ - authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ - authenticateOriginalAuthData=false
+ - brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
+ - brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.CoBrja1EHr0e2kZKGFS8M-xS2SOC2E08yZmjktvcYOs
+ - superUserRoles=test-user
+ - PULSAR_PREFIX_authenticationRefreshCheckSeconds=5
+ - webServicePort=8081
+ - brokerServicePort=6651
+# - PULSAR_LOG_LEVEL=debug
+ command: |
+ /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
+ networks:
+ - pulsar-tokens
+
+networks:
+ pulsar-tokens:
+ name: pulsar-tokens
+ driver: bridge
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index abf1d14..5524900 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -26,6 +26,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+ <ProjectReference Include="..\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj" />
</ItemGroup>
</Project>
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index ef7321c..500cd84 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -14,19 +14,29 @@
namespace DotPulsar.StressTests.Fixtures;
+using IntegrationTests;
using System;
-using System.Diagnostics;
using System.Net.Http;
using System.Threading.Tasks;
using Xunit;
+using Xunit.Abstractions;
+using Xunit.Sdk;
public class StandaloneClusterFixture : IAsyncLifetime
{
+ private readonly IMessageSink _messageSink;
+
+ public StandaloneClusterFixture(IMessageSink messageSink)
+ {
+ _messageSink = messageSink;
+ }
+
public async Task InitializeAsync()
{
- TakeDownPulsar(); // clean-up if anything was left running from previous run
+ await TakeDownPulsar(); // clean-up if anything was left running from previous run
- RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml up -d");
+ await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
+ .ThrowOnFailure();
var waitTries = 10;
@@ -54,34 +64,19 @@
throw new Exception("Unable to confirm Pulsar has initialized");
}
- public Task DisposeAsync()
+ public async Task DisposeAsync()
{
- TakeDownPulsar();
- return Task.CompletedTask;
- }
-
- private static void TakeDownPulsar()
- => RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml down");
-
- private static void RunProcess(string name, string arguments)
- {
- var processStartInfo = new ProcessStartInfo
+ try
{
- FileName = name,
- Arguments = arguments
- };
-
- processStartInfo.Environment["TAG"] = "test";
- processStartInfo.Environment["CONFIGURATION"] = "Debug";
- processStartInfo.Environment["COMPUTERNAME"] = Environment.MachineName;
-
- var process = Process.Start(processStartInfo);
- if (process is null)
- throw new Exception("Process.Start returned null");
-
- process.WaitForExit();
-
- if (process.ExitCode != 0)
- throw new Exception($"Exit code {process.ExitCode} when running process {name} with arguments {arguments}");
+ await TakeDownPulsar();
+ }
+ catch (Exception e)
+ {
+ _messageSink.OnMessage(new DiagnosticMessage("Error taking down pulsar: {0}", e));
+ }
}
+
+ private Task TakeDownPulsar()
+ => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
+ .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
}
diff --git a/tests/docker-compose-standalone-tests.yml b/tests/docker-compose-standalone-tests.yml
index b9b3cd9..bca3999 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -12,6 +12,8 @@
PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
command: |
/bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
+ networks:
+ - pulsar-stresstests
loadbalancer:
container_name: loadbalancer
@@ -25,3 +27,10 @@
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
depends_on:
- pulsar
+ networks:
+ - pulsar-stresstests
+
+networks:
+ pulsar-stresstests:
+ name: pulsar-stresstests
+ driver: bridge
\ No newline at end of file