When monitoring state we now return a class giving us both the new state and the Consumer/Producer/Reader.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 59fe108..36f8497 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -79,20 +79,22 @@
 
             while (true)
             {
-                state = await consumer.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await consumer.StateChangedFrom(state).ConfigureAwait(false);
+                state = stateChanged.ConsumerState;
 
                 var stateMessage = state switch
                 {
-                    ConsumerState.Active => "The consumer is active",
-                    ConsumerState.Inactive => "The consumer is inactive",
-                    ConsumerState.Disconnected => "The consumer is disconnected",
-                    ConsumerState.Closed => "The consumer has closed",
-                    ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
-                    ConsumerState.Faulted => "The consumer has faulted",
-                    _ => $"The consumer has an unknown state '{state}'"
+                    ConsumerState.Active => "is active",
+                    ConsumerState.Inactive => "is inactive",
+                    ConsumerState.Disconnected => "is disconnected",
+                    ConsumerState.Closed => "has closed",
+                    ConsumerState.ReachedEndOfTopic => "has reached end of topic",
+                    ConsumerState.Faulted => "has faulted",
+                    _ => $"has an unknown state '{state}'"
                 };
 
-                Console.WriteLine(stateMessage);
+                var topic = stateChanged.Consumer.Topic;
+                Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
 
                 if (consumer.IsFinalState(state))
                     return;
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 73ddff8..664793f 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -78,18 +78,20 @@
 
             while (true)
             {
-                state = await producer.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await producer.StateChangedFrom(state).ConfigureAwait(false);
+                state = stateChanged.ProducerState;
 
                 var stateMessage = state switch
                 {
-                    ProducerState.Connected => "The producer is connected",
-                    ProducerState.Disconnected => "The producer is disconnected",
-                    ProducerState.Closed => "The producer has closed",
-                    ProducerState.Faulted => "The producer has faulted",
-                    _ => $"The producer has an unknown state '{state}'"
+                    ProducerState.Connected => "is connected",
+                    ProducerState.Disconnected => "is disconnected",
+                    ProducerState.Closed => "has closed",
+                    ProducerState.Faulted => "has faulted",
+                    _ => $"has an unknown state '{state}'"
                 };
 
-                Console.WriteLine(stateMessage);
+                var topic = stateChanged.Producer.Topic;
+                Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
 
                 if (producer.IsFinalState(state))
                     return;
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b42275d..d7c9f6c 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -78,19 +78,21 @@
 
             while (true)
             {
-                state = await reader.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await reader.StateChangedFrom(state).ConfigureAwait(false);
+                state = stateChanged.ReaderState;
 
                 var stateMessage = state switch
                 {
-                    ReaderState.Connected => "The reader is connected",
-                    ReaderState.Disconnected => "The reader is disconnected",
-                    ReaderState.Closed => "The reader has closed",
-                    ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
-                    ReaderState.Faulted => "The reader has faulted",
-                    _ => $"The reader has an unknown state '{state}'"
+                    ReaderState.Connected => "is connected",
+                    ReaderState.Disconnected => "is disconnected",
+                    ReaderState.Closed => "has closed",
+                    ReaderState.ReachedEndOfTopic => "has reached end of topic",
+                    ReaderState.Faulted => "has faulted",
+                    _ => $"has an unknown state '{state}'"
                 };
 
-                Console.WriteLine(stateMessage);
+                var topic = stateChanged.Reader.Topic;
+                Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
 
                 if (reader.IsFinalState(state))
                     return;
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 4d33ab7..70bfb99 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,7 +22,7 @@
     /// <summary>
     /// A consumer abstraction.
     /// </summary>
-    public interface IConsumer : IStateChanged<ConsumerState>, IAsyncDisposable
+    public interface IConsumer : IAsyncDisposable
     {
         /// <summary>
         /// Acknowledge the consumption of a single message.
@@ -50,6 +50,22 @@
         ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
 
         /// <summary>
+        /// Ask whether the current state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState(ConsumerState state);
+
+        /// <summary>
         /// Get an IAsyncEnumerable for consuming messages
         /// </summary>
         IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
@@ -60,6 +76,28 @@
         ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// The topic of the consumer.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 11e46d1..bd37d46 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -22,9 +22,25 @@
     /// <summary>
     /// A producer abstraction.
     /// </summary>
-    public interface IProducer : IStateChanged<ProducerState>, IAsyncDisposable
+    public interface IProducer : IAsyncDisposable
     {
         /// <summary>
+        /// Ask whether the current state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState(ProducerState state);
+
+        /// <summary>
         /// Sends a message.
         /// </summary>
         ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken = default);
@@ -55,6 +71,28 @@
         ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
 
         /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// The topic of the producer.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 091551c..81bc9b4 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -17,18 +17,57 @@
     using System;
     using System.Collections.Generic;
     using System.Threading;
+    using System.Threading.Tasks;
 
     /// <summary>
     /// A reader abstraction.
     /// </summary>
-    public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+    public interface IReader : IAsyncDisposable
     {
         /// <summary>
+        /// Ask whether the current state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState(ReaderState state);
+
+        /// <summary>
         /// Get an IAsyncEnumerable for reading messages
         /// </summary>
         IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
 
         /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// The topic of the reader.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/ConsumerStateChanged.cs b/src/DotPulsar/ConsumerStateChanged.cs
new file mode 100644
index 0000000..1f0a80e
--- /dev/null
+++ b/src/DotPulsar/ConsumerStateChanged.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+
+    public sealed class ConsumerStateChanged
+    {
+        internal ConsumerStateChanged(IConsumer consumer, ConsumerState consumerState)
+        {
+            Consumer = consumer;
+            ConsumerState = consumerState;
+        }
+
+        public IConsumer Consumer { get; }
+        public ConsumerState ConsumerState { get; }
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
similarity index 97%
rename from src/DotPulsar/Abstractions/IStateChanged.cs
rename to src/DotPulsar/Internal/Abstractions/IStateChanged.cs
index 079b9be..732f13e 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,7 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Abstractions
+namespace DotPulsar.Internal.Abstractions
 {
     using System.Threading;
     using System.Threading.Tasks;
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 9fbdc37..2208606 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -57,11 +57,17 @@
             _eventRegister.Register(new ConsumerCreated(_correlationId, this));
         }
 
-        public async ValueTask<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+            return new ConsumerStateChanged(this, newState);
+        }
 
-        public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+            return new ConsumerStateChanged(this, newState);
+        }
 
         public bool IsFinalState()
             => _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index d9e5dd0..28e0fe2 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -53,11 +53,17 @@
             _eventRegister.Register(new ProducerCreated(_correlationId, this));
         }
 
-        public ValueTask<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken)
-            => _state.StateChangedTo(state, cancellationToken);
+        public async ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+            return new ProducerStateChanged(this, newState);
+        }
 
-        public ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
-            => _state.StateChangedFrom(state, cancellationToken);
+        public async ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+            return new ProducerStateChanged(this, newState);
+        }
 
         public bool IsFinalState()
             => _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 5b7d740..bb659e7 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -54,11 +54,17 @@
             _eventRegister.Register(new ReaderCreated(_correlationId, this));
         }
 
