Adding new exceptions.
Adding new way of monitoring reader, consumer and producer state.
Updating NuGet packages.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 3bc70e9..914fc8a 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -40,12 +40,11 @@
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
             var consumer = client.NewConsumer()
+                .StateChangedHandler(Monitor)
                 .SubscriptionName("MySubscription")
                 .Topic(myTopic)
                 .Create();
 
-            var monitoring = Monitor(consumer);
-
             var cts = new CancellationTokenSource();
 
             var consuming = ConsumeMessages(consumer, cts.Token);
@@ -59,8 +58,6 @@
             await consuming;
 
             await consumer.DisposeAsync();
-
-            await monitoring;
         }
 
         private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
@@ -79,18 +76,9 @@
             catch (OperationCanceledException) { }
         }
 
-        private static async Task Monitor(IConsumer consumer)
+        private static void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
         {
-            await Task.Yield();
-
-            var state = ConsumerState.Disconnected;
-
-            while (true)
-            {
-                var stateChanged = await consumer.StateChangedFrom(state);
-                state = stateChanged.ConsumerState;
-
-                var stateMessage = state switch
+                var stateMessage = stateChanged.ConsumerState switch
                 {
                     ConsumerState.Active => "is active",
                     ConsumerState.Inactive => "is inactive",
@@ -98,15 +86,11 @@
                     ConsumerState.Closed => "has closed",
                     ConsumerState.ReachedEndOfTopic => "has reached end of topic",
                     ConsumerState.Faulted => "has faulted",
-                    _ => $"has an unknown state '{state}'"
+                    _ => $"has an unknown state '{stateChanged.ConsumerState}'"
                 };
 
                 var topic = stateChanged.Consumer.Topic;
                 Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
-
-                if (consumer.IsFinalState(state))
-                    return;
-            }
         }
     }
 }
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 7fe7d58..26f7f51 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -39,11 +39,10 @@
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
             var producer = client.NewProducer()
+                .StateChangedHandler(Monitor)
                 .Topic(myTopic)
                 .Create();
 
-            var monitoring = Monitor(producer);
-
             var cts = new CancellationTokenSource();
 
             var producing = ProduceMessages(producer, cts.Token);
@@ -57,8 +56,6 @@
             await producing;
 
             await producer.DisposeAsync();
-
-            await monitoring;
         }
 
         private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
@@ -82,32 +79,19 @@
             { }
         }
 
-        private static async Task Monitor(IProducer producer)
+        private static void Monitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
         {
-            await Task.Yield();
-
-            var state = ProducerState.Disconnected;
-
-            while (true)
+            var stateMessage = stateChanged.ProducerState switch
             {
-                var stateChanged = await producer.StateChangedFrom(state).ConfigureAwait(false);
-                state = stateChanged.ProducerState;
+                ProducerState.Connected => "is connected",
+                ProducerState.Disconnected => "is disconnected",
+                ProducerState.Closed => "has closed",
+                ProducerState.Faulted => "has faulted",
+                _ => $"has an unknown state '{stateChanged.ProducerState}'"
+            };
 
-                var stateMessage = state switch
-                {
-                    ProducerState.Connected => "is connected",
-                    ProducerState.Disconnected => "is disconnected",
-                    ProducerState.Closed => "has closed",
-                    ProducerState.Faulted => "has faulted",
-                    _ => $"has an unknown state '{state}'"
-                };
-
-                var topic = stateChanged.Producer.Topic;
-                Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
-
-                if (producer.IsFinalState(state))
-                    return;
-            }
+            var topic = stateChanged.Producer.Topic;
+            Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
         }
     }
 }
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index bb29458..26289d3 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -41,11 +41,10 @@
 
             var reader = client.NewReader()
                 .StartMessageId(MessageId.Earliest)
+                .StateChangedHandler(Monitor)
                 .Topic(myTopic)
                 .Create();
 
-            var monitoring = Monitor(reader);
-
             var cts = new CancellationTokenSource();
 
             var reading = ReadMessages(reader, cts.Token);
@@ -59,8 +58,6 @@
             await reading;
 
             await reader.DisposeAsync();
-
-            await monitoring;
         }
 
         private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
@@ -78,33 +75,20 @@
             catch (OperationCanceledException) { }
         }
 
