Refactoring and moving that can be moved to extension methods.
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 48fe671..7fb4e16 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,97 +22,19 @@
     /// <summary>
     /// A consumer abstraction.
     /// </summary>
-    public interface IConsumer : IAsyncDisposable
+    public interface IConsumer : IGetLastMessageId, IReceive, ISeek, IState<ConsumerState>, IAsyncDisposable
     {
         /// <summary>
-        /// Acknowledge the consumption of a single message.
-        /// </summary>
-        ValueTask Acknowledge(Message message, CancellationToken cancellationToken = default);
-
-        /// <summary>
         /// Acknowledge the consumption of a single message using the MessageId.
         /// </summary>
         ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
-        /// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
-        /// </summary>
-        ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
-
-        /// <summary>
         /// Acknowledge the consumption of all the messages in the topic up to and including the provided MessageId.
         /// </summary>
         ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
-        /// Get the MessageId of the last message on the topic.
-        /// </summary>
-        ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Ask whether the current state is final, meaning that it will never change.
-        /// </summary>
-        /// <returns>
-        /// True if it's final and False if it's not.
-        /// </returns>
-        bool IsFinalState();
-
-        /// <summary>
-        /// Ask whether the provided state is final, meaning that it will never change.
-        /// </summary>
-        /// <returns>
-        /// True if it's final and False if it's not.
-        /// </returns>
-        bool IsFinalState(ConsumerState state);
-
-        /// <summary>
-        /// Get an IAsyncEnumerable for consuming messages.
-        /// </summary>
-        IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this consumer to a specific MessageId.
-        /// </summary>
-        ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this consumer to a specific message publish time using unix time in milliseconds.
-        /// </summary>
-        ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this consumer to a specific message publish time using an UTC DateTime.
-        /// </summary>
-        ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this consumer to a specific message publish time using a DateTimeOffset.
-        /// </summary>
-        ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Wait for the state to change to a specific state.
-        /// </summary>
-        /// <returns>
-        /// The current state.
-        /// </returns>
-        /// <remarks>
-        /// If the state change to a final state, then all awaiting tasks will complete.
-        /// </remarks>
-        ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Wait for the state to change from a specific state.
-        /// </summary>
-        /// <returns>
-        /// The current state.
-        /// </returns>
-        /// <remarks>
-        /// If the state change to a final state, then all awaiting tasks will complete.
-        /// </remarks>
-        ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken = default);
-
-        /// <summary>
         /// The topic of the consumer.
         /// </summary>
         string Topic { get; }
@@ -125,11 +47,11 @@
         /// <summary>
         /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
         /// </summary>
-        ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken);
+        ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
         /// </summary>
