diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index 4daf83e..31140b2 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -27,6 +27,6 @@
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
         ValueTask<Message> Receive(CancellationToken cancellationToken);
-        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+        ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index dd806f4..b214237 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -23,6 +23,6 @@
     public interface IProducerChannel : IAsyncDisposable
     {
         Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
-        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+        ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index a692c18..55135ea 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -23,7 +23,7 @@
     {
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
-        ValueTask<Message> Receive(CancellationToken cancellationToken = default);
-        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+        ValueTask<Message> Receive(CancellationToken cancellationToken);
+        ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 2282711..c6e9dfc 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -83,7 +83,7 @@
                 return;
 
             _eventRegister.Register(new ConsumerDisposed(_correlationId, this));
-            await _channel.ClosedByClient().ConfigureAwait(false);
+            await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
@@ -111,7 +111,13 @@
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
         public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
-            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken).ConfigureAwait(false);
+        {
+            ThrowIfDisposed();
+
+            var command = new CommandRedeliverUnacknowledgedMessages();
+            command.MessageIds.AddRange(messageIds.Select(m => m.Data));
+            await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
 
         public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
             => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);
@@ -121,7 +127,12 @@
             ThrowIfDisposed();
 
             var unsubscribe = new CommandUnsubscribe();
-            _ = await _executor.Execute(() => _channel.Send(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
+            await _executor.Execute(() => Unsubscribe(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        private async ValueTask Unsubscribe(CommandUnsubscribe command, CancellationToken cancellationToken)
+        {
+            _ = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
@@ -161,7 +172,12 @@
             ThrowIfDisposed();
 
             var getLastMessageId = new CommandGetLastMessageId();
-            var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+            return await _executor.Execute(() => GetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
+        {
+            var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
             return new MessageId(response.LastMessageId);
         }
 
@@ -179,10 +195,7 @@
 
             try
             {
-                await _executor.Execute(() =>
-                {
-                    return _channel.Send(commandAck, cancellationToken);
-                }, cancellationToken).ConfigureAwait(false);
+                await _executor.Execute(() => Acknowledge(commandAck, cancellationToken), cancellationToken).ConfigureAwait(false);
             }
             finally
             {
@@ -190,14 +203,11 @@
             }
         }
 
-        private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
+        private async ValueTask Acknowledge(CommandAck command, CancellationToken cancellationToken)
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-            var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
-            redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
-            await _executor.Execute(() => _channel.Send(redeliverUnacknowledgedMessages, cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
+        private async ValueTask RedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
         internal async ValueTask SetChannel(IConsumerChannel channel)
         {
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 1ac6886..2347eee 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -84,7 +84,7 @@
                 return;
 
             _eventRegister.Register(new ProducerDisposed(_correlationId, this));
-            await _channel.ClosedByClient().ConfigureAwait(false);
+            await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
@@ -97,12 +97,12 @@
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
+
             var metadata = _messageMetadataPool.Get();
             try
             {
                 metadata.SequenceId = _sequenceId.FetchNext();
-                var response = await _executor.Execute(() => _channel.Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-                return new MessageId(response.MessageId);
+                return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
             }
             finally
             {
@@ -126,8 +126,7 @@
 
             try
             {
-                var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-                return new MessageId(response.MessageId);
+                return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
             }
             finally
             {
@@ -136,6 +135,12 @@
             }
         }
 
+        private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+        {
+            var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
+            return new MessageId(response.MessageId);
+        }
+
         internal async ValueTask SetChannel(IProducerChannel channel)
         {
             if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index eeb95f5..571c5b6 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -73,10 +73,17 @@
         public bool IsFinalState(ReaderState state)
             => _state.IsFinalState(state);
 
-        public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default)
+        public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             var getLastMessageId = new CommandGetLastMessageId();
-            var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+            return await _executor.Execute(() => GetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
+        {
+            var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
             return new MessageId(response.LastMessageId);
         }
 
@@ -129,7 +136,7 @@
                 return;
 
             _eventRegister.Register(new ReaderDisposed(_correlationId, this));
-            await _channel.ClosedByClient().ConfigureAwait(false);
+            await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
