pass cancellation token to async lock
diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
index 3ce1b95..766e9ad 100644
--- a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -29,7 +29,7 @@
             var sut = new AsyncLock();
 
             //Act
-            var actual = sut.Lock();
+            var actual = sut.Lock(CancellationToken.None);
 
             //Assert
             Assert.True(actual.IsCompleted);
@@ -44,10 +44,10 @@
         {
             //Arrange
             var sut = new AsyncLock();
-            var alreadyTaken = await sut.Lock();
+            var alreadyTaken = await sut.Lock(CancellationToken.None);
 
             //Act
-            var actual = sut.Lock();
+            var actual = sut.Lock(CancellationToken.None);
 
             //Assert
             Assert.False(actual.IsCompleted);
@@ -66,7 +66,7 @@
             await sut.DisposeAsync();
 
             //Act
-            var exception = await Record.ExceptionAsync(() => sut.Lock());
+            var exception = await Record.ExceptionAsync(() => sut.Lock(CancellationToken.None));
 
             //Assert
             Assert.IsType<ObjectDisposedException>(exception);
@@ -77,8 +77,8 @@
         {
             //Arrange
             var sut = new AsyncLock();
-            var gotLock = await sut.Lock();
-            var awaiting = sut.Lock();
+            var gotLock = await sut.Lock(CancellationToken.None);
+            var awaiting = sut.Lock(CancellationToken.None);
             _ = Task.Run(async () => await sut.DisposeAsync());
 
             //Act
@@ -98,7 +98,7 @@
             //Arrange
             var cts = new CancellationTokenSource();
             var sut = new AsyncLock();
-            var gotLock = await sut.Lock();
+            var gotLock = await sut.Lock(CancellationToken.None);
             var awaiting = sut.Lock(cts.Token);
 
             //Act
@@ -119,7 +119,7 @@
         {
             //Arrange
             var sut = new AsyncLock();
-            var gotLock = await sut.Lock();
+            var gotLock = await sut.Lock(CancellationToken.None);
             var disposeTask = Task.Run(async () => await sut.DisposeAsync());
             Assert.False(disposeTask.IsCompleted);
 
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 459300e..cce261d 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -14,29 +14,30 @@
 
 using DotPulsar.Internal.PulsarApi;
 using System;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal.Abstractions
 {
     public interface IConnection : IAsyncDisposable
     {
-        ValueTask<bool> HasChannels();
+        ValueTask<bool> HasChannels(CancellationToken cancellationToken);
 
-        Task<ProducerResponse> Send(CommandProducer command, IChannel channel);
-        Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel);
+        Task<ProducerResponse> Send(CommandProducer command, IChannel channel, CancellationToken cancellationToken);
+        Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken);
 
-        Task Send(CommandPing command);
-        Task Send(CommandPong command);
-        Task Send(CommandAck command);
-        Task Send(CommandFlow command);
+        Task Send(CommandPing command, CancellationToken cancellationToken);
+        Task Send(CommandPong command, CancellationToken cancellationToken);
+        Task Send(CommandAck command, CancellationToken cancellationToken);
+        Task Send(CommandFlow command, CancellationToken cancellationToken);
 
-        Task<BaseCommand> Send(CommandUnsubscribe command);
-        Task<BaseCommand> Send(CommandConnect command);
-        Task<BaseCommand> Send(CommandLookupTopic command);
-        Task<BaseCommand> Send(CommandSeek command);
-        Task<BaseCommand> Send(CommandGetLastMessageId command);
-        Task<BaseCommand> Send(CommandCloseProducer command);
-        Task<BaseCommand> Send(CommandCloseConsumer command);
-        Task<BaseCommand> Send(SendPackage command);
+        Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken);
+        Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index dc05155..a2cbe34 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -21,10 +21,10 @@
 {
     public interface IConsumerChannel : IAsyncDisposable
     {
-        Task Send(CommandAck command);
-        Task<CommandSuccess> Send(CommandUnsubscribe command);
-        Task<CommandSuccess> Send(CommandSeek command);
-        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command);
-        ValueTask<Message> Receive(CancellationToken cancellationToken = default);
+        Task Send(CommandAck command, CancellationToken cancellationToken);
+        Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
+        Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
+        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
+        ValueTask<Message> Receive(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 57276ce..f15fca2 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -15,13 +15,14 @@
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Buffers;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal.Abstractions
 {
     public interface IProducerChannel : IAsyncDisposable
     {
-        Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload);
-        Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload);
+        Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+        Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
     }
 }
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index a374518..de04523 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -36,7 +36,7 @@
             _isDisposed = false;
         }
 
-        public Task<IDisposable> Lock(CancellationToken cancellationToken = default)
+        public Task<IDisposable> Lock(CancellationToken cancellationToken)
         {
             LinkedListNode<CancelableCompletionSource<IDisposable>>? node = null;
 
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index b14770a..eb27e11 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,6 +15,7 @@
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal
@@ -36,19 +37,19 @@
             _stream = stream;
         }
 
-        public async ValueTask<bool> HasChannels()
+        public async ValueTask<bool> HasChannels(CancellationToken cancellationToken)
         {
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 return _channelManager.HasChannels();
             }
         }
 
