Guard Consumers/Readers and Producers from sending messages that the broker will ignore if it has sent a CommandClose[Producer/Consumer]
diff --git a/src/DotPulsar/Internal/Abstractions/IChannel.cs b/src/DotPulsar/Internal/Abstractions/IChannel.cs
index 40f8af5..d9409dd 100644
--- a/src/DotPulsar/Internal/Abstractions/IChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IChannel.cs
@@ -14,6 +14,8 @@
 
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+
     public interface IChannel
     {
         void Received(MessagePackage message);
@@ -24,5 +26,6 @@
         void Disconnected();
         void ReachedEndOfTopic();
         void Unsubscribed();
+        IDisposable SenderLock();
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IRequest.cs b/src/DotPulsar/Internal/Abstractions/IRequest.cs
index f97314f..8ea7913 100644
--- a/src/DotPulsar/Internal/Abstractions/IRequest.cs
+++ b/src/DotPulsar/Internal/Abstractions/IRequest.cs
@@ -16,5 +16,9 @@
 {
     using System;
 
-    public interface IRequest : IEquatable<IRequest> { }
+    public interface IRequest : IEquatable<IRequest>
+    {
+        bool SenderIsProducer(ulong producerId);
+        bool SenderIsConsumer(ulong consumerId);
+    }
 }
diff --git a/src/DotPulsar/Internal/Channel.cs b/src/DotPulsar/Internal/Channel.cs
index 8d71572..b441680 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -17,15 +17,18 @@
     using Abstractions;
     using Events;
     using System;
+    using System.Threading;
 
     public sealed class Channel : IChannel
     {
+        private readonly Lock _senderLock;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
         private readonly IEnqueue<MessagePackage> _enqueue;
 
         public Channel(Guid correlationId, IRegisterEvent eventRegister, IEnqueue<MessagePackage> enqueue)
         {
+            _senderLock = new Lock();
             _correlationId = correlationId;
             _eventRegister = eventRegister;
             _enqueue = enqueue;
@@ -38,7 +41,10 @@
             => _eventRegister.Register(new ChannelActivated(_correlationId));
 
         public void ClosedByServer()
-            => _eventRegister.Register(new ChannelClosedByServer(_correlationId));
+        {
+            _senderLock.Disable();
+            _eventRegister.Register(new ChannelClosedByServer(_correlationId));
+        }
 
         public void Connected()
             => _eventRegister.Register(new ChannelConnected(_correlationId));
@@ -47,12 +53,51 @@
             => _eventRegister.Register(new ChannelDeactivated(_correlationId));
 
         public void Disconnected()
-            => _eventRegister.Register(new ChannelDisconnected(_correlationId));
+        {
+            _senderLock.Disable();
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+        }
 
         public void ReachedEndOfTopic()
             => _eventRegister.Register(new ChannelReachedEndOfTopic(_correlationId));
 
         public void Unsubscribed()
             => _eventRegister.Register(new ChannelUnsubscribed(_correlationId));
+
+        public IDisposable SenderLock()
+            => _senderLock.Enter();
+
+        private sealed class Lock : IDisposable
+        {
+            private readonly object _lock;
+            private bool _canSend;
+
+            public Lock()
+            {
+                _lock = new object();
+                _canSend = true;
+            }
+
+            public void Disable()
+            {
+                Monitor.Enter(_lock);
+                _canSend = false;
+                Monitor.Exit(_lock);
+            }
+
+            public IDisposable Enter()
+            {
+                Monitor.Enter(_lock);
+
+                if (_canSend)
+                    return this;
+
+                Monitor.Exit(_lock);
+                throw new OperationCanceledException();
+            }
+
+            public void Dispose()
+                => Monitor.Exit(_lock);
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index d2eda4d..ed3fa51 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -23,29 +23,37 @@
 
     public sealed class ChannelManager : IDisposable
     {
+        private readonly RequestResponseHandler _requestResponseHandler;
         private readonly IdLookup<IChannel> _consumerChannels;
         private readonly IdLookup<IChannel> _producerChannels;
+        private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> _incoming;
 
         public ChannelManager()
         {
+            _requestResponseHandler = new RequestResponseHandler();
             _consumerChannels = new IdLookup<IChannel>();
             _producerChannels = new IdLookup<IChannel>();
+            _incoming = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
+            _incoming.Set(BaseCommand.Type.CloseConsumer, cmd => Incoming(cmd.CloseConsumer));
+            _incoming.Set(BaseCommand.Type.CloseProducer, cmd => Incoming(cmd.CloseProducer));
+            _incoming.Set(BaseCommand.Type.ActiveConsumerChange, cmd => Incoming(cmd.ActiveConsumerChange));
+            _incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => Incoming(cmd.ReachedEndOfTopic));
         }
 
         public bool HasChannels()
             => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
 
-        public Task<ProducerResponse> Outgoing(CommandProducer command, Task<BaseCommand> response, IChannel channel)
+        public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel channel)
         {
             var producerId = _producerChannels.Add(channel);
-
             command.ProducerId = producerId;
+            var response = _requestResponseHandler.Outgoing(command);
 
             return response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Error)
                 {
-                    _producerChannels.Remove(producerId);
+                    _ = _producerChannels.Remove(producerId);
                     result.Result.Error.Throw();
                 }
 
@@ -55,65 +63,164 @@
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
         }
 
-        public Task<SubscribeResponse> Outgoing(CommandSubscribe command, Task<BaseCommand> response, IChannel channel)
+        public Task<SubscribeResponse> Outgoing(CommandSubscribe command, IChannel channel)
         {
             var consumerId = _consumerChannels.Add(channel);
-
             command.ConsumerId = consumerId;
+            var response = _requestResponseHandler.Outgoing(command);
 
             return response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Error)
                 {
-                    _consumerChannels.Remove(consumerId);
+                    _ = _consumerChannels.Remove(consumerId);
                     result.Result.Error.Throw();
                 }
 
                 channel.Connected();
+
                 return new SubscribeResponse(consumerId);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
         }
 
-        public void Outgoing(CommandCloseConsumer command, Task<BaseCommand> response)
+        public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
         {
             var consumerId = command.ConsumerId;
 
+            Task<BaseCommand> response;
+
+            using (TakeConsumerSenderLock(consumerId))
+            {
+                response = _requestResponseHandler.Outgoing(command);
+            }
+
             _ = response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Success)
-                    _consumerChannels.Remove(consumerId);
+                    _ = _consumerChannels.Remove(consumerId);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+            return response;
         }
 