-        private static async Task Monitor(IReader reader)
+        private static void Monitor(ReaderStateChanged stateChanged, CancellationToken cancellationToken)
         {
-            await Task.Yield();
-
-            var state = ReaderState.Disconnected;
-
-            while (true)
+            var stateMessage = stateChanged.ReaderState switch
             {
-                var stateChanged = await reader.StateChangedFrom(state);
-                state = stateChanged.ReaderState;
+                ReaderState.Connected => "is connected",
+                ReaderState.Disconnected => "is disconnected",
+                ReaderState.Closed => "has closed",
+                ReaderState.ReachedEndOfTopic => "has reached end of topic",
+                ReaderState.Faulted => "has faulted",
+                _ => $"has an unknown state '{stateChanged.ReaderState}'"
+            };
 
-                var stateMessage = state switch
-                {
-                    ReaderState.Connected => "is connected",
-                    ReaderState.Disconnected => "is disconnected",
-                    ReaderState.Closed => "has closed",
-                    ReaderState.ReachedEndOfTopic => "has reached end of topic",
-                    ReaderState.Faulted => "has faulted",
-                    _ => $"has an unknown state '{state}'"
-                };
-
-                var topic = stateChanged.Reader.Topic;
-                Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
-
-                if (reader.IsFinalState(state))
-                    return;
-            }
+            var topic = stateChanged.Reader.Topic;
+            Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
         }
     }
 }
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index 44fd068..4de94f1 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -14,6 +14,10 @@
 
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A consumer building abstraction.
     /// </summary>
@@ -30,21 +34,36 @@
         IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition);
 
         /// <summary>
-        /// Set the priority level for the shared subscription consumer. The default is 0.
-        /// </summary>
-        IConsumerBuilder PriorityLevel(int priorityLevel);
-
-        /// <summary>
         /// Number of messages that will be prefetched. The default is 1000.
         /// </summary>
         IConsumerBuilder MessagePrefetchCount(uint count);
 
         /// <summary>