-        public async ValueTask<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+            return new ReaderStateChanged(this, newState);
+        }
 
-        public async ValueTask<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+            return new ReaderStateChanged(this, newState);
+        }
 
         public bool IsFinalState()
             => _state.IsFinalState();
diff --git a/src/DotPulsar/ProducerStateChanged.cs b/src/DotPulsar/ProducerStateChanged.cs
new file mode 100644
index 0000000..f866954
--- /dev/null
+++ b/src/DotPulsar/ProducerStateChanged.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+
+    public sealed class ProducerStateChanged
+    {
+        internal ProducerStateChanged(IProducer producer, ProducerState producerState)
+        {
+            Producer = producer;
+            ProducerState = producerState;
+        }
+
+        public IProducer Producer { get; }
+        public ProducerState ProducerState { get; }
+    }
+}
diff --git a/src/DotPulsar/ReaderStateChanged.cs b/src/DotPulsar/ReaderStateChanged.cs
new file mode 100644
index 0000000..4133abd
--- /dev/null
+++ b/src/DotPulsar/ReaderStateChanged.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+
+    public sealed class ReaderStateChanged
+    {
+        internal ReaderStateChanged(IReader reader, ReaderState readerState)
+        {
+            Reader = reader;
+            ReaderState = readerState;
+        }
+
+        public IReader Reader { get; }
+        public ReaderState ReaderState { get; }
+    }
+}