Remove locking from the producers, consumers and readers.
Use object pooling to improve performance of multi-threaded usage of the producers.
Make consumers more thread-safe when calling Messages() and other methods.
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index a3d5c47..19c4995 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -21,6 +21,7 @@
   </PropertyGroup>
 
   <ItemGroup>    
+    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.5" />    
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
     <PackageReference Include="protobuf-net" Version="2.4.6" />
     <PackageReference Include="System.IO.Pipelines" Version="4.7.2" />
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index cb6ab13..3da59f3 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -22,12 +22,14 @@
 
     public sealed class BatchHandler
     {
+        private readonly object _lock;
         private readonly bool _trackBatches;
         private readonly Queue<Message> _messages;
         private readonly LinkedList<Batch> _batches;
 
         public BatchHandler(bool trackBatches)
         {
+            _lock = new object();
             _trackBatches = trackBatches;
             _messages = new Queue<Message>();
             _batches = new LinkedList<Batch>();
@@ -35,56 +37,68 @@
 
         public Message Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
         {
-            if (_trackBatches)
-                _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
-
-            long index = 0;
-
-            for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+            lock (_lock)
             {
-                var singleMetadataSize = data.ReadUInt32(index, true);
-                index += 4;
-                var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
-                index += singleMetadataSize;
-                var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
-                var message = new Message(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
-                _messages.Enqueue(message);
-                index += (uint) singleMetadata.PayloadSize;
-            }
+                if (_trackBatches)
+                    _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
 
-            return _messages.Dequeue();
+                long index = 0;
+
+                for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+                {
+                    var singleMetadataSize = data.ReadUInt32(index, true);
+                    index += 4;
+                    var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
+                    index += singleMetadataSize;
+                    var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
+                    var message = new Message(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
+                    _messages.Enqueue(message);
+                    index += (uint) singleMetadata.PayloadSize;
+                }
+
+                return _messages.Dequeue();
+            }
         }
 
         public Message? GetNext()
-            => _messages.Count == 0 ? null : _messages.Dequeue();
+        {
+            lock (_lock)
+                return _messages.Count == 0 ? null : _messages.Dequeue();
+        }
 
         public void Clear()
         {
-            _messages.Clear();
-            _batches.Clear();
+            lock (_lock)
+            {
+                _messages.Clear();
+                _batches.Clear();
+            }
         }
 
         public MessageIdData? Acknowledge(MessageIdData messageId)
         {
-            foreach (var batch in _batches)
+            lock (_lock)
             {
-                if (messageId.LedgerId != batch.MessageId.LedgerId ||
-                    messageId.EntryId != batch.MessageId.EntryId ||
-                    messageId.Partition != batch.MessageId.Partition)
-                    continue;
-
-                batch.Acknowledge(messageId.BatchIndex);
-
-                if (batch.IsAcknowledged())
+                foreach (var batch in _batches)
                 {
-                    _batches.Remove(batch);
-                    return batch.MessageId;
+                    if (messageId.LedgerId != batch.MessageId.LedgerId ||
+                        messageId.EntryId != batch.MessageId.EntryId ||
+                        messageId.Partition != batch.MessageId.Partition)
+                        continue;
+
+                    batch.Acknowledge(messageId.BatchIndex);
+
+                    if (batch.IsAcknowledged())
+                    {
+                        _batches.Remove(batch);
+                        return batch.MessageId;
+                    }
+
+                    break;
                 }
 
-                break;
+                return null;
             }
-
-            return null;
         }
 
         private sealed class Batch
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 989aa36..d444586 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,6 +18,7 @@
     using Exceptions;
     using Extensions;
     using PulsarApi;
+    using System;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -170,6 +171,12 @@
         {
             ThrowIfDisposed();
 
+            if (command.Command is null)
+                throw new ArgumentNullException(nameof(command.Command));
+
+            if (command.Metadata is null)
+                throw new ArgumentNullException(nameof(command.Metadata));
+
             Task<BaseCommand>? response;
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index fe5fce3..c6162d2 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -18,6 +18,7 @@
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
     using Events;
+    using Microsoft.Extensions.ObjectPool;
     using PulsarApi;
     using System;
     using System.Collections.Generic;
@@ -31,8 +32,7 @@
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
         private IConsumerChannel _channel;
-        private readonly CommandAck _cachedCommandAck;
-        private readonly CommandRedeliverUnacknowledgedMessages _cachedCommandRedeliverUnacknowledgedMessages;
+        private readonly ObjectPool<CommandAck> _commandAckPool;
         private readonly IExecute _executor;
         private readonly IStateChanged<ConsumerState> _state;
         private int _isDisposed;
@@ -53,8 +53,7 @@
             _channel = initialChannel;
             _executor = executor;
             _state = state;
-            _cachedCommandAck = new CommandAck();
-            _cachedCommandRedeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
+            _commandAckPool = new DefaultObjectPool<CommandAck>(new DefaultPooledObjectPolicy<CommandAck>());
             _isDisposed = 0;
 
             _eventRegister.Register(new ConsumerCreated(_correlationId, this));
@@ -117,7 +116,8 @@
         {
             ThrowIfDisposed();
 
-            _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe(), cancellationToken), cancellationToken).ConfigureAwait(false);
+            var unsubscribe = new CommandUnsubscribe();
+            _ = await _executor.Execute(() => _channel.Send(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
@@ -136,7 +136,8 @@
         {
             ThrowIfDisposed();
 
-            var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken).ConfigureAwait(false);
+            var getLastMessageId = new CommandGetLastMessageId();
+            var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
 
             return new MessageId(response.LastMessageId);
         }
@@ -145,26 +146,37 @@
         {
             ThrowIfDisposed();
 
-            await _executor.Execute(() =>
+            var commandAck = _commandAckPool.Get();
+            commandAck.Type = ackType;
+            commandAck.MessageIds.Clear();
+            commandAck.MessageIds.Add(messageIdData);
+
+            try
             {
-                _cachedCommandAck.Type = ackType;
-                _cachedCommandAck.MessageIds.Clear();
-                _cachedCommandAck.MessageIds.Add(messageIdData);
-                return _channel.Send(_cachedCommandAck, cancellationToken);
-            }, cancellationToken).ConfigureAwait(false);
+                await _executor.Execute(() =>
+                {
+                    return _channel.Send(commandAck, cancellationToken);
+                }, cancellationToken).ConfigureAwait(false);
+            }
+            finally
+            {
+                _commandAckPool.Return(commandAck);
+            }
         }
 
         private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
+            var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
+            redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
+
             await _executor.Execute(() =>
             {
-                _cachedCommandRedeliverUnacknowledgedMessages.MessageIds.Clear();
-                _cachedCommandAck.MessageIds.AddRange(messageIds);
-                return _channel.Send(_cachedCommandRedeliverUnacknowledgedMessages, cancellationToken);
+                return _channel.Send(redeliverUnacknowledgedMessages, cancellationToken);
             }, cancellationToken).ConfigureAwait(false);
         }
+
         internal async ValueTask SetChannel(IConsumerChannel channel)
         {
             if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index c08649d..a84e574 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -28,6 +28,7 @@
         private readonly IConnection _connection;
         private readonly BatchHandler _batchHandler;
         private readonly CommandFlow _cachedCommandFlow;
+        private readonly AsyncLock _lock;
         private uint _sendWhenZero;
         private bool _firstFlow;
 
@@ -43,6 +44,8 @@
             _connection = connection;
             _batchHandler = batchHandler;
 
+            _lock = new AsyncLock();
+
             _cachedCommandFlow = new CommandFlow
             {
                 ConsumerId = id,
@@ -55,35 +58,38 @@
 
         public async ValueTask<Message> Receive(CancellationToken cancellationToken)
         {
-            while (true)
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                if (_sendWhenZero == 0)
-                    await SendFlow(cancellationToken).ConfigureAwait(false);
-
-                _sendWhenZero--;
-
-                var message = _batchHandler.GetNext();
-
-                if (message != null)
-                    return message;
-
-                var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
-
-                if (!messagePackage.IsValid())
+                while (true)
                 {
-                    await RejectPackage(messagePackage, cancellationToken).ConfigureAwait(false);
-                    continue;
+                    if (_sendWhenZero == 0)
+                        await SendFlow(cancellationToken).ConfigureAwait(false);
+
+                    _sendWhenZero--;
+
+                    var message = _batchHandler.GetNext();
+
+                    if (message != null)
+                        return message;
+
+                    var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+
+                    if (!messagePackage.IsValid())
+                    {
+                        await RejectPackage(messagePackage, cancellationToken).ConfigureAwait(false);
+                        continue;
+                    }
+
+                    var metadataSize = messagePackage.GetMetadataSize();
+                    var redeliveryCount = messagePackage.RedeliveryCount;
+                    var data = messagePackage.ExtractData(metadataSize);
+                    var metadata = messagePackage.ExtractMetadata(metadataSize);
+                    var messageId = messagePackage.MessageId;
+
+                    return metadata.NumMessagesInBatch == 1
+                        ? new Message(new MessageId(messageId), redeliveryCount, metadata, null, data)
+                        : _batchHandler.Add(messageId, redeliveryCount, metadata, data);
                 }
-
-                var metadataSize = messagePackage.GetMetadataSize();
-                var redeliveryCount = messagePackage.RedeliveryCount;
-                var data = messagePackage.ExtractData(metadataSize);
-                var metadata = messagePackage.ExtractMetadata(metadataSize);
-                var messageId = messagePackage.MessageId;
-
-                return metadata.NumMessagesInBatch == 1
-                    ? new Message(new MessageId(messageId), redeliveryCount, metadata, null, data)
-                    : _batchHandler.Add(messageId, redeliveryCount, metadata, data);
             }
         }
 
@@ -141,8 +147,9 @@
             try
             {
                 _queue.Dispose();
-
-                await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None).ConfigureAwait(false);
+                await _lock.DisposeAsync();
+                var closeConsumer = new CommandCloseConsumer { ConsumerId = _id };
+                await _connection.Send(closeConsumer, CancellationToken.None).ConfigureAwait(false);
             }
             catch
             {
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 3303d01..efe5bc0 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -91,7 +91,6 @@
         {
             ThrowIfDisposed();
             var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken).ConfigureAwait(false);
-
             return new MessageId(response.MessageId);
         }
 
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 4ae8503..824ed87 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -16,6 +16,7 @@
 {
     using Abstractions;
     using Extensions;
+    using Microsoft.Extensions.ObjectPool;
     using PulsarApi;
     using System;
     using System.Buffers;
@@ -25,20 +26,18 @@
     public sealed class ProducerChannel : IProducerChannel
     {
         private readonly MessageMetadata _cachedMetadata;
-        private readonly SendPackage _cachedSendPackage;
+        private readonly ObjectPool<SendPackage> _sendPackagePool;
         private readonly ulong _id;
+        private readonly string _name;
         private readonly SequenceId _sequenceId;
         private readonly IConnection _connection;
 
         public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
         {
             _cachedMetadata = new MessageMetadata { ProducerName = name };
-
-            var commandSend = new CommandSend { ProducerId = id, NumMessages = 1 };
-
-            _cachedSendPackage = new SendPackage(commandSend, _cachedMetadata);
-
+            _sendPackagePool = new DefaultObjectPool<SendPackage>(new DefaultPooledObjectPolicy<SendPackage>());
             _id = id;
+            _name = name;
             _sequenceId = sequenceId;
             _connection = connection;
         }
@@ -47,7 +46,8 @@
         {
             try
             {
-                await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None).ConfigureAwait(false);
+                var closeProducer = new CommandCloseProducer { ProducerId = _id };
+                await _connection.Send(closeProducer, CancellationToken.None).ConfigureAwait(false);
             }
             catch
             {
@@ -56,37 +56,47 @@
         }
 
         public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
-        {
-            _cachedSendPackage.Metadata = _cachedMetadata;
-            _cachedSendPackage.Payload = payload;
-
-            return SendPackage(true, cancellationToken);
-        }
+            => SendPackage(_cachedMetadata, payload, true, cancellationToken);
 
         public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
-            metadata.ProducerName = _cachedMetadata.ProducerName;
-            _cachedSendPackage.Metadata = metadata;
-            _cachedSendPackage.Payload = payload;
-
-            return SendPackage(metadata.SequenceId == 0, cancellationToken);
+            metadata.ProducerName = _name;
+            return SendPackage(metadata, payload, metadata.SequenceId == 0, cancellationToken);
         }
 
-        private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId, CancellationToken cancellationToken)
+        private async Task<CommandSendReceipt> SendPackage(
+            MessageMetadata metadata,
+            ReadOnlySequence<byte> payload,
+            bool autoAssignSequenceId,
+            CancellationToken cancellationToken)
         {
+            var sendPackage = _sendPackagePool.Get();
+
+            if (sendPackage.Command is null)
+            {
+                sendPackage.Command = new CommandSend
+                {
+                    ProducerId = _id,
+                    NumMessages = 1
+                };
+            }
+
+            sendPackage.Metadata = metadata;
+            sendPackage.Payload = payload;
+
             try
             {
-                _cachedSendPackage.Metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+                metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
 
                 if (autoAssignSequenceId)
                 {
-                    _cachedSendPackage.Command.SequenceId = _sequenceId.Current;
-                    _cachedSendPackage.Metadata.SequenceId = _sequenceId.Current;
+                    sendPackage.Command.SequenceId = _sequenceId.Current;
+                    sendPackage.Metadata.SequenceId = _sequenceId.Current;
                 }
                 else
-                    _cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
+                    sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
 
-                var response = await _connection.Send(_cachedSendPackage, cancellationToken).ConfigureAwait(false);
+                var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
 
                 if (autoAssignSequenceId)
@@ -98,7 +108,9 @@
             {
                 // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
                 if (autoAssignSequenceId)
-                    _cachedSendPackage.Metadata.SequenceId = 0;
+                    sendPackage.Metadata.SequenceId = 0;
+
+                _sendPackagePool.Return(sendPackage);
             }
         }
     }
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
index 297b248..f1b87db 100644
--- a/src/DotPulsar/Internal/SendPackage.cs
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -19,14 +19,8 @@
 
     public sealed class SendPackage
     {
-        public SendPackage(CommandSend command, MessageMetadata metadata)
-        {
-            Command = command;
-            Metadata = metadata;
-        }
-
-        public CommandSend Command { get; }
-        public MessageMetadata Metadata { get; set; }
+        public CommandSend? Command { get; set; }
+        public MessageMetadata? Metadata { get; set; }
         public ReadOnlySequence<byte> Payload { get; set; }
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 290ac74..260f3ce 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -57,7 +57,7 @@
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
+            var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             var process = new ProducerProcess(correlationId, stateManager, factory, producer);
             _processManager.Add(process);
             process.Start();
@@ -75,7 +75,7 @@
             var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
 
             var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
-            var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
+            var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
             _processManager.Add(process);
             process.Start();
@@ -92,7 +92,7 @@
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
-            var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
+            var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             var process = new ReaderProcess(correlationId, stateManager, factory, reader);
             _processManager.Add(process);
             process.Start();
diff --git a/tests/docker-compose-standalone-tests.yml b/tests/docker-compose-standalone-tests.yml
index d591b22..06d0a06 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -1,10 +1,10 @@
-version: '3.5'
+version: '3.5'
 
 services:
 
   pulsar:
     container_name: pulsar-stresstests
-    image: 'apachepulsar/pulsar:2.5.0'
+    image: 'apachepulsar/pulsar:2.6.0'
     ports:
       - '54546:8080'
       - '54545:6650'