Add token factory support, respond to server auth challenge on token refresh (#95)

* get pulsar manager running in docker compose

* finally have a test env

* extract separate test for token refresh

* rollback change

* add more test cases

* fix docker issues
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 7ef079e..ea2ab5b 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -16,6 +16,7 @@
 
 using System;
 using System.Security.Cryptography.X509Certificates;
+using System.Threading.Tasks;
 
 /// <summary>
 /// A pulsar client building abstraction.
@@ -33,6 +34,11 @@
     IPulsarClientBuilder AuthenticateUsingToken(string token);
 
     /// <summary>
+    /// Authenticate using a (JSON Web) token factory. This is optional.
+    /// </summary>
+    IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> tokenFactory);
+
+    /// <summary>
     /// Set connection encryption policy. The default is 'EnforceUnencrypted' if the ServiceUrl scheme is 'pulsar' and 'EnforceEncrypted' if it's 'pulsar+ssl'.
     /// </summary>
     IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy);
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 1ee72e3..60c6433 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -20,6 +20,8 @@
 using PulsarApi;
 using System;
 using System.Buffers;
+using System.Diagnostics;
+using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -29,14 +31,16 @@
     private readonly ChannelManager _channelManager;
     private readonly PingPongHandler _pingPongHandler;
     private readonly IPulsarStream _stream;
+    private readonly Func<Task<string>>? _accessTokenFactory;
     private int _isDisposed;
 
-    public Connection(IPulsarStream stream, TimeSpan keepAliveInterval)
+    public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, Func<Task<string>>? accessTokenFactory)
     {
         _lock = new AsyncLock();
         _channelManager = new ChannelManager();
         _pingPongHandler = new PingPongHandler(this, keepAliveInterval);
         _stream = stream;
+        _accessTokenFactory = accessTokenFactory;
     }
 
     public async ValueTask<bool> HasChannels(CancellationToken cancellationToken)
@@ -81,6 +85,9 @@
         return await responseTask.ConfigureAwait(false);
     }
 
+    private Task Send(CommandAuthResponse authResponse, CancellationToken cancellationToken)
+        => Send(authResponse.AsBaseCommand(), cancellationToken);
+
     public Task Send(CommandPing command, CancellationToken cancellationToken)
         => Send(command.AsBaseCommand(), cancellationToken);
 
@@ -268,13 +275,13 @@
         }
     }
 
-    public async Task ProcessIncommingFrames()
+    public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
     {
         await Task.Yield();
 
         try
         {
-            await foreach (var frame in _stream.Frames())
+            await foreach (var frame in _stream.Frames(cancellationToken))
             {
                 var commandSize = frame.ReadUInt32(0, true);
                 var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
@@ -285,7 +292,18 @@
                 if (command.CommandType == BaseCommand.Type.Message)
                     _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
                 else
-                    _channelManager.Incoming(command);
+                {
+                    if (_accessTokenFactory != null && command.CommandType == BaseCommand.Type.AuthChallenge)
+                    {
+                        var token = await _accessTokenFactory.GetToken();
+                        await Send(new CommandAuthResponse { Response = new AuthData { Data = Encoding.UTF8.GetBytes(token), AuthMethodName = "token" } }, cancellationToken);
+                        DotPulsarEventSource.Log.TokenRefreshed();
+                    }
+                    else
+                    {
+                        _channelManager.Incoming(command);
+                    }
+                }
             }
         }
         catch
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 14ab85f..9e5dc2f 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -21,6 +21,7 @@
 using System;
 using System.Collections.Concurrent;
 using System.Linq;
+using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -36,6 +37,8 @@
     private readonly Task _closeInactiveConnections;
     private readonly string? _listenerName;
     private readonly TimeSpan _keepAliveInterval;
+    private readonly IExecute _executor;
+    private readonly Func<Task<string>>? _accessTokenFactory;
 
     public ConnectionPool(
         CommandConnect commandConnect,
@@ -44,7 +47,9 @@
         EncryptionPolicy encryptionPolicy,
         TimeSpan closeInactiveConnectionsInterval,
         string? listenerName,
-        TimeSpan keepAliveInterval)
+        TimeSpan keepAliveInterval,
+        IExecute executor,
+        Func<Task<string>>? accessTokenFactory)
     {
         _lock = new AsyncLock();
         _commandConnect = commandConnect;
@@ -56,6 +61,8 @@
         _cancellationTokenSource = new CancellationTokenSource();
         _closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
         _keepAliveInterval = keepAliveInterval;
+        _executor = executor;
+        _accessTokenFactory = accessTokenFactory;
     }
 
     public async ValueTask DisposeAsync()
@@ -105,6 +112,7 @@
 
             if (response.LookupTopicResponse.ProxyThroughServiceUrl)
             {
+                await connection.DisposeAsync();
                 var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
                 return await GetConnection(url, cancellationToken).ConfigureAwait(false);
             }