-        public async Task<ProducerResponse> Send(CommandProducer command, IChannel channel)
+        public async Task<ProducerResponse> Send(CommandProducer command, IChannel channel, CancellationToken cancellationToken)
         {
             Task<ProducerResponse>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -60,11 +61,11 @@
             return await responseTask;
         }
 
-        public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel)
+        public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken)
         {
             Task<SubscribeResponse>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -76,16 +77,23 @@
             return await responseTask;
         }
 
-        public async Task Send(CommandPing command) => await Send(command.AsBaseCommand());
-        public async Task Send(CommandPong command) => await Send(command.AsBaseCommand());
-        public async Task Send(CommandAck command) => await Send(command.AsBaseCommand());
-        public async Task Send(CommandFlow command) => await Send(command.AsBaseCommand());
+        public async Task Send(CommandPing command, CancellationToken cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
 
-        public async Task<BaseCommand> Send(CommandUnsubscribe command)
+        public async Task Send(CommandPong command, CancellationToken cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
+
+        public async Task Send(CommandAck command, CancellationToken cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
+
+        public async Task Send(CommandFlow command, CancellationToken cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
             Task<BaseCommand>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -97,16 +105,23 @@
             return await responseTask;
         }
 
-        public async Task<BaseCommand> Send(CommandConnect command) => await SendRequestResponse(command.AsBaseCommand());
-        public async Task<BaseCommand> Send(CommandLookupTopic command) => await SendRequestResponse(command.AsBaseCommand());
-        public async Task<BaseCommand> Send(CommandSeek command) => await SendRequestResponse(command.AsBaseCommand());
-        public async Task<BaseCommand> Send(CommandGetLastMessageId command) => await SendRequestResponse(command.AsBaseCommand());
+        public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
 
-        public async Task<BaseCommand> Send(CommandCloseProducer command)
+        public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
         {
             Task<BaseCommand>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -118,11 +133,11 @@
             return await responseTask;
         }
 
-        public async Task<BaseCommand> Send(CommandCloseConsumer command)
+        public async Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken)
         {
             Task<BaseCommand>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -134,10 +149,10 @@
             return await responseTask;
         }
 
-        public async Task<BaseCommand> Send(SendPackage command)
+        public async Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken)
         {
             Task<BaseCommand>? response = null;
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.Command.AsBaseCommand();
                 response = _requestResponseHandler.Outgoing(baseCommand);
@@ -147,10 +162,10 @@
             return await response;
         }
 
-        private async Task<BaseCommand> SendRequestResponse(BaseCommand command)
+        private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
         {
             Task<BaseCommand>? response = null;
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 response = _requestResponseHandler.Outgoing(command);
                 var sequence = Serializer.Serialize(command);
@@ -159,16 +174,16 @@
             return await response;
         }
 
-        private async Task Send(BaseCommand command)
+        private async Task Send(BaseCommand command, CancellationToken cancellationToken)
         {
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var sequence = Serializer.Serialize(command);
                 await _stream.Send(sequence);
             }
         }
 
-        public async Task ProcessIncommingFrames()
+        public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
         {
             await Task.Yield();
 
@@ -197,7 +212,7 @@
                             _channelManager.Incoming(command.CloseProducer);
                             break;
                         case BaseCommand.Type.Ping:
-                            _pingPongHandler.Incoming(command.Ping);
+                            _pingPongHandler.Incoming(command.Ping, cancellationToken);
                             break;
                         default:
                             _requestResponseHandler.Incoming(command);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index d3dff22..7c1f7b5 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -74,7 +74,7 @@
             while (true)
             {
                 var connection = await GetConnection(serviceUrl, cancellationToken);
-                var response = await connection.Send(lookup);
+                var response = await connection.Send(lookup, cancellationToken);
                 response.Expect(BaseCommand.Type.LookupResponse);
 
                 if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed)
@@ -124,18 +124,18 @@
                 if (_connections.TryGetValue(serviceUrl, out Connection connection))
                     return connection;
 
-                return await EstablishNewConnection(serviceUrl);
+                return await EstablishNewConnection(serviceUrl, cancellationToken);
             }
         }
 
