Moving SequenceId from the ProducerChannel to the Producer.
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 134d406..8db5b3d 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -61,8 +61,10 @@
             {
                 while (!cancellationToken.IsCancellationRequested)
                 {
-                    var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow);
-                    _ = await producer.Send(data, cancellationToken).ConfigureAwait(false);
+                    var data = DateTime.UtcNow.ToLongTimeString();
+                    var bytes = Encoding.UTF8.GetBytes(data);
+                    _ = await producer.Send(bytes, cancellationToken).ConfigureAwait(false);
+                    Console.WriteLine("Sent: " + data);
                     await Task.Delay(delay).ConfigureAwait(false);
                 }
             }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 8375447..9fe2eb1 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -22,7 +22,7 @@
 
     public interface IProducerChannel : IAsyncDisposable
     {
-        Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+        Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
         Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 94a68b9..f98d9fd 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -66,7 +66,6 @@
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
             var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-
             return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 93f572a..fee9bb4 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -30,7 +30,7 @@
         public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
             => throw GetException();
 
-        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+        public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
             => throw GetException();
 
         public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index efe5bc0..da2aacd 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -30,6 +30,7 @@
         private IProducerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ProducerState> _state;
+        private readonly SequenceId _sequenceId;
         private int _isDisposed;
 
         public string Topic { get; }
@@ -37,6 +38,7 @@
         public Producer(
             Guid correlationId,
             string topic,
+            ulong initialSequenceId,
             IRegisterEvent registerEvent,
             IProducerChannel initialChannel,
             IExecute executor,
@@ -44,6 +46,7 @@
         {
             _correlationId = correlationId;
             Topic = topic;
+            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
             _channel = initialChannel;
             _executor = executor;
@@ -90,7 +93,8 @@
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken).ConfigureAwait(false);
+            var sequenceId = _sequenceId.FetchNext();
+            var response = await _executor.Execute(() => _channel.Send(sequenceId, data, cancellationToken), cancellationToken).ConfigureAwait(false);
             return new MessageId(response.MessageId);
         }
 
@@ -103,8 +107,21 @@
         public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-            return new MessageId(response.MessageId);
+
+            var autoAssignSequenceId = metadata.SequenceId == 0;
+            if (autoAssignSequenceId)
+                metadata.SequenceId = _sequenceId.FetchNext();
+
+            try
+            {
+                var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
+                return new MessageId(response.MessageId);
+            }
+            finally
+            {
+                if (autoAssignSequenceId)
+                    metadata.SequenceId = 0;
+            }
         }
 
         internal async ValueTask SetChannel(IProducerChannel channel)
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 0ba8cd1..e7e062b 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -25,20 +25,20 @@
 
     public sealed class ProducerChannel : IProducerChannel
     {
-        private readonly MessageMetadata _cachedMetadata;
+        private readonly ObjectPool<MessageMetadata> _messageMetadataPool;
         private readonly ObjectPool<SendPackage> _sendPackagePool;
         private readonly ulong _id;
         private readonly string _name;
-        private readonly SequenceId _sequenceId;
         private readonly IConnection _connection;
 
-        public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
+        public ProducerChannel(ulong id, string name, IConnection connection)
         {
-            _cachedMetadata = new MessageMetadata { ProducerName = name };
-            _sendPackagePool = new DefaultObjectPool<SendPackage>(new DefaultPooledObjectPolicy<SendPackage>());
+            var messageMetadataPolicy = new DefaultPooledObjectPolicy<MessageMetadata>();
+            _messageMetadataPool = new DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
+            var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
+            _sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
             _id = id;
             _name = name;
-            _sequenceId = sequenceId;
             _connection = connection;
         }
 
@@ -55,19 +55,30 @@
             }
         }
 
-        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
-            => SendPackage(_cachedMetadata, payload, true, cancellationToken);
+        public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+        {
+            var metadata = _messageMetadataPool.Get();
+            metadata.ProducerName = _name;
+            metadata.SequenceId = sequenceId;
+            try
+            {
+                return SendPackage(metadata, payload, cancellationToken);
+            }
+            finally
+            {
+                _messageMetadataPool.Return(metadata);
+            }
+        }
 
         public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
             metadata.ProducerName = _name;