@@ -157,14 +165,21 @@
     private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
     {
         var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
-        var connection = new Connection(new PulsarStream(stream), _keepAliveInterval);
+        var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _accessTokenFactory);
         DotPulsarEventSource.Log.ConnectionCreated();
         _connections[url] = connection;
-        _ = connection.ProcessIncommingFrames().ContinueWith(t => DisposeConnection(url));
+        _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(url));
         var commandConnect = _commandConnect;
 
+        if (_accessTokenFactory != null)
+        {
+            var token = await _accessTokenFactory.GetToken(_executor);
+            commandConnect.AuthMethodName = "token";
+            commandConnect.AuthData = Encoding.UTF8.GetBytes(token);
+        }
+
         if (url.ProxyThroughServiceUrl)
-            commandConnect = WithProxyToBroker(_commandConnect, url.Logical);
+            commandConnect = WithProxyToBroker(commandConnect, url.Logical);
 
         var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
         response.Expect(BaseCommand.Type.Connected);
@@ -184,15 +199,16 @@
     {
         return new CommandConnect
         {
-            AuthData = commandConnect.AuthData,
-            AuthMethod = commandConnect.AuthMethod,
-            AuthMethodName = commandConnect.AuthMethodName,
+            AuthData = commandConnect.ShouldSerializeAuthData() ? commandConnect.AuthData : null,
+            AuthMethod = commandConnect.ShouldSerializeAuthMethod() ? commandConnect.AuthMethod : AuthMethod.AuthMethodNone,
+            AuthMethodName = commandConnect.ShouldSerializeAuthMethodName() ? commandConnect.AuthMethodName : null,
             ClientVersion = commandConnect.ClientVersion,
-            OriginalPrincipal = commandConnect.OriginalPrincipal,
+            OriginalPrincipal = commandConnect.ShouldSerializeOriginalPrincipal() ? commandConnect.OriginalPrincipal : null,
             ProtocolVersion = commandConnect.ProtocolVersion,
-            OriginalAuthData = commandConnect.OriginalAuthData,
-            OriginalAuthMethod = commandConnect.OriginalAuthMethod,
-            ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}"
+            OriginalAuthData = commandConnect.ShouldSerializeOriginalAuthData() ? commandConnect.OriginalAuthData : null,
+            OriginalAuthMethod = commandConnect.ShouldSerializeOriginalAuthMethod() ? commandConnect.OriginalAuthMethod : null,
+            ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}",
+            FeatureFlags = commandConnect.FeatureFlags
         };
     }
 
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index a42eeef..019340b 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -62,6 +62,7 @@
                 SocketError.NetworkUnreachable => FaultAction.Rethrow,
                 _ => FaultAction.Retry
             },
+            TokenFactoryFailedException => FaultAction.Retry,
             _ => FaultAction.Rethrow
         };
 }
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index 5547b07..aaccbc4 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -38,6 +38,10 @@
     public void ReaderCreated() { }
 
     public void ReaderDisposed() { }
+
+    public void TokenRefreshed() { }
+
+    public long TokenRefreshCount => 0;
 }
 
 #else
@@ -76,11 +80,15 @@
 
     private PollingCounter? _currentReadersCounter;
     private long _currentReaders;
+
+    private PollingCounter? _tokenRefreshCounter;
+    private long _totalTokenRefreshes;
 #pragma warning restore IDE0052 // Remove unread private members
 
     public static readonly DotPulsarEventSource Log = new();
 
     public DotPulsarEventSource() : base("DotPulsar") { }
+    public long TokenRefreshCount => _totalTokenRefreshes;
 
     public void ClientCreated()
     {
@@ -137,6 +145,11 @@
         Interlocked.Decrement(ref _currentReaders);
     }
 
+    public void TokenRefreshed()
+    {
+        Interlocked.Increment(ref _totalTokenRefreshes);
+    }
+
     protected override void OnEventCommand(EventCommandEventArgs command)
     {
         if (command.Command != EventCommand.Enable)
@@ -191,6 +204,11 @@
         {
             DisplayName = "Current number of readers"
         };
+
+        _tokenRefreshCounter ??= new PollingCounter("total-token-refreshes", this, () => Volatile.Read(ref _totalTokenRefreshes))
+        {
+            DisplayName = "Number of times token was refreshed"
+        };
     }
 }
 #endif
diff --git a/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs b/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
new file mode 100644
index 0000000..7dee344
--- /dev/null
+++ b/src/DotPulsar/Internal/Exceptions/TokenFactoryFailedException.cs
@@ -0,0 +1,10 @@
+namespace DotPulsar.Internal.Exceptions;
+
+using System;
+
+public class TokenFactoryFailedException : Exception
+{
+    public TokenFactoryFailedException(Exception innerException) : base("Exception when trying to fetch token from token factory", innerException)
+    {
+    }
+}
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index a17e313..1853398 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -100,6 +100,13 @@
             Ping = command
         };
 
