Make ready for release 3.0.1
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dd7a71..1247972 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,16 +4,15 @@
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [Unreleased]
+## [3.0.1] - 2023-09-15
 
 ### Changed
 
-- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a MessageId without the topic field if
-  MessageId.Earliest is found.
+- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a MessageId without the topic field if MessageId.Earliest is found
 
 ### Fixed
 
-- Fixed issue with DotPulsar client not handling connection faults for consumers and readers.
+- Fixed issue with the DotPulsar client not handling connection faults for consumers and readers
 
 ## [3.0.0] - 2023-08-30
 
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 36c6bb4..0e8fbd3 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
 
   <PropertyGroup>
     <TargetFrameworks>netstandard2.0;netstandard2.1;net6.0;net7.0</TargetFrameworks>
-    <Version>3.0.0</Version>
+    <Version>3.0.1</Version>
     <AssemblyVersion>$(Version)</AssemblyVersion>
     <FileVersion>$(Version)</FileVersion>
     <Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs b/tests/DotPulsar.Tests/ConsumerTests.cs
index 4c63a51..6460dbe 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -208,26 +208,15 @@
     {
         //Arrange
         var semaphoreSlim = new SemaphoreSlim(1);
-        await using var
-            client = PulsarClient.Builder().ExceptionHandler(context =>
-                {
-                    semaphoreSlim.WaitAsync();
-                    context.Result = FaultAction.Rethrow;
-                    context.ExceptionHandled = true;
-                })
-                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
-                .Build();
+        await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+        {
+            semaphoreSlim.WaitAsync();
+            context.Result = FaultAction.Rethrow;
+            context.ExceptionHandled = true;
+        })
+        .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var consumer = client.NewConsumer(Schema.String)
-            .StateChangedHandler(changed =>
-            {
-                var topic = changed.Consumer.Topic;
-                var state = changed.ConsumerState;
-                _testOutputHelper.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
-            })
-            .SubscriptionName("MySubscription")
-            .Topic("persistent://public/default/mytopic")
-            .Create();
+        await using var consumer = CreateConsumer(client, SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
 
         var receiveTask = consumer.Receive().AsTask();
         semaphoreSlim.Release();
@@ -243,29 +232,16 @@
     public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
     {
         //Arrange
-        var cts = new CancellationTokenSource();
+        await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+        {
+            context.Result = FaultAction.Rethrow;
+            context.ExceptionHandled = true;
+        })
+        .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var
-            client = PulsarClient.Builder().ExceptionHandler(context =>
-                {
-                    context.Result = FaultAction.Rethrow;
-                    context.ExceptionHandled = true;
-                })
-                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
-                .Build();
+        await using var consumer = CreateConsumer(client, SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
 
-        await using var consumer = client.NewConsumer(Schema.String)
-            .StateChangedHandler(changed =>
-            {
-                var topic = changed.Consumer.Topic;
-                var state = changed.ConsumerState;
-                _testOutputHelper.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
-            })
-            .SubscriptionName("MySubscription")
-            .Topic("persistent://public/default/mytopic")
-            .Create();
-
-        await consumer.OnStateChangeTo(ConsumerState.Faulted, cts.Token);
+        await consumer.OnStateChangeTo(ConsumerState.Faulted);
 
         //Act
         var exception = await Record.ExceptionAsync(() => consumer.Receive().AsTask());
@@ -305,18 +281,30 @@
         return messageIds;
     }
 
-    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, string topicName) => pulsarClient.NewProducer(Schema.String)
+    private void LogState(ConsumerStateChanged stateChange)
+        => _testOutputHelper.WriteLine($"The consumer for topic '{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
+
+    private void LogState(ProducerStateChanged stateChange)
+        => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
+    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, string topicName)
+        => pulsarClient.NewProducer(Schema.String)
         .Topic(topicName)
+        .StateChangedHandler(LogState)
         .Create();
 
-    private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, SubscriptionInitialPosition subscriptionInitialPosition,
+    private IConsumer<string> CreateConsumer(
+        IPulsarClient pulsarClient,
+        SubscriptionInitialPosition subscriptionInitialPosition,
         string topicName,
         string consumerName,
-        string subscriptionName) => pulsarClient.NewConsumer(Schema.String)
+        string subscriptionName)
+        => pulsarClient.NewConsumer(Schema.String)
         .ConsumerName(consumerName)
         .InitialPosition(subscriptionInitialPosition)
         .SubscriptionName(subscriptionName)
         .Topic(topicName)
+        .StateChangedHandler(LogState)
         .Create();
 
     private IPulsarClient CreateClient()
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs b/tests/DotPulsar.Tests/ReaderTests.cs
index 6a595d6..a2c546f 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/ReaderTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -210,26 +210,15 @@
     {
         //Arrange
         var semaphoreSlim = new SemaphoreSlim(1);
-        await using var
-            client = PulsarClient.Builder().ExceptionHandler(context =>
-                {
-                    semaphoreSlim.WaitAsync();
-                    context.Result = FaultAction.Rethrow;
-                    context.ExceptionHandled = true;
-                })
-                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
-                .Build();
+        await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+        {
+            semaphoreSlim.WaitAsync();
+            context.Result = FaultAction.Rethrow;
+            context.ExceptionHandled = true;
+        })
+        .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var reader = client.NewReader(Schema.String)
-            .StartMessageId(MessageId.Earliest)
-            .StateChangedHandler(changed =>
-            {
-                var topic = changed.Reader.Topic;
-                var state = changed.ReaderState;
-                _testOutputHelper.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
-            })
-            .Topic("persistent://public/default/mytopic")
-            .Create();
+        await using var reader = CreateReader(client, MessageId.Earliest, "persistent://a/b/c");
 
         var receiveTask = reader.Receive().AsTask();
         semaphoreSlim.Release();
@@ -245,29 +234,16 @@
     public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
     {
         //Arrange
-        var cts = new CancellationTokenSource();
+        await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+        {
+            context.Result = FaultAction.Rethrow;
+            context.ExceptionHandled = true;
+        })
+        .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
 
-        await using var
-            client = PulsarClient.Builder().ExceptionHandler(context =>
-                {
-                    context.Result = FaultAction.Rethrow;
-                    context.ExceptionHandled = true;
-                })
-                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
-                .Build();
+        await using var reader = CreateReader(client, MessageId.Earliest, "persistent://a/b/c");
 
-        await using var reader = client.NewReader(Schema.String)
-            .StartMessageId(MessageId.Earliest)
-            .StateChangedHandler(changed =>
-            {
-                var topic = changed.Reader.Topic;
-                var state = changed.ReaderState;
-                _testOutputHelper.WriteLine($"The reader for topic '{topic}' changed state to '{state}'");
-            })
-            .Topic("persistent://public/default/mytopic")
-            .Create();
-
-        await reader.OnStateChangeTo(ReaderState.Faulted, cts.Token);
+        await reader.OnStateChangeTo(ReaderState.Faulted);
 
         //Act
         var exception = await Record.ExceptionAsync(() => reader.Receive().AsTask());
@@ -276,13 +252,21 @@
         exception.Should().BeOfType<ReaderFaultedException>();
     }
 
+    private void LogState(ReaderStateChanged stateChange)
+        => _testOutputHelper.WriteLine($"The reader for topic '{stateChange.Reader.Topic}' changed state to '{stateChange.ReaderState}'");
+
+    private void LogState(ProducerStateChanged stateChange)
+        => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
     private IProducer<String> CreateProducer(IPulsarClient pulsarClient, string topicName) => pulsarClient.NewProducer(Schema.String)
         .Topic(topicName)
+        .StateChangedHandler(LogState)
         .Create();
 
     private IReader<String> CreateReader(IPulsarClient pulsarClient, MessageId messageId, string topicName) => pulsarClient.NewReader(Schema.String)
         .StartMessageId(messageId)
         .Topic(topicName)
+        .StateChangedHandler(LogState)
         .Create();
 
     private IPulsarClient CreateClient()