-        public void Outgoing(CommandCloseProducer command, Task<BaseCommand> response)
+        public Task<BaseCommand> Outgoing(CommandCloseProducer command)
         {
             var producerId = command.ProducerId;
 
+            Task<BaseCommand> response;
+
+            using (TakeProducerSenderLock(producerId))
+            {
+                response = _requestResponseHandler.Outgoing(command);
+            }
+
             _ = response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Success)
-                    _producerChannels.Remove(producerId);
+                    _ = _producerChannels.Remove(producerId);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+            return response;
         }
 
-        public void Outgoing(CommandUnsubscribe command, Task<BaseCommand> response)
+        public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
         {
             var consumerId = command.ConsumerId;
 
+            Task<BaseCommand> response;
+
+            using (TakeConsumerSenderLock(consumerId))
+            {
+                response = _requestResponseHandler.Outgoing(command);
+            }
+
             _ = response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Success)
-                    _consumerChannels.Remove(consumerId)?.Unsubscribed();
+                   _consumerChannels.Remove(consumerId)?.Unsubscribed();
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+            return response;
         }
 
-        public void Incoming(CommandCloseConsumer command)
-            => _consumerChannels.Remove(command.ConsumerId)?.ClosedByServer();
+        public Task<BaseCommand> Outgoing(CommandSend command)
+        {
+            using (TakeProducerSenderLock(command.ProducerId))
+            {
+                return _requestResponseHandler.Outgoing(command);
+            }
+        }
 
-        public void Incoming(CommandCloseProducer command)
-            => _producerChannels.Remove(command.ProducerId)?.ClosedByServer();
+        public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
+            => _requestResponseHandler.Outgoing(command);
 