+    public static BaseCommand AsBaseCommand(this CommandAuthResponse command)
+        => new()
+        {
+            CommandType = BaseCommand.Type.AuthResponse,
+            AuthResponse = command
+        };
+
     public static BaseCommand AsBaseCommand(this CommandPong command)
         => new()
         {
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index fa06e04..ddd1a52 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -14,6 +14,7 @@
 
 namespace DotPulsar.Internal;
 
+using Abstractions;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using PulsarApi;
@@ -21,6 +22,7 @@
 using System.Collections.Generic;
 using System.Security.Cryptography.X509Certificates;
 using System.Text;
+using System.Threading.Tasks;
 
 public sealed class PulsarClientBuilder : IPulsarClientBuilder
 {
@@ -36,6 +38,7 @@
     private bool _verifyCertificateAuthority;
     private bool _verifyCertificateName;
     private TimeSpan _closeInactiveConnectionsInterval;
+    private Func<Task<string>>? _tokenFactory;
 
     public PulsarClientBuilder()
     {
@@ -69,6 +72,15 @@
         return this;
     }
 
+    public IPulsarClientBuilder AuthenticateUsingToken(Func<Task<string>> tokenFactory)
+    {
+        _tokenFactory = tokenFactory;
+        var featureFlags = _commandConnect.FeatureFlags ?? new FeatureFlags();
+        featureFlags.SupportsAuthRefresh = true;
+        _commandConnect.FeatureFlags = featureFlags;
+        return this;
+    }
+
     public IPulsarClientBuilder ConnectionSecurity(EncryptionPolicy encryptionPolicy)
     {
         _encryptionPolicy = encryptionPolicy;
@@ -152,13 +164,21 @@
         else
             throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'");
 
-
         var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
-        var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval);
-        var processManager = new ProcessManager(connectionPool);
+
         var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
         var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
+        var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName,
+            _keepAliveInterval,
+            new Executor(Guid.Empty, new EmptyRegisterEvent(), exceptionHandlerPipeline),
+            _tokenFactory);
+        var processManager = new ProcessManager(connectionPool);
 
         return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
     }
 }
