Implementing tracing using the ActivitySource and Activity. This is still work in progress.
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 7fb4e16..976f54b 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -35,7 +35,17 @@
         ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
-        /// The topic of the consumer.
+        /// The consumer's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
+
+        /// <summary>
+        /// The consumer's subscription name.
+        /// </summary>
+        public string SubscriptionName { get; }
+
+        /// <summary>
+        /// The consumer's topic.
         /// </summary>
         string Topic { get; }
 
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 27c59ee..baf3ca4 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -22,7 +22,12 @@
     public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
     {
         /// <summary>
-        /// The topic of the producer.
+        /// The producer's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
+
+        /// <summary>
+        /// The producer's topic.
         /// </summary>
         string Topic { get; }
     }
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
index ddce2e8..7189257 100644
--- a/src/DotPulsar/Abstractions/IPulsarClient.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -35,5 +35,10 @@
         /// Create a reader.
         /// </summary>
         IReader CreateReader(ReaderOptions options);
+
+        /// <summary>
+        /// The client's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
     }
 }
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 6b4207b..7962d54 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -22,7 +22,12 @@
     public interface IReader : IGetLastMessageId, IReceive, ISeek, IState<ReaderState>, IAsyncDisposable
     {
         /// <summary>
-        /// The topic of the reader.
+        /// The reader's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
+
+        /// <summary>
+        /// The reader's topic.
         /// </summary>
         string Topic { get; }
     }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 0e84b71..b22d835 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -30,6 +30,15 @@
   <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
     <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
     <PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+  </ItemGroup>