-        private async Task<Connection> EstablishNewConnection(Uri serviceUrl)
+        private async Task<Connection> EstablishNewConnection(Uri serviceUrl, CancellationToken cancellationToken)
         {
             var stream = await _connector.Connect(serviceUrl);
             var connection = new Connection(new PulsarStream(stream));
             DotPulsarEventSource.Log.ConnectionCreated();
             _connections[serviceUrl] = connection;
-            _ = connection.ProcessIncommingFrames().ContinueWith(t => DisposeConnection(serviceUrl));
-            var response = await connection.Send(_commandConnect);
+            _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(serviceUrl));
+            var response = await connection.Send(_commandConnect, cancellationToken);
             response.Expect(BaseCommand.Type.Connected);
             return connection;
         }
@@ -165,7 +165,7 @@
                             var connection = _connections[serviceUrl];
                             if (connection is null)
                                 continue;
-                            if (!await connection.HasChannels())
+                            if (!await connection.HasChannels(cancellationToken))
                                 await DisposeConnection(serviceUrl);
                         }
                     }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 440961d..9b623ad 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -96,21 +96,21 @@
         public async ValueTask Unsubscribe(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe()), cancellationToken);
+            _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe(), cancellationToken), cancellationToken);
         }
 
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
             var seek = new CommandSeek { MessageId = messageId.Data };
-            _ = await _executor.Execute(() => _channel.Send(seek), cancellationToken);
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken);
             return;
         }
 
         public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId()), cancellationToken);
+            var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken);
             return new MessageId(response.LastMessageId);
         }
 
@@ -122,7 +122,7 @@
                 _cachedCommandAck.Type = ackType;
                 _cachedCommandAck.MessageIds.Clear();
                 _cachedCommandAck.MessageIds.Add(messageIdData);
-                return _channel.Send(_cachedCommandAck);
+                return _channel.Send(_cachedCommandAck, cancellationToken);
             }, cancellationToken);
         }
 
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 9373da1..735b078 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -52,7 +52,7 @@
             while (true)
             {
                 if (_sendWhenZero == 0)
-                    await SendFlow();
+                    await SendFlow(cancellationToken);
 
                 _sendWhenZero--;
 
@@ -64,7 +64,7 @@
 
                 if (!messagePackage.IsValid())
                 {
-                    await RejectPackage(messagePackage);
+                    await RejectPackage(messagePackage, cancellationToken);
                     continue;
                 }
 
@@ -79,7 +79,7 @@
             }
         }
 
-        public async Task Send(CommandAck command)
+        public async Task Send(CommandAck command, CancellationToken cancellationToken)
         {
             var messageId = command.MessageIds[0];
             if (messageId.BatchIndex != -1)
@@ -92,30 +92,30 @@
             }
 
             command.ConsumerId = _id;
-            await _connection.Send(command);
+            await _connection.Send(command, cancellationToken);
         }
 
-        public async Task<CommandSuccess> Send(CommandUnsubscribe command)
+        public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
-            var response = await _connection.Send(command);
+            var response = await _connection.Send(command, cancellationToken);
             response.Expect(BaseCommand.Type.Success);
             return response.Success;
         }
 
-        public async Task<CommandSuccess> Send(CommandSeek command)
+        public async Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
-            var response = await _connection.Send(command);
+            var response = await _connection.Send(command, cancellationToken);
             response.Expect(BaseCommand.Type.Success);
             _batchHandler.Clear();
             return response.Success;
         }
 
-        public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command)
+        public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
-            var response = await _connection.Send(command);
+            var response = await _connection.Send(command, cancellationToken);
             response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
             return response.GetLastMessageIdResponse;
         }
@@ -125,7 +125,7 @@
             try
             {
                 _queue.Dispose();
-                await _connection.Send(new CommandCloseConsumer { ConsumerId = _id });
+                await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None);
             }
             catch
             {
@@ -133,9 +133,10 @@
             }
         }
 