+
+internal class EmptyRegisterEvent : IRegisterEvent
+{
+    public void Register(IEvent @event) { }
+}
diff --git a/src/DotPulsar/Internal/TokenFactoryExtensions.cs b/src/DotPulsar/Internal/TokenFactoryExtensions.cs
new file mode 100644
index 0000000..3aa26bf
--- /dev/null
+++ b/src/DotPulsar/Internal/TokenFactoryExtensions.cs
@@ -0,0 +1,26 @@
+namespace DotPulsar.Internal;
+
+using Abstractions;
+using Exceptions;
+using System;
+using System.Threading.Tasks;
+
+internal static class TokenFactoryExtensions
+{
+    public static async Task<string> GetToken(this Func<Task<string>> tokenFactory, IExecute executor)
+    {
+        return await executor.Execute(tokenFactory.GetToken);
+    }
+
+    public static async Task<string> GetToken(this Func<Task<string>> tokenFactory)
+    {
+        try
+        {
+            return await tokenFactory();
+        }
+        catch (Exception e)
+        {
+            throw new TokenFactoryFailedException(e);
+        }
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
index c65fb53..c1997b6 100644
--- a/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
+++ b/tests/DotPulsar.IntegrationTests/DotPulsar.IntegrationTests.csproj
@@ -28,6 +28,12 @@
       <None Update="docker-compose-standalone-tests.yml">
         <CopyToOutputDirectory>Always</CopyToOutputDirectory>
       </None>
+      <None Update="docker-compose-standalone-token-tests.yml">
+        <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+      </None>
+      <None Update="appdata\my-secret.key">
+        <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+      </None>
     </ItemGroup>
 
 </Project>
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
index c3e8d9d..eeb3090 100644
--- a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneClusterFixture.cs
@@ -18,14 +18,22 @@
 using Services;
 using System.Threading.Tasks;
 using Xunit;
+using Xunit.Abstractions;
 
 public class StandaloneClusterFixture : IAsyncLifetime
 {
+    private readonly IMessageSink _messageSink;
+
+    public StandaloneClusterFixture(IMessageSink messageSink)
+    {
+        _messageSink = messageSink;
+    }
+
     public IPulsarService? PulsarService { private set; get; }
 
     public async Task InitializeAsync()
     {
-        PulsarService = ServiceFactory.CreatePulsarService();
+        PulsarService = ServiceFactory.CreatePulsarService(_messageSink);
         await PulsarService.InitializeAsync();
     }
 
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
new file mode 100644
index 0000000..cb27b93
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/StandaloneTokenClusterTest.cs
@@ -0,0 +1,6 @@
+namespace DotPulsar.IntegrationTests.Fixtures;
+
+using Xunit;
+
+[CollectionDefinition(nameof(StandaloneTokenClusterTest))]
+public class StandaloneTokenClusterTest : ICollectionFixture<TokenClusterFixture> { }
diff --git a/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs b/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
new file mode 100644
index 0000000..3c57911
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/Fixtures/TokenClusterFixture.cs
@@ -0,0 +1,88 @@
+namespace DotPulsar.IntegrationTests.Fixtures;
+
+using Abstraction;
+using Services;
+using System;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
+
+public class TokenClusterFixture : PulsarServiceBase
+{
+    private readonly IMessageSink _messageSink;
+
+    public TokenClusterFixture(IMessageSink messageSink) : base(messageSink)
+    {
+        _messageSink = messageSink;
+    }
+
+    public IPulsarService PulsarService => this;
+
+    public override async Task InitializeAsync()
+    {
+        await TakeDownPulsar(); // clean-up if anything was left running from previous run
+
+        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml up -d")
+            .ThrowOnFailure();
+
+        var waitTries = 10;
+
+        using var handler = new HttpClientHandler { AllowAutoRedirect = true };
+
+        using var client = new HttpClient(handler);
+
+        var token = await GetAuthToken(false);
+
+        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+
+        while (waitTries > 0)
+        {
+            try
+            {
+                await client.GetAsync($"{PulsarService.GetWebServiceUri()}/metrics/").ConfigureAwait(false);
+                return;
+            }
+            catch(Exception e)
+            {
+                _messageSink.OnMessage(new DiagnosticMessage("Error trying to fetch metrics: {0}", e));
+                waitTries--;
+                await Task.Delay(5000).ConfigureAwait(false);
+            }
+        }
+
+        throw new Exception("Unable to confirm Pulsar has initialized");
+    }
+
+    protected override async Task OnDispose()
+        => await TakeDownPulsar();
+
+    public override Uri GetBrokerUri() => new("pulsar://localhost:54547");
+
+    public override Uri GetWebServiceUri() => new("http://localhost:54548");
+
+    private Task TakeDownPulsar()
+        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-token-tests.yml down")
+            .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
+
+    public static async Task<string> GetAuthToken(bool includeExpiry)
+    {
+        var arguments = "exec pulsar-tokens bin/pulsar tokens create --secret-key file:///appdata/my-secret.key --subject test-user";
+
+        if (includeExpiry)
+        {
+            arguments += " --expiry-time 10s";
+        }
+
+        var result = await ProcessAsyncHelper.ExecuteShellCommand("docker",
+            arguments);
+
+        if (!result.Completed)
+        {
+            throw new InvalidOperationException($"Getting token from container failed{Environment.NewLine}{result.Output}");
+        }
+
+        return result.Output;
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
new file mode 100644
index 0000000..56c00b8
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/ProcessAsyncHelper.cs
@@ -0,0 +1,127 @@
+using System;
+using System.Diagnostics;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace DotPulsar.IntegrationTests;
+
+public static class ProcessAsyncHelper
+{
+    public static async Task ThrowOnFailure(this Task<ProcessResult> resultTask)
+    {
+        var result = await resultTask;
+
+        if (!result.Completed)
+        {
+            throw new InvalidOperationException($"Process did not complete correctly, {Environment.NewLine}{result.Output}");
+        }
+    }
+
+    public static async Task LogFailure(this Task<ProcessResult> resultTask, Action<string> logAction)
+    {
+        var result = await resultTask;
+
+        if (!result.Completed)
+        {
+            logAction(result.Output);
+        }
+    }
+
+    public static async Task<ProcessResult> ExecuteShellCommand(string command, string arguments)
+    {
+        var result = new ProcessResult();
+
+        using var process = new Process();
+
+        process.StartInfo.FileName = command;
+        process.StartInfo.Arguments = arguments;
+        process.StartInfo.UseShellExecute = false;
+        process.StartInfo.RedirectStandardOutput = true;
+        process.StartInfo.RedirectStandardError = true;
+        process.StartInfo.CreateNoWindow = true;
+
+        var outputBuilder = new StringBuilder();
+        var outputCloseEvent = new TaskCompletionSource<bool>();
+
+        process.OutputDataReceived += (s, e) =>
+        {
+            // The output stream has been closed i.e. the process has terminated
+            if (e.Data == null)
+            {
+                outputCloseEvent.SetResult(true);
+            }
+            else
+            {
+                outputBuilder.Append(e.Data);
+            }
+        };
+
+        var errorBuilder = new StringBuilder();
+        var errorCloseEvent = new TaskCompletionSource<bool>();
+
+        process.ErrorDataReceived += (s, e) =>
+        {
+            // The error stream has been closed i.e. the process has terminated
+            if (e.Data == null)
+            {
+                errorCloseEvent.SetResult(true);
+            }
+            else
+            {
+                errorBuilder.Append(e.Data);
+            }
+        };
+
+        bool isStarted;
+
+        try
+        {
+            isStarted = process.Start();
+        }
+        catch (Exception error)
+        {
+            // Usually it occurs when an executable file is not found or is not executable
+
+            result.Completed = true;
+            result.ExitCode = -1;
+            result.Output = error.Message;
+
+            isStarted = false;
+        }
+
+        if (isStarted)
+        {
+            // Reads the output stream first and then waits because deadlocks are possible
+            process.BeginOutputReadLine();
+            process.BeginErrorReadLine();
+
+            // Creates task to wait for process exit using timeout
+            var waitForExit = WaitForExitAsync(process);
+
+            // Create task to wait for process exit and closing all output streams
+            await Task.WhenAll(waitForExit, outputCloseEvent.Task, errorCloseEvent.Task);
+
+            result.Completed = waitForExit.Result;
+            result.ExitCode = process.ExitCode;
+            // Adds process output if it was completed with error
+            result.Output = process.ExitCode != 0 ? $"{outputBuilder}{errorBuilder}" : outputBuilder.ToString();
+        }
+
+        return result;
+    }
+
+
+    private static async Task<bool> WaitForExitAsync(Process process)
+    {
+        await process.WaitForExitAsync();
+        return process.ExitCode == 0;
+    }
+
+
+    public struct ProcessResult
+    {
+        public bool Completed;
+        public int? ExitCode;
+        public string Output;
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
index 7f642d8..c614a84 100644
--- a/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/PulsarServiceBase.cs
@@ -20,33 +20,45 @@
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
 
-public class PulsarServiceBase : IPulsarService
+public abstract class PulsarServiceBase : IPulsarService
 {
+    protected readonly IMessageSink MessageSink;
     private readonly CancellationTokenSource _cts;
     private readonly HttpClient _adminClient;
 
-    protected PulsarServiceBase()
+    protected PulsarServiceBase(IMessageSink messageSink)
     {
+        MessageSink = messageSink;
         _cts = new CancellationTokenSource();
         _adminClient = new HttpClient();
     }
 
-    public virtual Task InitializeAsync()
-        => Task.CompletedTask;
+    public abstract Task InitializeAsync();
 
-    public virtual Task DisposeAsync()
+    public async Task DisposeAsync()
     {
         _adminClient.Dispose();
         _cts.Dispose();
-        return Task.CompletedTask;
+
+        try
+        {
+            await OnDispose();
+        }
+        catch (Exception e)
+        {
+            MessageSink.OnMessage(new DiagnosticMessage("Error disposing: {0}", e));
+        }
     }
 
-    public virtual Uri GetBrokerUri()
-        => throw new NotImplementedException();
+    protected virtual Task OnDispose()
+        => Task.CompletedTask;
 
-    public virtual Uri GetWebServiceUri()
-        => throw new NotImplementedException();
+    public abstract Uri GetBrokerUri();
+
+    public abstract Uri GetWebServiceUri();
 
     public async Task<HttpResponseMessage?> CreatePartitionedTopic(string restTopic, int numPartitions)
     {
diff --git a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
index 3b9e0a1..90e10ec 100644
--- a/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/ServiceFactory.cs
@@ -15,21 +15,22 @@
 namespace DotPulsar.IntegrationTests.Services;
 
 using Abstraction;
+using Xunit.Abstractions;
 
 public static class ServiceFactory
 {
     private const string _pulsarDeploymentType = "PULSAR_DEPLOYMENT_TYPE";
     private const string _containerDeployment = "container";
 
-    public static IPulsarService CreatePulsarService()
+    public static IPulsarService CreatePulsarService(IMessageSink messageSink)
     {
         var deploymentType = System.Environment.GetEnvironmentVariable(_pulsarDeploymentType);
 
         if (deploymentType == _containerDeployment)
         {
-            return new StandaloneContainerService();
+            return new StandaloneContainerService(messageSink);
         }
 
-        return new StandaloneExternalService();
+        return new StandaloneExternalService(messageSink);
     }
 }
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
index e54b0fb..4d7d42e 100644
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/StandaloneContainerService.cs
@@ -15,18 +15,21 @@
 namespace DotPulsar.IntegrationTests.Services;
 
 using System;
-using System.Diagnostics;
 using System.Net.Http;
 using System.Threading.Tasks;
+using Xunit.Abstractions;
+using Xunit.Sdk;
 
 public sealed class StandaloneContainerService : PulsarServiceBase
 {
+    public StandaloneContainerService(IMessageSink messageSink) : base(messageSink) { }
+
     public override async Task InitializeAsync()
     {
-        await base.InitializeAsync().ConfigureAwait(false);
-        TakeDownPulsar(); // clean-up if anything was left running from previous run
+        await TakeDownPulsar(); // clean-up if anything was left running from previous run
 
-        RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml up -d");
+        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
+            .ThrowOnFailure();
 
         var waitTries = 10;
 
@@ -51,33 +54,11 @@
         throw new Exception("Unable to confirm Pulsar has initialized");
     }
 
-    public override async Task DisposeAsync()
-    {
-        await base.DisposeAsync().ConfigureAwait(false);
-        TakeDownPulsar();
-    }
+    protected override Task OnDispose() => TakeDownPulsar();
 
-    private static void TakeDownPulsar()
-        => RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml down");
-
-    private static void RunProcess(string name, string arguments)
-    {
-        var processStartInfo = new ProcessStartInfo { FileName = name, Arguments = arguments };
-
-        processStartInfo.Environment["TAG"] = "test";
-        processStartInfo.Environment["CONFIGURATION"] = "Debug";
-        processStartInfo.Environment["COMPUTERNAME"] = Environment.MachineName;
-
-        var process = Process.Start(processStartInfo);
-
-        if (process is null)
-            throw new Exception("Process.Start returned null");
-
-        process.WaitForExit();
-
-        if (process.ExitCode != 0)
-            throw new Exception($"Exit code {process.ExitCode} when running process {name} with arguments {arguments}");
-    }
+    private Task TakeDownPulsar()
+        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
+            .LogFailure(s => MessageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
 
     public override Uri GetBrokerUri()
         => new("pulsar://localhost:54545");
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
index ed7222b..c1ce2f0 100644
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
@@ -15,9 +15,14 @@
 namespace DotPulsar.IntegrationTests.Services;
 
 using System;
+using System.Threading.Tasks;
+using Xunit.Abstractions;
 
 public sealed class StandaloneExternalService : PulsarServiceBase
 {
+    public StandaloneExternalService(IMessageSink messageSink) : base(messageSink) { }
+    public override Task InitializeAsync() => Task.CompletedTask;
+
     public override Uri GetBrokerUri()
         => new("pulsar://localhost:6650");
 
diff --git a/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
new file mode 100644
index 0000000..5a0891a
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/TokenRefreshTests.cs
@@ -0,0 +1,156 @@
+namespace DotPulsar.IntegrationTests;
+
+using Abstraction;
+using Abstractions;
+using Extensions;
+using Fixtures;
+using Internal;
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+[Collection(nameof(StandaloneTokenClusterTest))]
+public class TokenRefreshTests
+{
+    public enum TokenTestRefreshType
+    {
+        Standard,
+        FailAtStartup,
+        FailOnRefresh,
+        TimeoutOnRefresh
+    }
+
+    private const string MyTopic = "persistent://public/default/mytopic";
+    private readonly ITestOutputHelper _testOutputHelper;
+    private readonly IPulsarService _pulsarService;
+
+    public TokenRefreshTests(ITestOutputHelper outputHelper, TokenClusterFixture fixture)
+    {
+        _testOutputHelper = outputHelper;
+        Debug.Assert(fixture.PulsarService != null, "fixture.PulsarService != null");
+        _pulsarService = fixture.PulsarService;
+    }
+
+    [InlineData(TokenTestRefreshType.Standard, 0)] // Standard happy path with no token refresh failures
+    [InlineData(TokenTestRefreshType.FailAtStartup, 1)] // 1 Failure at startup, not on refresh
+    [InlineData(TokenTestRefreshType.FailOnRefresh, 2)] // Fails on refresh which will force a reconnection and fail once more on new connection
+    [InlineData(TokenTestRefreshType.TimeoutOnRefresh, 0)] // Connection will be disconnected by server due to slow response to auth challenge
+    [Theory]
+    public async Task TestExpiryRefresh(TokenTestRefreshType refreshType, int timesToFail)
+    {
+        var initialRefreshCount = DotPulsarEventSource.Log.TokenRefreshCount;
+
+        var publishingStarted = false;
+        var delayedNames = new HashSet<string>();
+        Task<string> GetToken(string name, ref int count)
+        {
+            if (refreshType is TokenTestRefreshType.Standard)
+            {
+                return GetAuthToken(name);
+            }
+
+            if (refreshType is TokenTestRefreshType.FailAtStartup && !publishingStarted && ++count <= timesToFail)
+            {
+                return Task.FromException<string>(new Exception("Initial Token Failed"));
+            }
+
+            if (refreshType is TokenTestRefreshType.FailOnRefresh && publishingStarted && ++count <= timesToFail)
+            {
+                return Task.FromException<string>(count == 1 ? new Exception("Refresh Failed") : new Exception("Initial Token Failed"));
+            }
+
+            if (refreshType is TokenTestRefreshType.TimeoutOnRefresh && publishingStarted && !delayedNames.Contains(name))
+            {
+                delayedNames.Add(name);
+                return Task.Delay(6000).ContinueWith(_ => GetAuthToken(name)).Unwrap();
+            }
+
+            return GetAuthToken(name);
+        }
+
+        var producerTokenCount = 0;
+        await using var producerClient = GetPulsarClient("Producer", ()
+            => GetToken("Producer", ref producerTokenCount));
+
+        var consumerTokenCount = 0;
+        await using var consumerClient = GetPulsarClient("Consumer", ()
+            => GetToken("Consumer", ref consumerTokenCount));
+
+        var producer = CreateProducer(producerClient);
+
+        var consumer = consumerClient.NewConsumer(Schema.String)
+            .Topic(MyTopic)
+            .SubscriptionName("test-sub")
+            .InitialPosition(SubscriptionInitialPosition.Earliest)
+            .Create();
+
+        var received = new List<string>();
+        const int messageCount = 20;
+
+        var publisherTask = Task.Run(async () =>
+        {
+            for (var i = 0; i < messageCount; i++)
+            {
+                _testOutputHelper.WriteLine("Trying to publish message for index {0}", i);
+                var messageId = await producer.Send(Encoding.UTF8.GetBytes(i.ToString()));
+                publishingStarted = true;
+                _testOutputHelper.WriteLine("Published message {0} for index {1}", messageId, i);
+
+                await Task.Delay(1000);
+            }
+        });
+
+        var consumerTask = Task.Run(async () =>
+        {
+            await consumer.OnStateChangeTo(ConsumerState.Active);
+            for (int j = 0; j < messageCount; j++)
+            {
+                var message = await consumer.Receive();
+                received.Add(Encoding.UTF8.GetString(message.Data));
+            }
+        });
+
+        var all = Task.WhenAll(consumerTask, publisherTask);
+        var timeoutTask = Task.Delay(60_000);
+        var result = await Task.WhenAny(all, timeoutTask);
+        Assert.True(result != timeoutTask);
+
+        if (refreshType is TokenTestRefreshType.Standard)
+        {
+            Assert.True(DotPulsarEventSource.Log.TokenRefreshCount > initialRefreshCount);
+        }
+
+        var expected = Enumerable.Range(0, messageCount).Select(i => i.ToString()).ToList();
+        var missing = expected.Except(received).ToList();
+
+        if (missing.Count > 0)
+        {
+            Assert.True(false, $"Missing values: {string.Join(",", missing)}");
+        }
+    }
+
+    private static IProducer<ReadOnlySequence<byte>> CreateProducer(IPulsarClient producerClient)
+        => producerClient.NewProducer()
+            .Topic(MyTopic)
+            .Create();
+
+    private IPulsarClient GetPulsarClient(string name, Func<Task<string>> tokenFactory)
+        => PulsarClient.Builder()
+            .AuthenticateUsingToken(tokenFactory)
+            .RetryInterval(TimeSpan.FromSeconds(1))
+            .ExceptionHandler(ec => _testOutputHelper.WriteLine("Error (handled={0}) occurred in {1} client: {2}", ec.ExceptionHandled, name, ec.Exception))
+            .ServiceUrl(_pulsarService.GetBrokerUri()).Build();
+
+    private async Task<string> GetAuthToken(string name)
+    {
+        var result = await TokenClusterFixture.GetAuthToken(true);
+        _testOutputHelper.WriteLine("{0} received token {1}", name, result);
+        return result;
+    }
+}
diff --git a/tests/DotPulsar.IntegrationTests/appdata/my-secret.key b/tests/DotPulsar.IntegrationTests/appdata/my-secret.key
new file mode 100644
index 0000000..457a354
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/appdata/my-secret.key
@@ -0,0 +1,2 @@
+ûðO¡Ãõ
+¬¢ªm	¦ýÛm4Ãx#™#¨vn!÷å
\ No newline at end of file
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
index a3642ec..0e517e5 100644
--- a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
+++ b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-tests.yml
@@ -12,3 +12,10 @@
       PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
     command: |
       /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
+    networks:
+      - pulsar-standalone
+        
+networks:
+  pulsar-standalone:
+    name: pulsar-standalone
+    driver: bridge
diff --git a/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
new file mode 100644
index 0000000..af76c5b
--- /dev/null
+++ b/tests/DotPulsar.IntegrationTests/docker-compose-standalone-token-tests.yml
@@ -0,0 +1,34 @@
+version: '3.5'
+  
+services:
+        
+  pulsar-tokens:
+    container_name: pulsar-tokens
+    image: 'apachepulsar/pulsar:2.7.0'
+    ports:
+      - '54548:8081'
+      - '54547:6651'
+    volumes:
+      - ./appdata/:/appdata
+    environment:
+      - PULSAR_PREFIX_tokenSecretKey=file:///appdata/my-secret.key
+      - authenticationEnabled=true
+      - authorizationEnabled=true
+      - authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+      - authenticateOriginalAuthData=false
+      - brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
+      - brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.CoBrja1EHr0e2kZKGFS8M-xS2SOC2E08yZmjktvcYOs
+      - superUserRoles=test-user
+      - PULSAR_PREFIX_authenticationRefreshCheckSeconds=5
+      - webServicePort=8081
+      - brokerServicePort=6651
+#      - PULSAR_LOG_LEVEL=debug
+    command: |
+      /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
+    networks:
+      - pulsar-tokens
+        
+networks:
+  pulsar-tokens:
+    name: pulsar-tokens
+    driver: bridge
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index abf1d14..5524900 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -26,6 +26,7 @@
 
   <ItemGroup>
     <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+    <ProjectReference Include="..\DotPulsar.IntegrationTests\DotPulsar.IntegrationTests.csproj" />
   </ItemGroup>
 
 </Project>
diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
index ef7321c..500cd84 100644
--- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
+++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs
@@ -14,19 +14,29 @@
 
 namespace DotPulsar.StressTests.Fixtures;
 
+using IntegrationTests;
 using System;
-using System.Diagnostics;
 using System.Net.Http;
 using System.Threading.Tasks;
 using Xunit;
+using Xunit.Abstractions;
+using Xunit.Sdk;
 
 public class StandaloneClusterFixture : IAsyncLifetime
 {
+    private readonly IMessageSink _messageSink;
+
+    public StandaloneClusterFixture(IMessageSink messageSink)
+    {
+        _messageSink = messageSink;
+    }
+
     public async Task InitializeAsync()
     {
-        TakeDownPulsar(); // clean-up if anything was left running from previous run
+        await TakeDownPulsar(); // clean-up if anything was left running from previous run
 
-        RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml up -d");
+        await ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml up -d")
+            .ThrowOnFailure();
 
         var waitTries = 10;
 
@@ -54,34 +64,19 @@
         throw new Exception("Unable to confirm Pulsar has initialized");
     }
 
-    public Task DisposeAsync()
+    public async Task DisposeAsync()
     {
-        TakeDownPulsar();
-        return Task.CompletedTask;
-    }
-
-    private static void TakeDownPulsar()
-        => RunProcess("docker-compose", "-f docker-compose-standalone-tests.yml down");
-
-    private static void RunProcess(string name, string arguments)
-    {
-        var processStartInfo = new ProcessStartInfo
+        try
         {
-            FileName = name,
-            Arguments = arguments
-        };
-
-        processStartInfo.Environment["TAG"] = "test";
-        processStartInfo.Environment["CONFIGURATION"] = "Debug";
-        processStartInfo.Environment["COMPUTERNAME"] = Environment.MachineName;
-
-        var process = Process.Start(processStartInfo);
-        if (process is null)
-            throw new Exception("Process.Start returned null");
-
-        process.WaitForExit();
-
-        if (process.ExitCode != 0)
-            throw new Exception($"Exit code {process.ExitCode} when running process {name} with arguments {arguments}");
+            await TakeDownPulsar();
+        }
+        catch (Exception e)
+        {
+            _messageSink.OnMessage(new DiagnosticMessage("Error taking down pulsar: {0}", e));
+        }
     }
+
+    private Task TakeDownPulsar()
+        => ProcessAsyncHelper.ExecuteShellCommand("docker-compose", "-f docker-compose-standalone-tests.yml down")
+            .LogFailure(s => _messageSink.OnMessage(new DiagnosticMessage("Error bringing down container: {0}", s)));
 }
diff --git a/tests/docker-compose-standalone-tests.yml b/tests/docker-compose-standalone-tests.yml
index b9b3cd9..bca3999 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -12,6 +12,8 @@
       PULSAR_MEM: " -Xms1g -Xmx1g -XX:MaxDirectMemorySize=2g"
     command: |
       /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone --no-functions-worker"
+    networks:
+      - pulsar-stresstests
 
   loadbalancer:
     container_name: loadbalancer
@@ -25,3 +27,10 @@
       - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
     depends_on:
       - pulsar
+    networks:
+      - pulsar-stresstests
+
+networks:
+  pulsar-stresstests:
+    name: pulsar-stresstests
+    driver: bridge
\ No newline at end of file