+
+  <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+  </ItemGroup>
+    
+  <ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
   </ItemGroup>
 
 </Project>
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index b61971c..63b8d49 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -15,6 +15,10 @@
 namespace DotPulsar.Extensions
 {
     using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -36,6 +40,76 @@
             => await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
 
         /// <summary>
+        /// Process and auto-acknowledge a message.
+        /// </summary>
+        public static async ValueTask Process(this IConsumer consumer, Func<Message, CancellationToken, ValueTask> processor, CancellationToken cancellationToken = default)  // TODO Allow user to set number of workers
+        {
+            const string operation = "process";
+            var operationName = $"{consumer.Topic} {operation}";
+
+            var tags = new List<KeyValuePair<string, object?>>
+            {
+                new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
+                new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+                new KeyValuePair<string, object?>("messaging.operation", operation),
+                new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+                new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
+                new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName) // TODO Ask Pulsar community to define Pulsar specific tags
+            };
+
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                var message = await consumer.Receive(cancellationToken);
+
+                var activity = StartActivity(message, operationName, tags);
+
+                if (activity is not null && activity.IsAllDataRequested)
+                {
+                    activity.SetTag("messaging.message_id", message.MessageId.ToString());
+                    activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
+                }
+
+                try
+                {
+                    await processor(message, cancellationToken);
+                }
+                catch
+                {
+                    // Ignore
+                }
+
+                activity?.Dispose();
+
+                await consumer.Acknowledge(message.MessageId, cancellationToken);
+            }
+        }
+
+        private static Activity? StartActivity(Message message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
+        {
+            if (!DotPulsarActivitySource.ActivitySource.HasListeners())
+                return null;
+
+            var properties = message.Properties;
+
+            if (properties.TryGetValue("traceparent", out var traceparent))  // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
+            {
+                var tracestate = properties.ContainsKey("tracestate") ? properties["tracestrate"] : null;
+                if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
+                    return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
+            }
+
+            var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
+
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                foreach (var tag in tags)
+                    activity.SetTag(tag.Key, tag.Value);
+            }
+
+            return activity;
+        }
+
+        /// <summary>
         /// Wait for the state to change to a specific state.
         /// </summary>
         /// <returns>
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index abc1f75..9ee752b 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -15,18 +15,22 @@
 namespace DotPulsar.Internal
 {
     using System;
-    using System.Reflection;
 
     public static class Constants
     {
         static Constants()
         {
-            var assemblyName = Assembly.GetCallingAssembly().GetName();
+            var assembly = typeof(Constants).Assembly;
+            var assemblyName = assembly.GetName();
+            if (assemblyName.Name is null)
+                throw new Exception($"Assembly name of {assembly.FullName} is null");
+
             var assemblyVersion = assemblyName.Version;
             if (assemblyVersion is null)
-                throw new Exception("Assembly version of CallingAssembly is null");
+                throw new Exception($"Assembly version of {assembly.FullName} is null");
 
-            ClientVersion = $"{assemblyName.Name} {assemblyVersion.ToString(3)}";
+            ClientName = assemblyName.Name;
+            ClientVersion = assemblyVersion.ToString(3);
             ProtocolVersion = 14;
             PulsarScheme = "pulsar";
             PulsarSslScheme = "pulsar+ssl";
@@ -37,6 +41,7 @@
             MetadataOffset = 10;
         }
 
+        public static string ClientName { get; }
         public static string ClientVersion { get; }
         public static int ProtocolVersion { get; }
         public static string PulsarScheme { get; }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index d263df2..c29db02 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -37,10 +37,14 @@
         private readonly IStateChanged<ConsumerState> _state;
         private int _isDisposed;
 
+        public Uri ServiceUrl { get; }
+        public string SubscriptionName { get; }
         public string Topic { get; }
 
         public Consumer(
             Guid correlationId,
+            Uri serviceUrl,
+            string subscriptionName,
             string topic,
             IRegisterEvent eventRegister,
             IConsumerChannel initialChannel,
@@ -48,6 +52,8 @@
             IStateChanged<ConsumerState> state)
         {
             _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            SubscriptionName = subscriptionName;
             Topic = topic;
             _eventRegister = eventRegister;
             _channel = initialChannel;
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
new file mode 100644
index 0000000..09c8464
--- /dev/null
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using System.Diagnostics;
+
+    public static class DotPulsarActivitySource
+    {
+        static DotPulsarActivitySource()
+        {
+            ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
+        }
+
+        public static ActivitySource ActivitySource { get; }
+    }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 12e4625..02f9bd3 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -36,10 +36,12 @@
         private readonly SequenceId _sequenceId;
         private int _isDisposed;
 
+        public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Producer(
             Guid correlationId,
+            Uri serviceUrl,
             string topic,
             ulong initialSequenceId,
             IRegisterEvent registerEvent,
@@ -50,6 +52,7 @@
             var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
             _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
             Topic = topic;
             _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 88c1245..120d5e5 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -41,7 +41,7 @@
             _commandConnect = new CommandConnect
             {
                 ProtocolVersion = Constants.ProtocolVersion,
-                ClientVersion = Constants.ClientVersion
+                ClientVersion = $"{Constants.ClientName} {Constants.ClientVersion}"
             };
 
             _exceptionHandlers = new List<IHandleException>();
@@ -157,7 +157,7 @@
             var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
             var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
 
-            return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline);
+            return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index ccbea59..731e4cd 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -32,10 +32,12 @@
         private readonly IStateChanged<ReaderState> _state;
         private int _isDisposed;
 
+        public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Reader(
             Guid correlationId,
+            Uri serviceUrl,
             string topic,
             IRegisterEvent eventRegister,
             IConsumerChannel initialChannel,
@@ -43,6 +45,7 @@
             IStateChanged<ReaderState> state)
         {
             _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
             Topic = topic;
             _eventRegister = eventRegister;
             _channel = initialChannel;
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index e2c0577..c18c7d8 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -32,11 +32,18 @@
         private readonly IHandleException _exceptionHandler;
         private int _isDisposed;
 
-        internal PulsarClient(IConnectionPool connectionPool, ProcessManager processManager, IHandleException exceptionHandler)
+        public Uri ServiceUrl { get; }
+
+        internal PulsarClient(
+            IConnectionPool connectionPool,
+            ProcessManager processManager,
+            IHandleException exceptionHandler,
+            Uri serviceUrl)
         {
             _connectionPool = connectionPool;
             _processManager = processManager;
             _exceptionHandler = exceptionHandler;
+            ServiceUrl = serviceUrl;
             _isDisposed = 0;
             DotPulsarEventSource.Log.ClientCreated();
         }
@@ -57,7 +64,7 @@
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
+            var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
             var process = new ProducerProcess(correlationId, stateManager, factory, producer);
@@ -76,7 +83,7 @@
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
-            var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
             var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
@@ -95,7 +102,7 @@
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
-            var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
             var process = new ReaderProcess(correlationId, stateManager, factory, reader);