-            return SendPackage(metadata, payload, metadata.SequenceId == 0, cancellationToken);
+            return SendPackage(metadata, payload, cancellationToken);
         }
 
         private async Task<CommandSendReceipt> SendPackage(
             MessageMetadata metadata,
             ReadOnlySequence<byte> payload,
-            bool autoAssignSequenceId,
             CancellationToken cancellationToken)
         {
             var sendPackage = _sendPackagePool.Get();
@@ -81,36 +92,21 @@
                 };
             }
 
+            sendPackage.Command.SequenceId = metadata.SequenceId;
             sendPackage.Metadata = metadata;
             sendPackage.Payload = payload;
+            metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); // TODO Benchmark against StopWatch
 
             try
             {
-                metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
-
-                if (autoAssignSequenceId)
-                {
-                    var newSequenceId = _sequenceId.FetchNext();
-                    sendPackage.Command.SequenceId = newSequenceId;
-                    sendPackage.Metadata.SequenceId = newSequenceId;
-                }
-                else
-                    sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
-
                 var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
-
                 return response.SendReceipt;
             }
             finally
             {
-                // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
-                if (autoAssignSequenceId)
-                    sendPackage.Metadata.SequenceId = 0;
-
                 _sendPackagePool.Return(sendPackage);
             }
         }
     }
 }
-
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index e7d8327..87c76de 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -26,7 +26,6 @@
         private readonly IRegisterEvent _eventRegister;
         private readonly IConnectionPool _connectionPool;
         private readonly IExecute _executor;
-        private readonly SequenceId _sequenceId;
         private readonly CommandProducer _commandProducer;
 
         public ProducerChannelFactory(
@@ -40,7 +39,6 @@
             _eventRegister = eventRegister;
             _connectionPool = connectionPool;
             _executor = executor;
-            _sequenceId = new SequenceId(options.InitialSequenceId);
 
             _commandProducer = new CommandProducer
             {
@@ -57,8 +55,7 @@
             var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
             var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
             var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
-
-            return new ProducerChannel(response.ProducerId, response.ProducerName, _sequenceId, connection);
+            return new ProducerChannel(response.ProducerId, response.ProducerName, connection);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 03140ad..9efe033 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -65,7 +65,6 @@
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
             var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-
             return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }
diff --git a/src/DotPulsar/Internal/RequestId.cs b/src/DotPulsar/Internal/RequestId.cs
new file mode 100644
index 0000000..128ac5e
--- /dev/null
+++ b/src/DotPulsar/Internal/RequestId.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    public sealed class RequestId
+    {
+        private ulong _current;
+
+        public RequestId()
+            => _current = 0;
+
+        public bool IsPastInitialId()
+            => _current != 0;
+
+        public ulong FetchNext()
+            => _current++;
+    }
+}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index c855337..1885aa4 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,12 +23,12 @@
         private const string ConnectResponseIdentifier = "Connected";
 
         private readonly Awaiter<string, BaseCommand> _responses;
-        private SequenceId _requestId;
+        private RequestId _requestId;
 
         public RequestResponseHandler()
         {
             _responses = new Awaiter<string, BaseCommand>();
-            _requestId = new SequenceId(1);
+            _requestId = new RequestId();
         }
 
         public void Dispose()
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 55ba94a..aba1c25 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -12,32 +12,22 @@
  * limitations under the License.
  */
 
-using System.Threading;
-
 namespace DotPulsar.Internal
 {
+    using System.Threading;
+
     public sealed class SequenceId
     {
         private long _current;
-        private ulong _initial;
 
         public SequenceId(ulong initialSequenceId)
         {
             // Subtracting one because Interlocked.Increment will return the post-incremented value
             // which is expected to be the initialSequenceId for the first call
-            _current = unchecked((long)initialSequenceId - 1);
-            _initial = initialSequenceId - 1;
-        }
-
-        // Returns false if FetchNext has not been called on this object before (or if it somehow wrapped around 2^64)
-        public bool IsPastInitialId()
-        {
-            return unchecked((ulong)_current != _initial);
+            _current = unchecked((long) initialSequenceId - 1);
         }
 
         public ulong FetchNext()
-        {
-            return unchecked((ulong)Interlocked.Increment(ref _current));
-        }
+            => unchecked((ulong) Interlocked.Increment(ref _current));
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 260f3ce..982ee3b 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -57,7 +57,7 @@
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
             var process = new ProducerProcess(correlationId, stateManager, factory, producer);
             _processManager.Add(process);
             process.Start();