-        public void Incoming(CommandActiveConsumerChange command)
+        public Task<BaseCommand> Outgoing(CommandConnect command)
+            => _requestResponseHandler.Outgoing(command);
+
+        public Task<BaseCommand> Outgoing(CommandLookupTopic command)
+            => _requestResponseHandler.Outgoing(command);
+
+        public Task<BaseCommand> Outgoing(CommandSeek command)
+        {
+            using (TakeConsumerSenderLock(command.ConsumerId))
+            {
+                return _requestResponseHandler.Outgoing(command);
+            }
+        }
+
+        public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
+        {
+            using (TakeConsumerSenderLock(command.ConsumerId))
+            {
+                return _requestResponseHandler.Outgoing(command);
+            }
+        }
+
+        public void Incoming(BaseCommand command)
+            => _incoming.Get(command.CommandType)(command);
+
+        public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
+            => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, command.RedeliveryCount, data));
+
+        public void Dispose()
+        {
+            _requestResponseHandler.Dispose();
+
+            foreach (var channel in _consumerChannels.RemoveAll())
+                channel.Disconnected();
+
+            foreach (var channel in _producerChannels.RemoveAll())
+                channel.Disconnected();
+        }
+
+        private void Incoming(CommandReachedEndOfTopic command)
+            => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
+
+        private void Incoming(CommandCloseConsumer command)
+        {
+            var channel = _consumerChannels[command.ConsumerId];
+
+            if (channel is null)
+                return;
+
+            _ = _consumerChannels.Remove(command.ConsumerId);
+            _requestResponseHandler.Incoming(command);
+            channel.ClosedByServer();
+        }
+
+        private void Incoming(CommandCloseProducer command)
+        {
+            var channel = _producerChannels[command.ProducerId];
+
+            if (channel is null)
+                return;
+
+            _ = _producerChannels.Remove(command.ProducerId);
+            _requestResponseHandler.Incoming(command);
+            channel.ClosedByServer();
+        }
+
+        private void Incoming(CommandActiveConsumerChange command)
         {
             var channel = _consumerChannels[command.ConsumerId];
 
@@ -126,19 +233,22 @@
                 channel.Deactivated();
         }
 
-        public void Incoming(CommandReachedEndOfTopic command)
-            => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
-
-        public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
-            => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, command.RedeliveryCount, data));
-
-        public void Dispose()
+        private IDisposable TakeConsumerSenderLock(ulong consumerId)
         {
-            foreach (var channel in _consumerChannels.RemoveAll())
-                channel.Disconnected();
+            var channel = _consumerChannels[consumerId];
+            if (channel is null)
+                throw new OperationCanceledException();
 
-            foreach (var channel in _producerChannels.RemoveAll())
-                channel.Disconnected();
+            return channel.SenderLock();
+        }
+
+        private IDisposable TakeProducerSenderLock(ulong producerId)
+        {
+            var channel = _producerChannels[producerId];
+            if (channel is null)
+                throw new OperationCanceledException();
+
+            return channel.SenderLock();
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 8b24df8..efc1b12 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,7 +18,6 @@
     using Exceptions;
     using Extensions;
     using PulsarApi;
-    using System;
     using System.Buffers;
     using System.Threading;
     using System.Threading.Tasks;
@@ -27,7 +26,6 @@
     {
         private readonly AsyncLock _lock;
         private readonly ChannelManager _channelManager;
-        private readonly RequestResponseHandler _requestResponseHandler;
         private readonly PingPongHandler _pingPongHandler;
         private readonly IPulsarStream _stream;
         private int _isDisposed;
@@ -36,7 +34,6 @@
         {
             _lock = new AsyncLock();
             _channelManager = new ChannelManager();
-            _requestResponseHandler = new RequestResponseHandler();
             _pingPongHandler = new PingPongHandler(this);
             _stream = stream;
         }
@@ -59,10 +56,8 @@
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
-                responseTask = _channelManager.Outgoing(command, requestResponseTask, channel);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command, channel);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
@@ -77,10 +72,8 @@
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
-                responseTask = _channelManager.Outgoing(command, requestResponseTask, channel);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command, channel);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
@@ -110,27 +103,77 @@
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                _channelManager.Outgoing(command, responseTask);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
             return await responseTask.ConfigureAwait(false);
         }
 
-        public Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+        public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
 
-        public Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+            Task<BaseCommand>? responseTask;
 
-        public Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
 