-        ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken);
+        ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs b/src/DotPulsar/Abstractions/IGetLastMessageId.cs
similarity index 63%
copy from src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
copy to src/DotPulsar/Abstractions/IGetLastMessageId.cs
index c651dbf..0c3059a 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
+++ b/src/DotPulsar/Abstractions/IGetLastMessageId.cs
@@ -12,13 +12,19 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Abstractions
 {
     using System.Threading;
     using System.Threading.Tasks;
 
-    public interface IReaderChannelFactory
+    /// <summary>
+    /// An abstraction for getting the last message id.
+    /// </summary>
+    public interface IGetLastMessageId
     {
-        Task<IReaderChannel> Create(CancellationToken cancellationToken = default);
+        /// <summary>
+        /// Get the MessageId of the last message on the topic.
+        /// </summary>
+        ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
index 04d6b60..8be3a99 100644
--- a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
+++ b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
@@ -25,7 +25,7 @@
         /// <summary>
         /// Called after a state has changed.
         /// </summary>
-        ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken);
+        ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// The cancellation token to use when waiting for and handling state changes.
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 03ff03e..27c59ee 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -15,84 +15,13 @@
 namespace DotPulsar.Abstractions
 {
     using System;
-    using System.Buffers;
-    using System.Threading;
-    using System.Threading.Tasks;
 
     /// <summary>
     /// A producer abstraction.
     /// </summary>
-    public interface IProducer : IAsyncDisposable
+    public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
     {
         /// <summary>
-        /// Ask whether the current state is final, meaning that it will never change.
-        /// </summary>
-        /// <returns>
-        /// True if it's final and False if it's not.
-        /// </returns>
-        bool IsFinalState();
-
-        /// <summary>
-        /// Ask whether the provided state is final, meaning that it will never change.
-        /// </summary>
-        /// <returns>
-        /// True if it's final and False if it's not.
-        /// </returns>
-        bool IsFinalState(ProducerState state);
-
-        /// <summary>
-        /// Sends a message.
-        /// </summary>
-        ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Sends a message.
-        /// </summary>
-        ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Sends a message.
-        /// </summary>
-        ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Sends a message with metadata.
-        /// </summary>
-        ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Sends a message with metadata.
-        /// </summary>
-        ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Sends a message with metadata.
-        /// </summary>
-        ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Wait for the state to change to a specific state.
-        /// </summary>
-        /// <returns>
-        /// The current state.
-        /// </returns>
-        /// <remarks>
-        /// If the state change to a final state, then all awaiting tasks will complete.
-        /// </remarks>
-        ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Wait for the state to change from a specific state.
-        /// </summary>
-        /// <returns>
-        /// The current state.
-        /// </returns>
-        /// <remarks>
-        /// If the state change to a final state, then all awaiting tasks will complete.
-        /// </remarks>
-        ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken = default);
-
-        /// <summary>
         /// The topic of the producer.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 1023419..6b4207b 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -15,84 +15,13 @@
 namespace DotPulsar.Abstractions
 {
     using System;
-    using System.Collections.Generic;
-    using System.Threading;
-    using System.Threading.Tasks;
 
     /// <summary>
     /// A reader abstraction.
     /// </summary>
-    public interface IReader : IAsyncDisposable
+    public interface IReader : IGetLastMessageId, IReceive, ISeek, IState<ReaderState>, IAsyncDisposable
     {
         /// <summary>
-        /// Ask whether the current state is final, meaning that it will never change.
-        /// </summary>
-        /// <returns>
-        /// True if it's final and False if it's not.
-        /// </returns>
-        bool IsFinalState();
-
-        /// <summary>
-        /// Ask whether the provided state is final, meaning that it will never change.
-        /// </summary>
-        /// <returns>
-        /// True if it's final and False if it's not.
-        /// </returns>
-        bool IsFinalState(ReaderState state);
-
-        /// <summary>
-        /// Get the MessageId of the last message on the topic.
-        /// </summary>
-        ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Get an IAsyncEnumerable for reading messages
-        /// </summary>
-        IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this reader to a specific MessageId.
-        /// </summary>
-        ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this reader to a specific message publish time using unix time in milliseconds.
-        /// </summary>
-        ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this reader to a specific message publish time using an UTC DateTime.
-        /// </summary>
-        ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Reset the subscription associated with this reader to a specific message publish time using a DateTimeOffset.
-        /// </summary>
-        ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Wait for the state to change to a specific state.
-        /// </summary>
-        /// <returns>
-        /// The current state.
-        /// </returns>
-        /// <remarks>
-        /// If the state change to a final state, then all awaiting tasks will complete.
-        /// </remarks>
-        ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Wait for the state to change from a specific state.
-        /// </summary>
-        /// <returns>
-        /// The current state.
-        /// </returns>
-        /// <remarks>
-        /// If the state change to a final state, then all awaiting tasks will complete.
-        /// </remarks>
-        ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken = default);
-
-        /// <summary>
         /// The topic of the reader.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs b/src/DotPulsar/Abstractions/IReceive.cs
similarity index 66%
rename from src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
rename to src/DotPulsar/Abstractions/IReceive.cs
index c651dbf..2ff275c 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
+++ b/src/DotPulsar/Abstractions/IReceive.cs
@@ -12,13 +12,19 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Abstractions
 {
     using System.Threading;
     using System.Threading.Tasks;
 
-    public interface IReaderChannelFactory
+    /// <summary>
+    /// An abstraction for receiving a single message.
+    /// </summary>
+    public interface IReceive
     {
-        Task<IReaderChannel> Create(CancellationToken cancellationToken = default);
+        /// <summary>
+        /// Receive a single message.
+        /// </summary>
+        ValueTask<Message> Receive(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Abstractions/ISeek.cs b/src/DotPulsar/Abstractions/ISeek.cs
new file mode 100644
index 0000000..373c619
--- /dev/null
+++ b/src/DotPulsar/Abstractions/ISeek.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// An abstraction for seeking.
+    /// </summary>
+    public interface ISeek
+    {
+        /// <summary>
+        /// Reset the cursor associated with the consumer or reader to a specific MessageId.
+        /// </summary>
+        ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Reset the cursor associated with the consumer or reader to a specific message publish time using unix time in milliseconds.
+        /// </summary>
+        ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Abstractions/ISend.cs b/src/DotPulsar/Abstractions/ISend.cs
new file mode 100644
index 0000000..2948c34
--- /dev/null
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// An abstraction for sending a message.
+    /// </summary>
+    public interface ISend
+    {
+        /// <summary>
+        /// Sends a message.
+        /// </summary>
+        ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Sends a message with metadata.
+        /// </summary>
+        ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Abstractions/IState.cs b/src/DotPulsar/Abstractions/IState.cs
new file mode 100644
index 0000000..48a975e
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IState.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// A state change monitoring abstraction.
+    /// </summary>
+    public interface IState<TState> where TState : notnull
+    {
+        /// <summary>
+        /// Ask whether the current state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState(TState state);
+
+        /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<TState> OnStateChangeTo(TState state, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        ValueTask<TState> OnStateChangeFrom(TState state, CancellationToken cancellationToken = default);
+    }
+}
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
new file mode 100644
index 0000000..b61971c
--- /dev/null
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// Extensions for IConsumer.
+    /// </summary>
+    public static class ConsumerExtensions
+    {
+        /// <summary>
+        /// Acknowledge the consumption of a single message.
+        /// </summary>
+        public static async ValueTask Acknowledge(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+            => await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
+        /// </summary>
+        public static async ValueTask AcknowledgeCumulative(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+            => await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        public static async ValueTask<ConsumerStateChanged> StateChangedTo(this IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = default)
+        {
+            var newState = await consumer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+            return new ConsumerStateChanged(consumer, newState);
+        }
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = default)
+        {
+            var newState = await consumer.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
+            return new ConsumerStateChanged(consumer, newState);
+        }
+    }
+}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
index 9c3007c..8f02262 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -16,7 +16,12 @@
 {
     using Abstractions;
     using Internal;
+    using System.Threading;
+    using System.Threading.Tasks;
 
+    /// <summary>
+    /// Extensions for IProducer.
+    /// </summary>
     public static class ProducerExtensions
     {
         /// <summary>
@@ -24,5 +29,35 @@
         /// </summary>
         public static IMessageBuilder NewMessage(this IProducer producer)
             => new MessageBuilder(producer);
+
+        /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        public static async ValueTask<ProducerStateChanged> StateChangedTo(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
+        {
+            var newState = await producer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+            return new ProducerStateChanged(producer, newState);
+        }
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        public static  async ValueTask<ProducerStateChanged> StateChangedFrom(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
+        {
+            var newState = await producer.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
+            return new ProducerStateChanged(producer, newState);
+        }
     }
 }
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index 77e95aa..574bce1 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -17,6 +17,9 @@
     using Abstractions;
     using Internal;
 
+    /// <summary>
+    /// Extensions for IPulsarClient.
+    /// </summary>
     public static class PulsarClientExtensions
     {
         /// <summary>
diff --git a/src/DotPulsar/Extensions/ReaderExtensions.cs b/src/DotPulsar/Extensions/ReaderExtensions.cs
new file mode 100644
index 0000000..86d093b
--- /dev/null
+++ b/src/DotPulsar/Extensions/ReaderExtensions.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// Extensions for IReader.
+    /// </summary>
+    public static class ReaderExtensions
+    {
+        /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        public static async ValueTask<ReaderStateChanged> StateChangedTo(this IReader reader, ReaderState state, CancellationToken cancellationToken = default)
+        {
+            var newState = await reader.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+            return new ReaderStateChanged(reader, newState);
+        }
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will complete.
+        /// </remarks>
+        public static async ValueTask<ReaderStateChanged> StateChangedFrom(this IReader reader, ReaderState state, CancellationToken cancellationToken = default)
+        {
+            var newState = await reader.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
+            return new ReaderStateChanged(reader, newState);
+        }
+    }
+}
diff --git a/src/DotPulsar/Extensions/ReceiveExtensions.cs b/src/DotPulsar/Extensions/ReceiveExtensions.cs
new file mode 100644
index 0000000..c2262c6
--- /dev/null
+++ b/src/DotPulsar/Extensions/ReceiveExtensions.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using System.Collections.Generic;
+    using System.Runtime.CompilerServices;
+    using System.Threading;
+
+    /// <summary>
+    /// Extensions for IReceive.
+    /// </summary>
+    public static class ReceiveExtensions
+    {
+        /// <summary>
+        /// Get an IAsyncEnumerable for receiving messages.
+        /// </summary>
+        public static async IAsyncEnumerable<Message> Messages(this IReceive receiver, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+        {
+            while (!cancellationToken.IsCancellationRequested)
+                yield return await receiver.Receive(cancellationToken).ConfigureAwait(false);
+        }
+    }
+}
diff --git a/src/DotPulsar/Extensions/SeekExtensions.cs b/src/DotPulsar/Extensions/SeekExtensions.cs
new file mode 100644
index 0000000..b4f0d2c
--- /dev/null
+++ b/src/DotPulsar/Extensions/SeekExtensions.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using DotPulsar.Abstractions;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// Extensions for ISeek.
+    /// </summary>
+    public static class SeekExtensions
+    {
+        /// <summary>
+        /// Reset the cursor associated with the consumer or reader to a specific message publish time using an UTC DateTime.
+        /// </summary>
+        public static async ValueTask Seek(this ISeek seeker, DateTime publishTime, CancellationToken cancellationToken = default)
+            => await seeker.Seek((ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds(), cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Reset the cursor associated with the consumer or reader to a specific message publish time using a DateTimeOffset.
+        /// </summary>
+        public static async ValueTask Seek(this ISeek seeker, DateTimeOffset publishTime, CancellationToken cancellationToken = default)
+            => await seeker.Seek((ulong) publishTime.ToUnixTimeMilliseconds(), cancellationToken).ConfigureAwait(false);
+    }
+}
diff --git a/src/DotPulsar/Extensions/SendExtensions.cs b/src/DotPulsar/Extensions/SendExtensions.cs
new file mode 100644
index 0000000..41ace1e
--- /dev/null
+++ b/src/DotPulsar/Extensions/SendExtensions.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+    using Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// Extensions for ISend.
+    /// </summary>
+    public static class SendExtensions
+    {
+        /// <summary>
+        /// Sends a message.
+        /// </summary>
+        public static async ValueTask<MessageId> Send(this ISend sender, byte[] data, CancellationToken cancellationToken = default)
+            => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Sends a message.
+        /// </summary>
+        public static async ValueTask<MessageId> Send(this ISend sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+            => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Sends a message with metadata.
+        /// </summary>
+        public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default)
+            => await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Sends a message with metadata.
+        /// </summary>
+        public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+            => await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+    }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index 31140b2..ef09431 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -23,9 +23,9 @@
     {
         Task Send(CommandAck command, CancellationToken cancellationToken);
         Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
-        Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
-        Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
-        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
+        Task Send(CommandUnsubscribe command, CancellationToken cancellationToken);
+        Task Send(CommandSeek command, CancellationToken cancellationToken);
+        Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
         ValueTask<Message> Receive(CancellationToken cancellationToken);
         ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
deleted file mode 100644
index 55135ea..0000000
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Abstractions
-{
-    using DotPulsar.Internal.PulsarApi;
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    public interface IReaderChannel : IAsyncDisposable
-    {
-        Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
-        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
-        ValueTask<Message> Receive(CancellationToken cancellationToken);
-        ValueTask ClosedByClient(CancellationToken cancellationToken);
-    }
-}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index c6e9dfc..d263df2 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -17,13 +17,13 @@
     using Abstractions;
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
+    using DotPulsar.Internal.Extensions;
     using Events;
     using Microsoft.Extensions.ObjectPool;
     using PulsarApi;
     using System;
     using System.Collections.Generic;
     using System.Linq;
-    using System.Runtime.CompilerServices;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -59,17 +59,11 @@
             _eventRegister.Register(new ConsumerCreated(_correlationId, this));
         }
 
-        public async ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
-        {
-            var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
-            return new ConsumerStateChanged(this, newState);
-        }
+        public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, CancellationToken cancellationToken)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
-        {
-            var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
-            return new ConsumerStateChanged(this, newState);
-        }
+        public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState state, CancellationToken cancellationToken)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
 
         public bool IsFinalState()
             => _state.IsFinalState();
@@ -87,35 +81,28 @@
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
-        public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
+        public async ValueTask<Message> Receive(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
-            while (!cancellationToken.IsCancellationRequested)
-                yield return await _executor.Execute(() => Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+            return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
-        private async ValueTask<Message> Receive(CancellationToken cancellationToken)
+        private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
             => await _channel.Receive(cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
-            => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
         public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
-            => await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
-        public async ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
-            => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+            => await Acknowledge(messageId, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
 
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
-            => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+            => await Acknowledge(messageId, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
         public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
             var command = new CommandRedeliverUnacknowledgedMessages();
-            command.MessageIds.AddRange(messageIds.Select(m => m.Data));
+            command.MessageIds.AddRange(messageIds.Select(messageId => messageId.ToMessageIdData()));
             await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
@@ -131,16 +118,14 @@
         }
 
         private async ValueTask Unsubscribe(CommandUnsubscribe command, CancellationToken cancellationToken)
-        {
-            _ = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
-        }
+            =>await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
-            var seek = new CommandSeek { MessageId = messageId.Data };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+            await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
@@ -148,23 +133,7 @@
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = publishTime };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-
-        public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-
-        public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
@@ -176,22 +145,21 @@
         }
 
         private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
-        {
-            var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
-            return new MessageId(response.LastMessageId);
-        }
-
-        private async ValueTask<CommandSuccess> Seek(CommandSeek command, CancellationToken cancellationToken)
             => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-        private async ValueTask Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
+        private async Task Seek(CommandSeek command, CancellationToken cancellationToken)
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+
+        private async ValueTask Acknowledge(MessageId messageId, CommandAck.AckType ackType, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
             var commandAck = _commandAckPool.Get();
             commandAck.Type = ackType;
-            commandAck.MessageIds.Clear();
-            commandAck.MessageIds.Add(messageIdData);
+            if (commandAck.MessageIds.Count == 0)
+                commandAck.MessageIds.Add(messageId.ToMessageIdData());
+            else
+                commandAck.MessageIds[0].MapFrom(messageId);
 
             try
             {
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 775e240..de109dd 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -21,7 +21,7 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
+    public sealed class ConsumerChannel : IConsumerChannel
     {
         private readonly ulong _id;
         private readonly AsyncQueue<MessagePackage> _queue;
@@ -88,7 +88,7 @@
 
                     return metadata.ShouldSerializeNumMessagesInBatch()
                         ? _batchHandler.Add(messageId, redeliveryCount, metadata, data)
-                        : MessageFactory.Create(new MessageId(messageId), redeliveryCount, metadata, data);
+                        : MessageFactory.Create(messageId.ToMessageId(), redeliveryCount, metadata, data);
                 }
             }
         }
@@ -117,29 +117,27 @@
             await _connection.Send(command, cancellationToken).ConfigureAwait(false);
         }
 
-        public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
+        public async Task Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
             var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
             response.Expect(BaseCommand.Type.Success);
-            return response.Success;
         }
 
-        public async Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
+        public async Task Send(CommandSeek command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
             var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
             response.Expect(BaseCommand.Type.Success);
             _batchHandler.Clear();
-            return response.Success;
         }
 
-        public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+        public async Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
             var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
             response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
-            return response.GetLastMessageIdResponse;
+            return response.GetLastMessageIdResponse.LastMessageId.ToMessageId();
         }
 
         public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
new file mode 100644
index 0000000..857b00c
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Extensions
+{
+    using DotPulsar.Internal.PulsarApi;
+
+    public static class MessageIdDataExtensions
+    {
+        public static MessageId ToMessageId(this MessageIdData messageIdData)
+            => new MessageId(messageIdData.LedgerId, messageIdData.EntryId, messageIdData.Partition, messageIdData.BatchIndex);
+
+        public static void MapFrom(this MessageIdData destination, MessageId source)
+        {
+            destination.LedgerId = source.LedgerId;
+            destination.EntryId = source.EntryId;
+            destination.Partition = source.Partition;
+            destination.BatchIndex = source.BatchIndex;
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index d7c7b02..c17ec19 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal
 {
     using DotPulsar.Abstractions;
+    using DotPulsar.Extensions;
     using Extensions;
     using System;
     using System.Threading;
diff --git a/src/DotPulsar/Internal/MonitorState.cs b/src/DotPulsar/Internal/MonitorState.cs
index 28e0f9a..7a50044 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal
 {
     using DotPulsar.Abstractions;
+    using DotPulsar.Extensions;
     using System.Threading.Tasks;
 
     public static class StateMonitor
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index e463ff6..75e724f 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -22,7 +22,7 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel, IReaderChannel
+    public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel
     {
         public ValueTask DisposeAsync()
             => new ValueTask();
@@ -42,13 +42,13 @@
         public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
             => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
+        public Task Send(CommandUnsubscribe command, CancellationToken cancellationToken)
             => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
+        public Task Send(CommandSeek command, CancellationToken cancellationToken)
             => throw GetException();
 
-        public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+        public Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
             => throw GetException();
 
         private static Exception GetException()
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 2347eee..12e4625 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -17,6 +17,7 @@
     using Abstractions;
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
+    using DotPulsar.Internal.Extensions;
     using Events;
     using Microsoft.Extensions.ObjectPool;
     using System;
@@ -60,17 +61,11 @@
             _eventRegister.Register(new ProducerCreated(_correlationId, this));
         }
 
-        public async ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken)
-        {
-            var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
-            return new ProducerStateChanged(this, newState);
-        }
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
-        {
-            var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
-            return new ProducerStateChanged(this, newState);
-        }
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
 
         public bool IsFinalState()
             => _state.IsFinalState();
@@ -88,12 +83,6 @@
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
-        public ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken)
-            => Send(new ReadOnlySequence<byte>(data), cancellationToken);
-
-        public ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
-            => Send(new ReadOnlySequence<byte>(data), cancellationToken);
-
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
@@ -110,12 +99,6 @@
             }
         }
 
-        public ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken)
-            => Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken);
-
-        public ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
-            => Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken);
-
         public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
@@ -138,7 +121,7 @@
         private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
-            return new MessageId(response.MessageId);
+            return response.MessageId.ToMessageId();
         }
 
         internal async ValueTask SetChannel(IProducerChannel channel)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 571c5b6..ccbea59 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -20,8 +20,6 @@
     using DotPulsar.Internal.PulsarApi;
     using Events;
     using System;
-    using System.Collections.Generic;
-    using System.Runtime.CompilerServices;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -29,7 +27,7 @@
     {
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
-        private IReaderChannel _channel;
+        private IConsumerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ReaderState> _state;
         private int _isDisposed;
@@ -40,7 +38,7 @@
             Guid correlationId,
             string topic,
             IRegisterEvent eventRegister,
-            IReaderChannel initialChannel,
+            IConsumerChannel initialChannel,
             IExecute executor,
             IStateChanged<ReaderState> state)
         {
@@ -55,17 +53,11 @@
             _eventRegister.Register(new ReaderCreated(_correlationId, this));
         }
 
-        public async ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken)
-        {
-            var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
-            return new ReaderStateChanged(this, newState);
-        }
+        public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state, CancellationToken cancellationToken)
+            => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
-        {
-            var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
-            return new ReaderStateChanged(this, newState);
-        }
+        public async ValueTask<ReaderState> OnStateChangeFrom(ReaderState state, CancellationToken cancellationToken)
+            => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
 
         public bool IsFinalState()
             => _state.IsFinalState();
@@ -82,28 +74,24 @@
         }
 
         private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
-        {
-            var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
-            return new MessageId(response.LastMessageId);
-        }
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-        public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
+        public async ValueTask<Message> Receive(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
-            while (!cancellationToken.IsCancellationRequested)
-                yield return await _executor.Execute(() => Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+            return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
-        private async ValueTask<Message> Receive(CancellationToken cancellationToken)
+        private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
             => await _channel.Receive(cancellationToken).ConfigureAwait(false);
 
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
-            var seek = new CommandSeek { MessageId = messageId.Data };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+            await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
@@ -111,23 +99,7 @@
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = publishTime };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-
-        public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-
-        public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask DisposeAsync()
@@ -140,10 +112,10 @@
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
-        private async ValueTask<CommandSuccess> Seek(CommandSeek command, CancellationToken cancellationToken)
+        private async Task Seek(CommandSeek command, CancellationToken cancellationToken)
             => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-        internal async ValueTask SetChannel(IReaderChannel channel)
+        internal async ValueTask SetChannel(IConsumerChannel channel)
         {
             if (_isDisposed != 0)
             {
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 9efe033..58b31e1 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -20,7 +20,7 @@
     using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class ReaderChannelFactory : IReaderChannelFactory
+    public sealed class ReaderChannelFactory : IConsumerChannelFactory
     {
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
@@ -48,7 +48,7 @@
                 ConsumerName = options.ReaderName,
                 Durable = false,
                 ReadCompacted = options.ReadCompacted,
-                StartMessageId = options.StartMessageId.Data,
+                StartMessageId = options.StartMessageId.ToMessageIdData(),
                 Subscription = $"Reader-{Guid.NewGuid():N}",
                 Topic = options.Topic
             };
@@ -56,10 +56,10 @@
             _batchHandler = new BatchHandler(false);
         }
 
-        public async Task<IReaderChannel> Create(CancellationToken cancellationToken)
+        public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
             => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
 
-        private async ValueTask<IReaderChannel> GetChannel(CancellationToken cancellationToken)
+        private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellationToken)
         {
             var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
             var messageQueue = new AsyncQueue<MessagePackage>();
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 67763c5..404de7e 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -21,13 +21,13 @@
     public sealed class ReaderProcess : Process
     {
         private readonly IStateManager<ReaderState> _stateManager;
-        private readonly IReaderChannelFactory _factory;
+        private readonly IConsumerChannelFactory _factory;
         private readonly Reader _reader;
 
         public ReaderProcess(
             Guid correlationId,
             IStateManager<ReaderState> stateManager,
-            IReaderChannelFactory factory,
+            IConsumerChannelFactory factory,
             Reader reader) : base(correlationId)
         {
             _stateManager = stateManager;
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index cdcc2eb..bc32039 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -14,6 +14,7 @@
 
 namespace DotPulsar
 {
+    using DotPulsar.Internal.Extensions;
     using Internal.PulsarApi;
     using System;
 
@@ -38,42 +39,36 @@
         /// </summary>
         public static MessageId Latest { get; }
 
-        internal MessageId(MessageIdData messageIdData)
-            => Data = messageIdData;
-
         /// <summary>
         /// Initializes a new instance using the specified ledgerId, entryId, partition and batchIndex.
         /// </summary>
         public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex)
-            => Data = new MessageIdData
-            {
-                LedgerId = ledgerId,
-                EntryId = entryId,
-                Partition = partition,
-                BatchIndex = batchIndex
-            };
-
-        internal MessageIdData Data { get; }
+        {
+            LedgerId = ledgerId;
+            EntryId = entryId;
+            Partition = partition;
+            BatchIndex = batchIndex;
+        }
 
         /// <summary>
         /// The id of the ledger.
         /// </summary>
-        public ulong LedgerId => Data.LedgerId;
+        public ulong LedgerId { get; }
 
         /// <summary>
         /// The id of the entry.
         /// </summary>
-        public ulong EntryId => Data.EntryId;
+        public ulong EntryId { get; }
 
         /// <summary>
         /// The partition.
         /// </summary>
-        public int Partition => Data.Partition;
+        public int Partition { get; }
 
         /// <summary>
         /// The batch index.
         /// </summary>
-        public int BatchIndex => Data.BatchIndex;
+        public int BatchIndex { get; }
 
         public int CompareTo(MessageId? other)
         {
@@ -124,5 +119,12 @@
 
         public override string ToString()
             => $"{LedgerId}:{EntryId}:{Partition}:{BatchIndex}";
+
+        internal MessageIdData ToMessageIdData()
+        {
+            var messageIdData = new MessageIdData();
+            messageIdData.MapFrom(this);
+            return messageIdData;
+        }
     }
 }