+        /// Set the priority level for the shared subscription consumer. The default is 0.
+        /// </summary>
+        IConsumerBuilder PriorityLevel(int priorityLevel);
+
+        /// <summary>
         /// Whether to read from the compacted topic. The default is 'false'.
         /// </summary>
         IConsumerBuilder ReadCompacted(bool readCompacted);
 
         /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler);
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// Set the subscription name for this consumer. This is required.
         /// </summary>
         IConsumerBuilder SubscriptionName(string name);
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
new file mode 100644
index 0000000..04d6b60
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// An state change handling abstraction.
+    /// </summary>
+    public interface IHandleStateChanged<TStateChanged>
+    {
+        /// <summary>
+        /// Called after a state has changed.
+        /// </summary>
+        ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken);
+
+        /// <summary>
+        /// The cancellation token to use when waiting for and handling state changes.
+        /// </summary>
+        CancellationToken CancellationToken { get; }
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 386e09f..2cdc6ef 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -14,20 +14,39 @@
 
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A producer building abstraction.
     /// </summary>
     public interface IProducerBuilder
     {
         /// <summary>
+        /// Set the initial sequence id. The default is 0.
+        /// </summary>
+        IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+
+        /// <summary>
         /// Set the producer name. This is optional.
         /// </summary>
         IProducerBuilder ProducerName(string name);
 
         /// <summary>
-        /// Set the initial sequence id. The default is 0.
+        /// Register a state changed handler.
         /// </summary>
-        IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+        IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler);
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Set the topic for this producer. This is required.
diff --git a/src/DotPulsar/Abstractions/IReaderBuilder.cs b/src/DotPulsar/Abstractions/IReaderBuilder.cs
index 4363697..3e97705 100644
--- a/src/DotPulsar/Abstractions/IReaderBuilder.cs
+++ b/src/DotPulsar/Abstractions/IReaderBuilder.cs
@@ -14,17 +14,16 @@
 
 namespace DotPulsar.Abstractions
 {
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
     /// <summary>
     /// A reader building abstraction.
     /// </summary>
     public interface IReaderBuilder
     {
         /// <summary>
-        /// Set the reader name. This is optional.
-        /// </summary>
-        IReaderBuilder ReaderName(string name);
-
-        /// <summary>
         /// Number of messages that will be prefetched. The default is 1000.
         /// </summary>
         IReaderBuilder MessagePrefetchCount(uint count);
@@ -35,11 +34,31 @@
         IReaderBuilder ReadCompacted(bool readCompacted);
 
         /// <summary>
+        /// Set the reader name. This is optional.
+        /// </summary>
+        IReaderBuilder ReaderName(string name);
+
+        /// <summary>
         /// The initial reader position is set to the specified message id. This is required.
         /// </summary>
         IReaderBuilder StartMessageId(MessageId messageId);
 
         /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler);
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Register a state changed handler.
+        /// </summary>
+        IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// Set the topic for this reader. This is required.
         /// </summary>
         IReaderBuilder Topic(string topic);
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index a07d529..3d12d9f 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -14,6 +14,8 @@
 
 namespace DotPulsar
 {
+    using DotPulsar.Abstractions;
+
     /// <summary>
     /// The consumer building options.
     /// </summary>
@@ -66,21 +68,26 @@
         public SubscriptionInitialPosition InitialPosition { get; set; }
 
         /// <summary>
-        /// Set the priority level for the shared subscription consumer. The default is 0.
-        /// </summary>
-        public int PriorityLevel { get; set; }
-
-        /// <summary>
         /// Number of messages that will be prefetched. The default is 1000.
         /// </summary>
         public uint MessagePrefetchCount { get; set; }
 
         /// <summary>
+        /// Set the priority level for the shared subscription consumer. The default is 0.
+        /// </summary>
+        public int PriorityLevel { get; set; }
+
+        /// <summary>
         /// Whether to read from the compacted topic. The default is 'false'.
         /// </summary>
         public bool ReadCompacted { get; set; }
 
         /// <summary>
+        /// Register a state changed handler. This is optional.
+        /// </summary>
+        public IHandleStateChanged<ConsumerStateChanged>? StateChangedHandler { get; set; }
+
+        /// <summary>
         /// Set the subscription name for this consumer. This is required.
         /// </summary>
         public string SubscriptionName { get; set; }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 9dcf9af..0fc4558 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -21,10 +21,10 @@
   </PropertyGroup>
 
   <ItemGroup>    
-    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.0" />    
+    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.1" />    
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
     <PackageReference Include="protobuf-net" Version="3.0.73" />
-    <PackageReference Include="System.IO.Pipelines" Version="5.0.0" />
+    <PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
   </ItemGroup>
 
   <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
diff --git a/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
new file mode 100644
index 0000000..e106f5e
--- /dev/null
+++ b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+    using System;
+
+    public sealed class InvalidTransactionStatusException : DotPulsarException
+    {
+        public InvalidTransactionStatusException(string message) : base(message) { }
+
+        public InvalidTransactionStatusException(string message, Exception innerException) : base(message, innerException) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/NotAllowedErrorException.cs b/src/DotPulsar/Exceptions/NotAllowedErrorException.cs
new file mode 100644
index 0000000..d92e2aa
--- /dev/null
+++ b/src/DotPulsar/Exceptions/NotAllowedErrorException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+    using System;
+
+    public sealed class NotAllowedErrorException : DotPulsarException
+    {
+        public NotAllowedErrorException(string message) : base(message) { }
+
+        public NotAllowedErrorException(string message, Exception innerException) : base(message, innerException) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/TransactionConflictException.cs b/src/DotPulsar/Exceptions/TransactionConflictException.cs
new file mode 100644
index 0000000..c358d39
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TransactionConflictException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+    using System;
+
+    public sealed class TransactionConflictException : DotPulsarException
+    {
+        public TransactionConflictException(string message) : base(message) { }
+
+        public TransactionConflictException(string message, Exception innerException) : base(message, innerException) { }
+    }
+}
diff --git a/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
new file mode 100644
index 0000000..33d9158
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+    using System;
+
+    public sealed class TransactionCoordinatorNotFoundException : DotPulsarException
+    {
+        public TransactionCoordinatorNotFoundException(string message) : base(message) { }
+
+        public TransactionCoordinatorNotFoundException(string message, Exception innerException) : base(message, innerException) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/ActionStateChangedHandler.cs b/src/DotPulsar/Internal/ActionStateChangedHandler.cs
new file mode 100644
index 0000000..804d7b3
--- /dev/null
+++ b/src/DotPulsar/Internal/ActionStateChangedHandler.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class ActionStateChangedHandler<TStateChanged> : IHandleStateChanged<TStateChanged>
+    {
+        private readonly Action<TStateChanged, CancellationToken> _stateChangedHandler;
+
+        public ActionStateChangedHandler(Action<TStateChanged, CancellationToken> stateChangedHandler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = stateChangedHandler;
+            CancellationToken = cancellationToken;
+        }
+
+        public CancellationToken CancellationToken { get; }
+
+        public ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler(stateChanged, CancellationToken);
+            return new ValueTask();
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index edd856b..9fc42c3 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -105,7 +105,6 @@
                 else
                     await sslStream.DisposeAsync().ConfigureAwait(false);
 #endif
-
                 throw;
             }
         }
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 15b2875..6deee54 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -16,6 +16,9 @@
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
 
     public sealed class ConsumerBuilder : IConsumerBuilder
     {
@@ -28,6 +31,7 @@
         private string? _subscriptionName;
         private SubscriptionType _subscriptionType;
         private string? _topic;
+        private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
 
         public ConsumerBuilder(IPulsarClient pulsarClient)
         {
@@ -51,24 +55,42 @@
             return this;
         }
 
-        public IConsumerBuilder PriorityLevel(int priorityLevel)
-        {
-            _priorityLevel = priorityLevel;
-            return this;
-        }
-
         public IConsumerBuilder MessagePrefetchCount(uint count)
         {
             _messagePrefetchCount = count;
             return this;
         }
 
+        public IConsumerBuilder PriorityLevel(int priorityLevel)
+        {
+            _priorityLevel = priorityLevel;
+            return this;
+        }
+
         public IConsumerBuilder ReadCompacted(bool readCompacted)
         {
             _readCompacted = readCompacted;
             return this;
         }
 
+        public IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
+        {
+            _stateChangedHandler = handler;
+            return this;
+        }
+
+        public IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = new ActionStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
+            return this;
+        }
+
+        public IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = new FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
+            return this;
+        }
+
         public IConsumerBuilder SubscriptionName(string name)
         {
             _subscriptionName = name;
@@ -102,6 +124,7 @@
                 MessagePrefetchCount = _messagePrefetchCount,
                 PriorityLevel = _priorityLevel,
                 ReadCompacted = _readCompacted,
+                StateChangedHandler = _stateChangedHandler,
                 SubscriptionType = _subscriptionType
             };
 
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 4b182dc..6df4e0f 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -54,7 +54,9 @@
                 ServerError.ConsumerNotFound => new ConsumerNotFoundException(message),
                 ServerError.IncompatibleSchema => new IncompatibleSchemaException(message),
                 ServerError.InvalidTopicName => new InvalidTopicNameException(message),
+                ServerError.InvalidTxnStatus => new InvalidTransactionStatusException(message),
                 ServerError.MetadataError => new MetadataException(message),
+                ServerError.NotAllowedError => new NotAllowedErrorException(message),
                 ServerError.PersistenceError => new PersistenceException(message),
                 ServerError.ProducerBlockedQuotaExceededError => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
                 ServerError.ProducerBlockedQuotaExceededException => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
@@ -64,8 +66,10 @@
                 ServerError.TooManyRequests => new TooManyRequestsException(message),
                 ServerError.TopicNotFound => new TopicNotFoundException(message),
                 ServerError.TopicTerminatedError => new TopicTerminatedException(message),
-                ServerError.UnsupportedVersionError => new UnsupportedVersionException(message),
+                ServerError.TransactionConflict => new TransactionConflictException(message),
+                ServerError.TransactionCoordinatorNotFound => new TransactionCoordinatorNotFoundException(message),
                 ServerError.UnknownError => new UnknownException($"{message}. Error code: {error}"),
+                ServerError.UnsupportedVersionError => new UnsupportedVersionException(message),
                 _ => new UnknownException($"{message}. Error code: {error}")
             });
 