-        public Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+            return await responseTask.ConfigureAwait(false);
+        }
+
+        public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            Task<BaseCommand>? responseTask;
+
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
+
+            return await responseTask.ConfigureAwait(false);
+        }
+
+        public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            Task<BaseCommand>? responseTask;
+
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
+
+            return await responseTask.ConfigureAwait(false);
+        }
+
+        public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            Task<BaseCommand>? responseTask;
+
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
+
+            return await responseTask.ConfigureAwait(false);
+        }
 
         public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
         {
@@ -140,10 +183,8 @@
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                _channelManager.Outgoing(command, responseTask);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
@@ -158,10 +199,8 @@
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                _channelManager.Outgoing(command, responseTask);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
@@ -172,17 +211,16 @@
         {
             ThrowIfDisposed();
 
-            Task<BaseCommand>? response;
+            Task<BaseCommand>? responseTask;
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.Command!.AsBaseCommand();
-                response = _requestResponseHandler.Outgoing(baseCommand);
-                var sequence = Serializer.Serialize(baseCommand, command.Metadata!, command.Payload);
+                responseTask = _channelManager.Outgoing(command.Command!);
+                var sequence = Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!, command.Payload);
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
-            return await response.ConfigureAwait(false);
+            return await responseTask.ConfigureAwait(false);
         }
 
         public async Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken)
@@ -193,38 +231,22 @@
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
             return await responseTask.ConfigureAwait(false);
         }
 
-        private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            Task<BaseCommand>? response;
-
-            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
-            {
-                response = _requestResponseHandler.Outgoing(command);
-                var sequence = Serializer.Serialize(command);
-                await _stream.Send(sequence).ConfigureAwait(false);
-            }
-
-            return await response.ConfigureAwait(false);
-        }
-
         private async Task Send(BaseCommand command, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
+            var sequence = Serializer.Serialize(command);
+
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var sequence = Serializer.Serialize(command);
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
         }
@@ -233,19 +255,6 @@
         {
             await Task.Yield();
 
-            var lookup = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
-
-            lookup.Set(BaseCommand.Type.CloseConsumer, cmd => _channelManager.Incoming(cmd.CloseConsumer));
-            lookup.Set(BaseCommand.Type.ActiveConsumerChange, cmd => _channelManager.Incoming(cmd.ActiveConsumerChange));
-            lookup.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => _channelManager.Incoming(cmd.ReachedEndOfTopic));
-            lookup.Set(BaseCommand.Type.Ping, cmd => _pingPongHandler.GotPing());
-            lookup.Set(BaseCommand.Type.CloseProducer, cmd =>
-            {
-                _channelManager.Incoming(cmd.CloseProducer);
-                _requestResponseHandler.Incoming(cmd.CloseProducer);
-            });
-            
-
             try
             {
                 await foreach (var frame in _stream.Frames())
@@ -253,10 +262,18 @@
                     var commandSize = frame.ReadUInt32(0, true);
                     var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
 
-                    if (command.CommandType == BaseCommand.Type.Message)
-                        _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
-                    else
-                        lookup.Get(command.CommandType)(command);
+                    switch (command.CommandType)
+                    {
+                        case BaseCommand.Type.Message:
+                            _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
+                            break;
+                        case BaseCommand.Type.Ping:
+                            _pingPongHandler.GotPing();
+                            break;
+                        default:
+                            _channelManager.Incoming(command);
+                            break;
+                    }
                 }
             }
             catch
@@ -271,7 +288,6 @@
                 return;
 
             await _lock.DisposeAsync().ConfigureAwait(false);