-        private async ValueTask SendFlow()
+        private async ValueTask SendFlow(CancellationToken cancellationToken)
         {
-            await _connection.Send(_cachedCommandFlow); //TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
+            //TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
+            await _connection.Send(_cachedCommandFlow, cancellationToken);
 
             if (_firstFlow)
             {
@@ -146,7 +147,7 @@
             _sendWhenZero = _cachedCommandFlow.MessagePermits;
         }
 
-        private async Task RejectPackage(MessagePackage messagePackage)
+        private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken)
         {
             var ack = new CommandAck
             {
@@ -156,7 +157,7 @@
 
             ack.MessageIds.Add(messagePackage.MessageId);
 
-            await Send(ack);
+            await Send(ack, cancellationToken);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 5424c32..9746c67 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -65,7 +65,7 @@
             var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
-            var response = await connection.Send(_subscribe, channel);
+            var response = await connection.Send(_subscribe, channel, cancellationToken);
             return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 4dc0ce9..ca958b7 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -28,17 +28,18 @@
 
         public ValueTask<Message> Receive(CancellationToken cancellationToken = default) => throw GetException();
 
-        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload) => throw GetException();
+        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken) => throw GetException();
 
-        public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload) => throw GetException();
+        public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken) =>
+            throw GetException();
 
-        public Task Send(CommandAck command) => throw GetException();
+        public Task Send(CommandAck command, CancellationToken cancellationToken) => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandUnsubscribe command) => throw GetException();
+        public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken) => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandSeek command) => throw GetException();
+        public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken) => throw GetException();
 
-        public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command) => throw GetException();
+        public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) => throw GetException();
 
         private Exception GetException() => new ChannelNotReadyException();
     }
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
index a0e0c23..8b16349 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -12,6 +12,7 @@
  * limitations under the License.
  */
 
+using System.Threading;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.PulsarApi;
 
@@ -28,9 +29,9 @@
             _pong = new CommandPong();
         }
 
-        public void Incoming(CommandPing ping)
+        public void Incoming(CommandPing ping, CancellationToken cancellationToken)
         {
-            _ = _connection.Send(_pong);
+            _ = _connection.Send(_pong, cancellationToken);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index f5750f1..2d850cf 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -76,7 +76,7 @@
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(data), cancellationToken);
+            var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken);
             return new MessageId(response.MessageId);
         }
 
@@ -89,7 +89,7 @@
         public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data), cancellationToken);
+            var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken);
             return new MessageId(response.MessageId);
         }
 
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index d9dc561..49b70a9 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -17,6 +17,7 @@
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Buffers;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal
@@ -53,7 +54,7 @@
         {
             try
             {
-                await _connection.Send(new CommandCloseProducer { ProducerId = _id });
+                await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None);
             }
             catch
             {
@@ -61,22 +62,22 @@
             }
         }
 
-        public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload)
+        public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
             _cachedSendPackage.Metadata = _cachedMetadata;
             _cachedSendPackage.Payload = payload;
-            return await SendPackage(true);
+            return await SendPackage(true, cancellationToken);
         }
 
-        public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload)
+        public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
             metadata.ProducerName = _cachedMetadata.ProducerName;
             _cachedSendPackage.Metadata = metadata;
             _cachedSendPackage.Payload = payload;
-            return await SendPackage(metadata.SequenceId == 0);
+            return await SendPackage(metadata.SequenceId == 0, cancellationToken);
         }
 
-        private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId)
+        private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId, CancellationToken cancellationToken)
         {
             try
             {
@@ -90,7 +91,7 @@
                 else
                     _cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
 
-                var response = await _connection.Send(_cachedSendPackage);
+                var response = await _connection.Send(_cachedSendPackage, cancellationToken);
                 response.Expect(BaseCommand.Type.SendReceipt);
 
                 if (autoAssignSequenceId)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index da31a54..7426c22 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -56,7 +56,7 @@
         {
             var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken);
             var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
-            var response = await connection.Send(_commandProducer, channel);
+            var response = await connection.Send(_commandProducer, channel, cancellationToken);
             return new ProducerChannel(response.ProducerId, response.ProducerName, _sequenceId, connection);
         }
     }
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index e95687e..e140854 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -64,7 +64,7 @@
             var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
-            var response = await connection.Send(_subscribe, channel);
+            var response = await connection.Send(_subscribe, channel, cancellationToken);
             return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }