Implementing (OpenTelemetry) tracing
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e90c41d..2964b04 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,20 @@
 
 All notable changes to this project will be documented in this file.
 
-The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+## Added
+
+- [Tracing](https://github.com/apache/pulsar-dotpulsar/wiki/Tracing) support following the [guidelines](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md) from the [OpenTelemetry](https://opentelemetry.io/) project
+    - Sending a message will create a producer trace and add tracing metadata to the message
+    - The 'Process' extension method for IConsumer\<TMessage\> is no longer experimental and will create a consumer trace
+
+### Changed
+
+- **Breaking**: Sending a message without metadata is now an extension method and therefore no longer part of the ISend\<TMessage\> (and thereby IProducer\<TMessage\>) interface
+- IMessageRouter: ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions) -> ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
 
 ## [1.1.2] - 2021-07-05
 
diff --git a/src/DotPulsar/Abstractions/IMessageRouter.cs b/src/DotPulsar/Abstractions/IMessageRouter.cs
index b6ad294..d0d873c 100644
--- a/src/DotPulsar/Abstractions/IMessageRouter.cs
+++ b/src/DotPulsar/Abstractions/IMessageRouter.cs
@@ -22,6 +22,6 @@
         /// <summary>
         /// Choose a partition.
         /// </summary>
-        int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions);
+        int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions);
     }
 }
diff --git a/src/DotPulsar/Abstractions/ISend.cs b/src/DotPulsar/Abstractions/ISend.cs
index 642f61d..affe029 100644
--- a/src/DotPulsar/Abstractions/ISend.cs
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -23,11 +23,6 @@
     public interface ISend<TMessage>
     {
         /// <summary>
-        /// Sends a message.
-        /// </summary>
-        ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default);
-
-        /// <summary>
         /// Sends a message with metadata.
         /// </summary>
         ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index f9a78a0..88f12cd 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -16,9 +16,9 @@
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Internal;
+    using DotPulsar.Internal.Extensions;
     using System;
     using System.Collections.Generic;
-    using System.Diagnostics;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -40,7 +40,7 @@
             => await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
 
         /// <summary>
-        /// Process and auto-acknowledge a message. This is experimental.
+        /// Process and auto-acknowledge a message.
         /// </summary>
         public static async ValueTask Process<TMessage>(
             this IConsumer<TMessage> consumer,
@@ -50,7 +50,7 @@
             const string operation = "process";
             var operationName = $"{consumer.Topic} {operation}";
 
-            var tags = new List<KeyValuePair<string, object?>>
+            var tags = new KeyValuePair<string, object?>[]
             {
                 new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
                 new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
@@ -64,13 +64,13 @@
             {
                 var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
 
-                var activity = StartActivity(message, operationName, tags);
+                var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, tags);
 
                 if (activity is not null && activity.IsAllDataRequested)
                 {
-                    activity.SetTag("messaging.message_id", message.MessageId.ToString());
-                    activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
-                    activity.SetTag("otel.status_code", "OK");
+                    activity.SetMessageId(message.MessageId);
+                    activity.SetPayloadSize(message.Data.Length);
+                    activity.SetStatusCode("OK");
                 }
 
                 try
@@ -80,21 +80,7 @@
                 catch (Exception exception)
                 {
                     if (activity is not null && activity.IsAllDataRequested)
-                    {
-                        activity.SetTag("otel.status_code", "ERROR");
-
-                        var exceptionTags = new ActivityTagsCollection
-                        {
-                            { "exception.type", exception.GetType().FullName },
-                            { "exception.stacktrace", exception.ToString() }
-                        };
-
-                        if (!string.IsNullOrWhiteSpace(exception.Message))
-                            exceptionTags.Add("exception.message", exception.Message);
-
-                        var activityEvent = new ActivityEvent("exception", default, exceptionTags);
-                        activity.AddEvent(activityEvent);
-                    }
+                        activity.AddException(exception);
                 }
 
                 activity?.Dispose();
@@ -103,31 +89,6 @@
             }
         }
 
-        private static Activity? StartActivity(IMessage message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
-        {
-            if (!DotPulsarActivitySource.ActivitySource.HasListeners())
-                return null;
-
-            var properties = message.Properties;
-
-            if (properties.TryGetValue("traceparent", out var traceparent))  // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
-            {
-                var tracestate = properties.ContainsKey("tracestate") ? properties["tracestrate"] : null;
-                if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
-                    return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
-            }
-
-            var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
-
-            if (activity is not null && activity.IsAllDataRequested)
-            {
-                foreach (var tag in tags)
-                    activity.SetTag(tag.Key, tag.Value);
-            }
-
-            return activity;
-        }
-
         /// <summary>
         /// Wait for the state to change to a specific state.
         /// </summary>
diff --git a/src/DotPulsar/Internal/SendExtensions.cs b/src/DotPulsar/Extensions/SendExtensions.cs
similarity index 63%
rename from src/DotPulsar/Internal/SendExtensions.cs
rename to src/DotPulsar/Extensions/SendExtensions.cs
index 535408b..479fe69 100644
--- a/src/DotPulsar/Internal/SendExtensions.cs
+++ b/src/DotPulsar/Extensions/SendExtensions.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Extensions
 {
     using Abstractions;
+    using Microsoft.Extensions.ObjectPool;
     using System;
     using System.Buffers;
     using System.Threading;
@@ -25,17 +26,25 @@
     /// </summary>
     public static class SendExtensions
     {
+        private static readonly ObjectPool<MessageMetadata> _messageMetadataPool;
+
+        static SendExtensions()
+        {
+            var messageMetadataPolicy = new DefaultPooledObjectPolicy<MessageMetadata>();
+            _messageMetadataPool = new DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
+        }
+
         /// <summary>
         /// Sends a message.
         /// </summary>
         public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, byte[] data, CancellationToken cancellationToken = default)
-            => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+            => await Send(sender, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Sends a message.
         /// </summary>
         public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
-            => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+            => await Send(sender, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Sends a message with metadata.
@@ -48,5 +57,23 @@
         /// </summary>
         public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
             => await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Sends a message without metadata.
+        /// </summary>
+        public static async ValueTask<MessageId> Send<TMessage>(this ISend<TMessage> sender, TMessage message, CancellationToken cancellationToken = default)
+        {
+            var metadata = _messageMetadataPool.Get();
+
+            try
+            {
+                return await sender.Send(metadata, message, cancellationToken).ConfigureAwait(false);
+            }
+            finally
+            {
+                metadata.Metadata.Properties.Clear();
+                _messageMetadataPool.Return(metadata);
+            }
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 09c8464..87e2c3e 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -14,15 +14,70 @@
 
 namespace DotPulsar.Internal
 {
+    using DotPulsar.Abstractions;
+    using System.Collections.Generic;
     using System.Diagnostics;
 
     public static class DotPulsarActivitySource
     {
+        private const string _traceParent = "traceparent";
+        private const string _traceState = "tracestate";
+
         static DotPulsarActivitySource()
         {
             ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
         }
 
         public static ActivitySource ActivitySource { get; }
+
+        public static Activity? StartConsumerActivity(IMessage message, string operationName, KeyValuePair<string, object?>[] tags)
+        {
+            if (!ActivitySource.HasListeners())
+                return null;
+
+            var properties = message.Properties;
+
+            if (properties.TryGetValue(_traceParent, out var traceparent))
+            {
+                var tracestate = properties.ContainsKey(_traceState) ? properties[_traceState] : null;
+                if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
+                    return ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
+            }
+
+            var activity = ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
+
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                for (var i = 0; i < tags.Length; ++i)
+                {
+                    var tag = tags[i];
+                    activity.SetTag(tag.Key, tag.Value);
+                }
+            }
+
+            return activity;
+        }
+
+        public static Activity? StartProducerActivity(MessageMetadata metadata, string operationName, KeyValuePair<string, object?>[] tags)
+        {
+            if (!ActivitySource.HasListeners())
+                return null;
+
+            var activity = ActivitySource.StartActivity(operationName, ActivityKind.Producer);
+
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                metadata[_traceParent] = activity.TraceId.ToHexString();
+                metadata[_traceState] = activity.TraceStateString;
+
+                for (var i = 0; i < tags.Length; ++i)
+                {
+                    var tag = tags[i];
+                    activity.SetTag(tag.Key, tag.Value);
+                }
+            }
+
+            return activity;
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
new file mode 100644
index 0000000..850e93e
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Extensions
+{
+    using System;
+    using System.Diagnostics;
+
+    public static class ActivityExtensions
+    {
+        private const string _exceptionEventName = "exception";
+        private const string _exceptionStackTrace = "exception.stacktrace";
+        private const string _exceptionType = "exception.type";
+        private const string _exceptionMessage = "exception.message";
+        private const string _messageId = "messaging.message_id";
+        private const string _payloadSize = "messaging.message_payload_size_bytes";
+        private const string _statusCode = "otel.status_code";
+
+        public static void AddException(this Activity activity, Exception exception)
+        {
+            activity.SetStatusCode("ERROR");
+
+            var exceptionTags = new ActivityTagsCollection
+            {
+                { _exceptionType, exception.GetType().FullName },
+                { _exceptionStackTrace, exception.ToString() }
+            };
+
+            if (!string.IsNullOrWhiteSpace(exception.Message))
+                exceptionTags.Add(_exceptionMessage, exception.Message);
+
+            var activityEvent = new ActivityEvent(_exceptionEventName, default, exceptionTags);
+            activity.AddEvent(activityEvent);
+        }
+
+        public static void SetMessageId(this Activity activity, MessageId messageId)
+            => activity.SetTag(_messageId, messageId.ToString());
+
+        public static void SetStatusCode(this Activity activity, string statusCode)
+            => activity.SetTag(_statusCode, statusCode);
+
+        public static void SetPayloadSize(this Activity activity, long payloadSize)
+            => activity.SetTag(_payloadSize, payloadSize);
+    }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index a84d7c9..425d02d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -19,21 +19,24 @@
     using DotPulsar.Exceptions;
     using DotPulsar.Extensions;
     using DotPulsar.Internal.Extensions;
-    using DotPulsar.Internal.PulsarApi;
     using System;
     using System.Collections.Concurrent;
+    using System.Collections.Generic;
     using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
     {
+        private readonly string _operationName;
+        private readonly KeyValuePair<string, object?>[] _tags;
+        private readonly SequenceId _sequenceId;
         private readonly StateManager<ProducerState> _state;
         private readonly IConnectionPool _connectionPool;
         private readonly IHandleException _exceptionHandler;
         private readonly ICompressorFactory? _compressorFactory;
         private readonly ProducerOptions<TMessage> _options;
         private readonly ProcessManager _processManager;
-        private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+        private readonly ConcurrentDictionary<int, SubProducer<TMessage>> _producers;
         private readonly IMessageRouter _messageRouter;
         private readonly CancellationTokenSource _cts;
         private readonly IExecute _executor;
@@ -52,6 +55,15 @@
             IConnectionPool connectionPool,
             ICompressorFactory? compressorFactory)
         {
+            _operationName = $"{options.Topic} send";
+            _tags = new KeyValuePair<string, object?>[]
+            {
+                new KeyValuePair<string, object?>("messaging.destination", options.Topic),
+                new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+                new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+                new KeyValuePair<string, object?>("messaging.url", serviceUrl),
+            };
+            _sequenceId = new SequenceId(options.InitialSequenceId);
             _state = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
             ServiceUrl = serviceUrl;
             Topic = options.Topic;
@@ -64,7 +76,7 @@
             _messageRouter = options.MessageRouter;
             _cts = new CancellationTokenSource();
             _executor = new Executor(Guid.Empty, this, _exceptionHandler);
-            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>();
+            _producers = new ConcurrentDictionary<int, SubProducer<TMessage>>();
             _ = Setup();
         }
 
@@ -149,12 +161,11 @@
             var correlationId = Guid.NewGuid();
             var producerName = _options.ProducerName;
             var schema = _options.Schema;
-            var initialSequenceId = _options.InitialSequenceId;
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, _compressorFactory);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
             var initialChannel = new NotReadyChannel<TMessage>();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
+            var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, _processManager, initialChannel, executor, stateManager, factory, schema);
             var process = new ProducerProcess(correlationId, stateManager, producer);
             _processManager.Add(process);
             process.Start();
@@ -164,12 +175,12 @@
         private async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
         {
             var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
-            var commandPartitionedMetadata = new CommandPartitionedTopicMetadata { Topic = topic };
+            var commandPartitionedMetadata = new PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
             var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
 
-            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+            response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
 
-            if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+            if (response.PartitionMetadataResponse.Response == PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
                 response.PartitionMetadataResponse.Throw();
 
             return response.PartitionMetadataResponse.Partitions;
@@ -203,10 +214,8 @@
             }
         }
 
-        private async ValueTask<int> ChoosePartitions(DotPulsar.MessageMetadata? metadata, CancellationToken cancellationToken)
+        private async ValueTask<int> ChoosePartitions(MessageMetadata metadata, CancellationToken cancellationToken)
         {
-            ThrowIfDisposed();
-
             if (_producerCount == 0)
             {
                 _ = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
@@ -220,16 +229,46 @@
             return _messageRouter.ChoosePartition(metadata, _producerCount);
         }
 
-        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+        public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
         {
-            var partition = await ChoosePartitions(null, cancellationToken).ConfigureAwait(false);
-            return await _producers[partition].Send(message, cancellationToken).ConfigureAwait(false);
-        }
+            ThrowIfDisposed();
 
-        public async ValueTask<MessageId> Send(DotPulsar.MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
-        {
-            var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
-            return await _producers[partition].Send(metadata, message, cancellationToken).ConfigureAwait(false);
+            var autoAssignSequenceId = metadata.SequenceId == 0;
+            if (autoAssignSequenceId)
+                metadata.SequenceId = _sequenceId.FetchNext();
+
+            var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _tags);
+
+            try
+            {
+                var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
+                var producer = _producers[partition];
+                var data = _options.Schema.Encode(message);
+                var messageId = await producer.Send(metadata.Metadata, data, cancellationToken).ConfigureAwait(false);
+
+                if (activity is not null && activity.IsAllDataRequested)
+                {
+                    activity.SetMessageId(messageId);
+                    activity.SetPayloadSize(data.Length);
+                    activity.SetStatusCode("OK");
+                }
+
+                return messageId;
+            }
+            catch (Exception exception)
+            {
+                if (activity is not null && activity.IsAllDataRequested)
+                    activity.AddException(exception);
+
+                throw;
+            }
+            finally
+            {
+                activity?.Dispose();
+
+                if (autoAssignSequenceId)
+                    metadata.SequenceId = 0;
+            }
         }
 
         private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index 78022d0..95fa217 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -16,10 +16,8 @@
 {
     using Abstractions;
     using DotPulsar.Abstractions;
-    using DotPulsar.Exceptions;
     using DotPulsar.Internal.Extensions;
     using Events;
-    using Microsoft.Extensions.ObjectPool;
     using System;
     using System.Buffers;
     using System.Threading;
@@ -27,7 +25,6 @@
 
     public sealed class SubProducer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
     {
-        private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
         private IProducerChannel _channel;
@@ -35,7 +32,6 @@
         private readonly IStateChanged<ProducerState> _state;
         private readonly IProducerChannelFactory _factory;
         private readonly ISchema<TMessage> _schema;
-        private readonly SequenceId _sequenceId;
         private int _isDisposed;
 
         public Uri ServiceUrl { get; }
@@ -45,7 +41,6 @@
             Guid correlationId,
             Uri serviceUrl,
             string topic,
-            ulong initialSequenceId,
             IRegisterEvent registerEvent,
             IProducerChannel initialChannel,
             IExecute executor,
@@ -53,12 +48,9 @@
             IProducerChannelFactory factory,
             ISchema<TMessage> schema)
         {
-            var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
-            _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
             Topic = topic;
-            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
             _channel = initialChannel;
             _executor = executor;
@@ -91,48 +83,14 @@
             await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
-        public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
-            => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
 
         public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
-            => await Send(metadata, _schema.Encode(message), cancellationToken).ConfigureAwait(false);
+            => await _executor.Execute(() => InternalSend(metadata.Metadata, _schema.Encode(message), cancellationToken), cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
+        public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+            => await _executor.Execute(() => InternalSend(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
 
-            var metadata = _messageMetadataPool.Get();
-            try
-            {
-                metadata.SequenceId = _sequenceId.FetchNext();
-                return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
-            finally
-            {
-                _messageMetadataPool.Return(metadata);
-            }
-        }
-
-        public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var autoAssignSequenceId = metadata.SequenceId == 0;
-            if (autoAssignSequenceId)
-                metadata.SequenceId = _sequenceId.FetchNext();
-
-            try
-            {
-                return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
-            finally
-            {
-                if (autoAssignSequenceId)
-                    metadata.SequenceId = 0;
-            }
-        }
-
-        private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+        private async ValueTask<MessageId> InternalSend(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
             return response.MessageId.ToMessageId();
@@ -148,11 +106,5 @@
             if (oldChannel is not null)
                 await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
-
-        private void ThrowIfDisposed()
-        {
-            if (_isDisposed != 0)
-                throw new ProducerDisposedException(typeof(Producer<TMessage>).FullName!);
-        }
     }
 }
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
index 0a972bb..6176d40 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -38,9 +38,9 @@
         /// <summary>
         /// Choose a partition in round robin routing mode
         /// </summary>
-        public int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions)
+        public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
         {
-            var keyBytes = messageMetadata?.KeyBytes;
+            var keyBytes = messageMetadata.KeyBytes;
             if (keyBytes is not null)
                 return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
 
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index e9ecfbc..cc2562b 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -46,9 +46,9 @@
         /// <summary>
         /// Choose a partition in single partition routing mode
         /// </summary>
-        public int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions)
+        public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
         {
-            var keyBytes = messageMetadata?.KeyBytes;
+            var keyBytes = messageMetadata.KeyBytes;
             if (keyBytes is not null)
                 return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;