-            _requestResponseHandler.Dispose();
             _channelManager.Dispose();
             await _stream.DisposeAsync().ConfigureAwait(false);
         }
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index b3b7d0a..5cbd632 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -24,57 +24,97 @@
     {
         private readonly RequestId _requestId;
         private readonly Awaiter<IRequest, BaseCommand> _requests;
-        private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> _setRequestId;
         private readonly EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>> _getResponseIdentifier;
 
         public RequestResponseHandler()
         {
             _requestId = new RequestId();
-
             _requests = new Awaiter<IRequest, BaseCommand>();
 
-            _setRequestId = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => { });
-            _setRequestId.Set(BaseCommand.Type.Seek, cmd => cmd.Seek.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Error, cmd => cmd.Error.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Producer, cmd => cmd.Producer.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Lookup, cmd => cmd.LookupTopic.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Subscribe, cmd => cmd.Subscribe.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Unsubscribe, cmd => cmd.Unsubscribe.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.CloseConsumer, cmd => cmd.CloseConsumer.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.CloseProducer, cmd => cmd.CloseProducer.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.GetLastMessageId, cmd => cmd.GetLastMessageId.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.GetOrCreateSchema, cmd => cmd.GetOrCreateSchema.RequestId = _requestId.FetchNext());
-
-            _getResponseIdentifier = new EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>>(cmd => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type"));
-            _getResponseIdentifier.Set(BaseCommand.Type.Connect, cmd => new ConnectRequest());
+            _getResponseIdentifier = new EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>>(cmd => throw new Exception($"CommandType '{cmd.CommandType}' not supported as request/response type"));
             _getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd => new ConnectRequest());
-            _getResponseIdentifier.Set(BaseCommand.Type.Seek, cmd => new StandardRequest(cmd.Seek.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Send, cmd => new SendRequest(cmd.Send.ProducerId, cmd.Send.SequenceId));
             _getResponseIdentifier.Set(BaseCommand.Type.SendError, cmd => new SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId));
             _getResponseIdentifier.Set(BaseCommand.Type.SendReceipt, cmd => new SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Producer, cmd => new StandardRequest(cmd.Producer.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd => new StandardRequest(cmd.ProducerSuccess.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd => new StandardRequest(cmd.CloseConsumer.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => new StandardRequest(cmd.CloseProducer.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Lookup, cmd => new StandardRequest(cmd.LookupTopic.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => new StandardRequest(cmd.LookupTopicResponse.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Subscribe, cmd => new StandardRequest(cmd.Subscribe.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Unsubscribe, cmd => new StandardRequest(cmd.Unsubscribe.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageId, cmd => new StandardRequest(cmd.GetLastMessageId.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd => new StandardRequest(cmd.GetLastMessageIdResponse.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchema, cmd => new StandardRequest(cmd.GetOrCreateSchema.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd => new StandardRequest(cmd.GetOrCreateSchemaResponse.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => new StandardRequest(cmd.Success.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd => !_requestId.IsPastInitialId() ? new ConnectRequest() : new StandardRequest(cmd.Error.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd => StandardRequest.WithRequestId(cmd.ProducerSuccess.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd => StandardRequest.WithConsumerId(cmd.CloseConsumer.RequestId, cmd.CloseConsumer.ConsumerId));
+            _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => StandardRequest.WithProducerId(cmd.CloseProducer.RequestId, cmd.CloseProducer.ProducerId));
+            _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => StandardRequest.WithRequestId(cmd.LookupTopicResponse.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => StandardRequest.WithRequestId(cmd.Success.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd => !_requestId.IsPastInitialId() ? new ConnectRequest() : StandardRequest.WithRequestId(cmd.Error.RequestId));
         }
 
         public void Dispose()
-                => _requests.Dispose();
+            => _requests.Dispose();
 
-        public Task<BaseCommand> Outgoing(BaseCommand command)
+        public Task<BaseCommand> Outgoing(CommandProducer command)
         {
-            _setRequestId.Get(command.CommandType)(command);
-            return _requests.CreateTask(_getResponseIdentifier.Get(command.CommandType)(command));
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithProducerId(command.RequestId, command.ProducerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandCloseProducer command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithProducerId(command.RequestId, command.ProducerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandSubscribe command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandSend command)
+        {
+            var request = new SendRequest(command.ProducerId, command.SequenceId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithRequestId(command.RequestId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandConnect _1)
+            => _requests.CreateTask(new ConnectRequest());
+
+        public Task<BaseCommand> Outgoing(CommandLookupTopic command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            return _requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
+        }
+
+        public Task<BaseCommand> Outgoing(CommandSeek command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            return _requests.CreateTask(StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId));
+        }
+
+        public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            return _requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
         }
 
         public void Incoming(BaseCommand command)
@@ -85,12 +125,22 @@
                 _requests.SetResult(identifier, command);
         }
 
+        public void Incoming(CommandCloseConsumer command)
+        {
+            var requests = _requests.Keys;
+            foreach (var request in requests)
+            {
+                if (request.SenderIsConsumer(command.ConsumerId))
+                    _requests.Cancel(request);
+            }
+        }
+
         public void Incoming(CommandCloseProducer command)
         {
             var requests = _requests.Keys;
             foreach (var request in requests)
             {
-                if (request is SendRequest sendRequest && sendRequest.ProducerId == command.ProducerId)
+                if (request.SenderIsProducer(command.ProducerId))
                     _requests.Cancel(request);
             }
         }
diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
index ceca386..5ab0e20 100644
--- a/src/DotPulsar/Internal/Requests/ConnectRequest.cs
+++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
@@ -19,6 +19,12 @@
 
     public struct ConnectRequest : IRequest
     {
+        public bool SenderIsConsumer(ulong consumerId)
+            => false;
+
+        public bool SenderIsProducer(ulong producerId)
+            => false;
+
 #if NETSTANDARD2_0
         public bool Equals(IRequest other)
 #else
diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs b/src/DotPulsar/Internal/Requests/SendRequest.cs
index 5f15fd2..36498b6 100644
--- a/src/DotPulsar/Internal/Requests/SendRequest.cs
+++ b/src/DotPulsar/Internal/Requests/SendRequest.cs
@@ -20,15 +20,21 @@
 
     public struct SendRequest : IRequest
     {
-        public ulong ProducerId { get; }
-        public ulong SequenceId { get; }
+        private readonly ulong _producerId;
+        private readonly ulong _sequenceId;
 
         public SendRequest(ulong producerId, ulong sequenceId)
         {
-            ProducerId = producerId;
-            SequenceId = sequenceId;
+            _producerId = producerId;
+            _sequenceId = sequenceId;
         }
 
+        public bool SenderIsConsumer(ulong consumerId)
+            => false;
+
+        public bool SenderIsProducer(ulong producerId)
+            => _producerId == producerId;
+
 #if NETSTANDARD2_0
         public bool Equals(IRequest other)
 #else
@@ -36,12 +42,12 @@
 #endif
         {
             if (other is SendRequest request)
-                return ProducerId.Equals(request.ProducerId) && SequenceId.Equals(request.SequenceId);
+                return _producerId.Equals(request._producerId) && _sequenceId.Equals(request._sequenceId);
 
             return false;
         }
 
         public override int GetHashCode()
-            => HashCode.Combine(ProducerId, SequenceId);
+            => HashCode.Combine(_producerId, _sequenceId);
     }
 }
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs b/src/DotPulsar/Internal/Requests/StandardRequest.cs
index 61b656e..666b227 100644
--- a/src/DotPulsar/Internal/Requests/StandardRequest.cs
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -20,10 +20,31 @@
 
     public struct StandardRequest : IRequest
     {
-        public ulong RequestId { get; }
+        private readonly ulong _requestId;
+        private readonly ulong? _consumerId;
+        private readonly ulong? _producerId;
 
-        public StandardRequest(ulong requestId)
-            => RequestId = requestId;
+        private StandardRequest(ulong requestId, ulong? consumerId, ulong? producerId)
+        {
+            _requestId = requestId;
+            _consumerId = consumerId;
+            _producerId = producerId;
+        }
+
+        public static StandardRequest WithRequestId(ulong requestId)
+            => new(requestId, null, null);
+
+        public static StandardRequest WithConsumerId(ulong requestId, ulong consumerId)
+            => new(requestId, consumerId, null);
+
+        public static StandardRequest WithProducerId(ulong requestId, ulong producerId)
+            => new (requestId, null, producerId);
+
+        public bool SenderIsConsumer(ulong consumerId)
+            => _consumerId.HasValue && _consumerId.Value == consumerId;
+
+        public bool SenderIsProducer(ulong producerId)
+            => _producerId.HasValue && _producerId.Value == producerId;
 
 #if NETSTANDARD2_0
         public bool Equals(IRequest other)
@@ -32,12 +53,12 @@
 #endif
         {
             if (other is StandardRequest request)
-                return RequestId.Equals(request.RequestId);
+                return _requestId.Equals(request._requestId);
 
             return false;
         }
 
         public override int GetHashCode()
-            => HashCode.Combine(RequestId);
+            => HashCode.Combine(_requestId);
     }
 }