diff --git a/src/DotPulsar/Internal/FuncStateChangedHandler.cs b/src/DotPulsar/Internal/FuncStateChangedHandler.cs
new file mode 100644
index 0000000..ab97199
--- /dev/null
+++ b/src/DotPulsar/Internal/FuncStateChangedHandler.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class FuncStateChangedHandler<TStateChanged> : IHandleStateChanged<TStateChanged>
+    {
+        private readonly Func<TStateChanged, CancellationToken, ValueTask> _stateChangedHandler;
+
+        public FuncStateChangedHandler(Func<TStateChanged, CancellationToken, ValueTask> stateChangedHandler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = stateChangedHandler;
+            CancellationToken = cancellationToken;
+        }
+
+        public CancellationToken CancellationToken { get; }
+
+        public async ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken)
+            => await _stateChangedHandler(stateChanged, cancellationToken).ConfigureAwait(false);
+    }
+}
diff --git a/src/DotPulsar/Internal/MonitorState.cs b/src/DotPulsar/Internal/MonitorState.cs
new file mode 100644
index 0000000..aafe349
--- /dev/null
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using DotPulsar.Abstractions;
+    using System.Threading.Tasks;
+
+    public static class StateMonitor
+
+    {
+        public static async Task MonitorProducer(IProducer producer, IHandleStateChanged<ProducerStateChanged> handler)
+        {
+            await Task.Yield();
+
+            var state = ProducerState.Disconnected;
+
+            while (!producer.IsFinalState(state))
+            {
+                var stateChanged = await producer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
+                state = stateChanged.ProducerState;
+                try
+                {
+                    await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
+                }
+                catch
+                {
+                    // Ignore
+                }
+            }
+        }
+
+        public static async Task MonitorConsumer(IConsumer consumer, IHandleStateChanged<ConsumerStateChanged> handler)
+        {
+            await Task.Yield();
+
+            var state = ConsumerState.Disconnected;
+
+            while (!consumer.IsFinalState(state))
+            {
+                var stateChanged = await consumer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
+                state = stateChanged.ConsumerState;
+                try
+                {
+                    await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
+                }
+                catch
+                {
+                    // Ignore
+                }
+            }
+        }
+
+        public static async Task MonitorReader(IReader reader, IHandleStateChanged<ReaderStateChanged> handler)
+        {
+            await Task.Yield();
+
+            var state = ReaderState.Disconnected;
+
+            while (!reader.IsFinalState(state))
+            {
+                var stateChanged = await reader.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
+                state = stateChanged.ReaderState;
+                try
+                {
+                    await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
+                }
+                catch
+                {
+                    // Ignore
+                }
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 8916cd0..941c0aa 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -16,6 +16,9 @@
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
 
     public sealed class ProducerBuilder : IProducerBuilder
     {
@@ -23,6 +26,7 @@
         private string? _producerName;
         private ulong _initialSequenceId;
         private string? _topic;
+        private IHandleStateChanged<ProducerStateChanged>? _stateChangedHandler;
 
         public ProducerBuilder(IPulsarClient pulsarClient)
         {
@@ -30,15 +34,33 @@
             _initialSequenceId = ProducerOptions.DefaultInitialSequenceId;
         }
 
+        public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+        {
+            _initialSequenceId = initialSequenceId;
+            return this;
+        }
+
         public IProducerBuilder ProducerName(string name)
         {
             _producerName = name;
             return this;
         }
 
-        public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+        public IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler)
         {
-            _initialSequenceId = initialSequenceId;
+            _stateChangedHandler = handler;
+            return this;
+        }
+
+        public IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = new ActionStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
+            return this;
+        }
+
+        public IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = new FuncStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
             return this;
         }
 
@@ -56,7 +78,8 @@
             var options = new ProducerOptions(_topic!)
             {
                 InitialSequenceId = _initialSequenceId,
-                ProducerName = _producerName
+                ProducerName = _producerName,
+                StateChangedHandler = _stateChangedHandler
             };
 
             return _pulsarClient.CreateProducer(options);
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
index 41ff928..4936cd0 100644
--- a/src/DotPulsar/Internal/ReaderBuilder.cs
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -16,6 +16,9 @@
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
 
     public sealed class ReaderBuilder : IReaderBuilder
     {
@@ -25,6 +28,7 @@
         private bool _readCompacted;
         private MessageId? _startMessageId;
         private string? _topic;
+        private IHandleStateChanged<ReaderStateChanged>? _stateChangedHandler;
 
         public ReaderBuilder(IPulsarClient pulsarClient)
         {
@@ -33,12 +37,6 @@
             _readCompacted = ReaderOptions.DefaultReadCompacted;
         }
 
-        public IReaderBuilder ReaderName(string name)
-        {
-            _readerName = name;
-            return this;
-        }
-
         public IReaderBuilder MessagePrefetchCount(uint count)
         {
             _messagePrefetchCount = count;
@@ -51,12 +49,36 @@
             return this;
         }
 
+        public IReaderBuilder ReaderName(string name)
+        {
+            _readerName = name;
+            return this;
+        }
+
         public IReaderBuilder StartMessageId(MessageId messageId)
         {
             _startMessageId = messageId;
             return this;
         }
 
+        public IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler)
+        {
+            _stateChangedHandler = handler;
+            return this;
+        }
+
+        public IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = new ActionStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
+            return this;
+        }
+
+        public IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
+        {
+            _stateChangedHandler = new FuncStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
+            return this;
+        }
+
         public IReaderBuilder Topic(string topic)
         {
             _topic = topic;
@@ -75,7 +97,8 @@
             {
                 MessagePrefetchCount = _messagePrefetchCount,
                 ReadCompacted = _readCompacted,
-                ReaderName = _readerName
+                ReaderName = _readerName,
+                StateChangedHandler = _stateChangedHandler
             };
 
             return _pulsarClient.CreateReader(options);
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 906b72d..7002742 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -14,6 +14,8 @@
 
 namespace DotPulsar
 {
+    using DotPulsar.Abstractions;
+
     /// <summary>
     /// The producer building options.
     /// </summary>
@@ -31,14 +33,19 @@
         }
 
         /// <summary>
+        /// Set the initial sequence id. The default is 0.
+        /// </summary>
+        public ulong InitialSequenceId { get; set; }
+
+        /// <summary>
         /// Set the producer name. This is optional.
         /// </summary>
         public string? ProducerName { get; set; }
 
         /// <summary>
-        /// Set the initial sequence id. The default is 0.
+        /// Register a state changed handler. This is optional.
         /// </summary>
-        public ulong InitialSequenceId { get; set; }
+        public IHandleStateChanged<ProducerStateChanged>? StateChangedHandler { get; set; }
 
         /// <summary>
         /// Set the topic for this producer. This is required.
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 982ee3b..e2c0577 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -58,6 +58,8 @@
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
             var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
+            if (options.StateChangedHandler is not null)
+                _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
             var process = new ProducerProcess(correlationId, stateManager, factory, producer);
             _processManager.Add(process);
             process.Start();
@@ -73,9 +75,10 @@
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
-
             var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
             var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            if (options.StateChangedHandler is not null)
+                _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
             var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
             _processManager.Add(process);
             process.Start();
@@ -93,6 +96,8 @@
             var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
             var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            if (options.StateChangedHandler is not null)
+                _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
             var process = new ReaderProcess(correlationId, stateManager, factory, reader);
             _processManager.Add(process);
             process.Start();
diff --git a/src/DotPulsar/ReaderOptions.cs b/src/DotPulsar/ReaderOptions.cs
index 4021087..3baa295 100644
--- a/src/DotPulsar/ReaderOptions.cs
+++ b/src/DotPulsar/ReaderOptions.cs
@@ -14,6 +14,8 @@
 
 namespace DotPulsar
 {
+    using DotPulsar.Abstractions;
+
     /// <summary>
     /// The reader building options.
     /// </summary>
@@ -38,11 +40,6 @@
         }
 
         /// <summary>
-        /// Set the reader name. This is optional.
-        /// </summary>
-        public string? ReaderName { get; set; }
-
-        /// <summary>
         /// Number of messages that will be prefetched. The default is 1000.
         /// </summary>
         public uint MessagePrefetchCount { get; set; }
@@ -53,11 +50,21 @@
         public bool ReadCompacted { get; set; }
 
         /// <summary>
+        /// Set the reader name. This is optional.
+        /// </summary>
+        public string? ReaderName { get; set; }
+
+        /// <summary>
         /// The initial reader position is set to the specified message id. This is required.
         /// </summary>
         public MessageId StartMessageId { get; set; }
 
         /// <summary>
+        /// Register a state changed handler. This is optional.
+        /// </summary>
+        public IHandleStateChanged<ReaderStateChanged>? StateChangedHandler { get; set; }
+
+        /// <summary>
         /// Set the topic for this reader. This is required.
         /// </summary>
         public string Topic { get; set; }