Adding metrics. Needs testing
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 5adaa5e..42b77af 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -23,7 +23,7 @@
 
   <ItemGroup>    
     <PackageReference Include="HashDepot" Version="2.0.3" />
-    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.2" />
+    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.3" />
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
     <PackageReference Include="protobuf-net" Version="3.0.101" />
     <PackageReference Include="System.IO.Pipelines" Version="6.0.2" />
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 3692c28..ca548b8 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -51,7 +51,7 @@
         const string operation = "process";
         var operationName = $"{consumer.Topic} {operation}";
 
-        var tags = new KeyValuePair<string, object?>[]
+        var activityTags = new KeyValuePair<string, object?>[]
         {
             new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
             new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
@@ -61,12 +61,17 @@
             new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
         };
 
+        var meterTags = new KeyValuePair<string, object?>[]
+        {
+            new KeyValuePair<string, object?>("topic", consumer.Topic),
+            new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName)
+        };
+
         while (!cancellationToken.IsCancellationRequested)
         {
             var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
 
-            var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, tags);
-
+            var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, activityTags);
             if (activity is not null && activity.IsAllDataRequested)
             {
                 activity.SetMessageId(message.MessageId);
@@ -74,6 +79,8 @@
                 activity.SetStatus(ActivityStatusCode.Ok);
             }
 
+            var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
+
             try
             {
                 await processor(message, cancellationToken).ConfigureAwait(false);
@@ -84,6 +91,9 @@
                     activity.AddException(exception);
             }
 
+            if (startTimestamp != 0)
+                DotPulsarMeter.MessageProcessed(startTimestamp, meterTags);
+
             activity?.Dispose();
 
             await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index e1eccb9..62fb53d 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -160,7 +160,7 @@
     {
         var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
         var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _authentication);
-        DotPulsarEventSource.Log.ConnectionCreated();
+        DotPulsarMeter.ConnectionCreated();
         _connections[url] = connection;
         _ = connection.ProcessIncommingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url));
         var commandConnect = _commandConnect;
@@ -178,7 +178,7 @@
         if (_connections.TryRemove(serviceUrl, out Connection? connection) && connection is not null)
         {
             await connection.DisposeAsync().ConfigureAwait(false);
-            DotPulsarEventSource.Log.ConnectionDisposed();
+            DotPulsarMeter.ConnectionDisposed();
         }
     }
 
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index d34b27b..8e1b117 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using System;
+using System.Diagnostics;
 
 public static class Constants
 {
@@ -40,6 +41,7 @@
         MetadataSizeOffset = 6;
         MetadataOffset = 10;
         ConversationId = "messaging.conversation_id";
+        TimestampToTicks = TimeSpan.TicksPerSecond / (double) Stopwatch.Frequency;
     }
 
     public static string ClientName { get; }
@@ -53,4 +55,5 @@
     public static int MetadataSizeOffset { get; }
     public static int MetadataOffset { get; }
     public static string ConversationId { get; }
+    public static double TimestampToTicks { get; }
 }
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 5946d39..4e24458 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -24,7 +24,7 @@
 {
     static DotPulsarActivitySource()
     {
-        ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
+        ActivitySource = new(Constants.ClientName, Constants.ClientVersion);
     }
 
     public static ActivitySource ActivitySource { get; }
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
deleted file mode 100644
index cf6c1fc..0000000
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal;
-
-#if NETSTANDARD2_0
-public sealed class DotPulsarEventSource
-{
-    public static readonly DotPulsarEventSource Log = new();
-
-    public void ClientCreated() { }
-
-    public void ClientDisposed() { }
-
-    public void ConnectionCreated() { }
-
-    public void ConnectionDisposed() { }
-
-    public void ConsumerCreated() { }
-
-    public void ConsumerDisposed() { }
-
-    public void ProducerCreated() { }
-
-    public void ProducerDisposed() { }
-
-    public void ReaderCreated() { }
-
-    public void ReaderDisposed() { }
-}
-
-#else
-using System.Diagnostics.Tracing;
-using System.Threading;
-
-public sealed class DotPulsarEventSource : EventSource
-{
-#pragma warning disable IDE0052 // Remove unread private members
-    private PollingCounter? _totalClientsCounter;
-    private long _totalClients;
-
-    private PollingCounter? _currentClientsCounter;
-    private long _currentClients;
-
-    private PollingCounter? _totalConnectionsCounter;
-    private long _totalConnections;
-
-    private PollingCounter? _currentConnectionsCounter;
-    private long _currentConnections;
-
-    private PollingCounter? _totalConsumersCounter;
-    private long _totalConsumers;
-
-    private PollingCounter? _currentConsumersCounter;
-    private long _currentConsumers;
-
-    private PollingCounter? _totalProducersCounter;
-    private long _totalProducers;
-
-    private PollingCounter? _currentProducersCounter;
-    private long _currentProducers;
-
-    private PollingCounter? _totalReadersCounter;
-    private long _totalReaders;
-
-    private PollingCounter? _currentReadersCounter;
-    private long _currentReaders;
-
-#pragma warning restore IDE0052 // Remove unread private members
-
-    public static readonly DotPulsarEventSource Log = new();
-
-    public DotPulsarEventSource() : base("DotPulsar") { }
-
-    public void ClientCreated()
-    {
-        Interlocked.Increment(ref _totalClients);
-        Interlocked.Increment(ref _currentClients);
-    }
-
-    public void ClientDisposed()
-    {
-        Interlocked.Decrement(ref _currentClients);
-    }
-
-    public void ConnectionCreated()
-    {
-        Interlocked.Increment(ref _totalConnections);
-        Interlocked.Increment(ref _currentConnections);
-    }
-
-    public void ConnectionDisposed()
-    {
-        Interlocked.Decrement(ref _currentConnections);
-    }
-
-    public void ConsumerCreated()
-    {
-        Interlocked.Increment(ref _totalConsumers);
-        Interlocked.Increment(ref _currentConsumers);
-    }
-
-    public void ConsumerDisposed()
-    {
-        Interlocked.Decrement(ref _currentConsumers);
-    }
-
-    public void ProducerCreated()
-    {
-        Interlocked.Increment(ref _totalProducers);
-        Interlocked.Increment(ref _currentProducers);
-    }
-
-    public void ProducerDisposed()
-    {
-        Interlocked.Decrement(ref _currentProducers);
-    }
-
-    public void ReaderCreated()
-    {
-        Interlocked.Increment(ref _totalReaders);
-        Interlocked.Increment(ref _currentReaders);
-    }
-
-    public void ReaderDisposed()
-    {
-        Interlocked.Decrement(ref _currentReaders);
-    }
-
-    protected override void OnEventCommand(EventCommandEventArgs command)
-    {
-        if (command.Command != EventCommand.Enable)
-            return;
-
-        _totalClientsCounter ??= new PollingCounter("total-clients", this, () => Volatile.Read(ref _totalClients))
-        {
-            DisplayName = "Total number of clients"
-        };
-
-        _currentClientsCounter ??= new PollingCounter("current-clients", this, () => Volatile.Read(ref _currentClients))
-        {
-            DisplayName = "Current number of clients"
-        };
-
-        _totalConnectionsCounter ??= new PollingCounter("total-connections", this, () => Volatile.Read(ref _totalConnections))
-        {
-            DisplayName = "Total number of connections"
-        };
-
-        _currentConnectionsCounter ??= new PollingCounter("current-connections", this, () => Volatile.Read(ref _currentConnections))
-        {
-            DisplayName = "Current number of connections"
-        };
-
-        _totalConsumersCounter ??= new PollingCounter("total-consumers", this, () => Volatile.Read(ref _totalConsumers))
-        {
-            DisplayName = "Total number of consumers"
-        };
-
-        _currentConsumersCounter ??= new PollingCounter("current-consumers", this, () => Volatile.Read(ref _currentConsumers))
-        {
-            DisplayName = "Current number of consumers"
-        };
-
-        _totalProducersCounter ??= new PollingCounter("total-producers", this, () => Volatile.Read(ref _totalProducers))
-        {
-            DisplayName = "Total number of producers"
-        };
-
-        _currentProducersCounter ??= new PollingCounter("current-producers", this, () => Volatile.Read(ref _currentProducers))
-        {
-            DisplayName = "Current number of producers"
-        };
-
-        _totalReadersCounter ??= new PollingCounter("total-readers", this, () => Volatile.Read(ref _totalReaders))
-        {
-            DisplayName = "Total number of readers"
-        };
-
-        _currentReadersCounter ??= new PollingCounter("current-readers", this, () => Volatile.Read(ref _currentReaders))
-        {
-            DisplayName = "Current number of readers"
-        };
-    }
-}
-#endif
diff --git a/src/DotPulsar/Internal/DotPulsarMeter.cs b/src/DotPulsar/Internal/DotPulsarMeter.cs
new file mode 100644
index 0000000..db0e525
--- /dev/null
+++ b/src/DotPulsar/Internal/DotPulsarMeter.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using System.Threading;
+
+public static class DotPulsarMeter
+{
+#pragma warning disable IDE0079
+#pragma warning disable IDE0044
+    private static int _numberOfClients;
+    private static int _numberOfConnections;
+    private static int _numberOfReaders;
+    private static int _numberOfConsumers;
+    private static int _numberOfProducers;
+#pragma warning restore IDE0044
+#pragma warning restore IDE0079
+    private static readonly Histogram<double> _producerSendDuration;
+    private static readonly Histogram<double> _consumerProcessDuration;
+
+    static DotPulsarMeter()
+    {
+        Meter = new(Constants.ClientName, Constants.ClientVersion);
+        var numberOfClients = Meter.CreateObservableGauge("dotpulsar.client.count", GetNumberOfClients, "{clients}", "Number of clients");
+        var numberOfConnections = Meter.CreateObservableGauge("dotpulsar.connection.count", GetNumberOfConnections, "{connections}", "Number of connections");
+        var numberOfReaders = Meter.CreateObservableGauge("dotpulsar.reader.count", GetNumberOfReaders, "{readers}", "Number of readers");
+        var numberOfConsumers = Meter.CreateObservableGauge("dotpulsar.consumer.count", GetNumberOfConsumers, "{consumers}", "Number of consumers");
+        var numberOfProducers = Meter.CreateObservableGauge("dotpulsar.producer.count", GetNumberOfProducers, "{producers}", "Number of producers");
+        _producerSendDuration = Meter.CreateHistogram<double>("dotpulsar.producer.send.duration", "ms", "Measures the duration for sending a message");
+        _consumerProcessDuration = Meter.CreateHistogram<double>("dotpulsar.consumer.process.duration", "ms", "Measures the duration for processing a message");
+    }
+
+    public static Meter Meter { get; }
+
+    public static void ClientCreated() => Interlocked.Increment(ref _numberOfClients);
+    public static void ClientDisposed() => Interlocked.Decrement(ref _numberOfClients);
+    private static int GetNumberOfClients() => Volatile.Read(ref _numberOfClients);
+
+    public static void ConnectionCreated() => Interlocked.Increment(ref _numberOfConnections);
+    public static void ConnectionDisposed() => Interlocked.Decrement(ref _numberOfConnections);
+    private static int GetNumberOfConnections() => Volatile.Read(ref _numberOfConnections);
+
+    public static void ReaderCreated() => Interlocked.Increment(ref _numberOfReaders);
+    public static void ReaderDisposed() => Interlocked.Decrement(ref _numberOfReaders);
+    private static int GetNumberOfReaders() => Volatile.Read(ref _numberOfReaders);
+
+    public static void ConsumerCreated() => Interlocked.Increment(ref _numberOfConsumers);
+    public static void ConsumerDisposed() => Interlocked.Decrement(ref _numberOfConsumers);
+    private static int GetNumberOfConsumers() => Volatile.Read(ref _numberOfConsumers);
+
+    public static void ProducerCreated() => Interlocked.Increment(ref _numberOfProducers);
+    public static void ProducerDisposed() => Interlocked.Decrement(ref _numberOfProducers);
+    private static int GetNumberOfProducers() => Volatile.Read(ref _numberOfProducers);
+
+
+    public static bool MessageSentEnabled => _producerSendDuration.Enabled;
+    public static void MessageSent(long startTimestamp, KeyValuePair<string, object?>[] tags) =>
+        _producerSendDuration.Record(GetMillisecondsTillNow(startTimestamp), tags);
+
+    public static bool MessageProcessedEnabled => _consumerProcessDuration.Enabled;
+    public static void MessageProcessed(long startTimestamp, KeyValuePair<string, object?>[] tags)
+        => _consumerProcessDuration.Record(GetMillisecondsTillNow(startTimestamp), tags);
+
+    private static double GetMillisecondsTillNow(long startTimestamp)
+    {
+        var timestampDelta = Stopwatch.GetTimestamp() - startTimestamp;
+        var ticks = (long) (Constants.TimestampToTicks * timestampDelta);
+        return new TimeSpan(ticks).TotalMilliseconds;
+    }
+}
diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs
index 4ceae48..2c9777e 100644
--- a/src/DotPulsar/Internal/ProcessManager.cs
+++ b/src/DotPulsar/Internal/ProcessManager.cs
@@ -54,25 +54,25 @@
         switch (e)
         {
             case ConsumerCreated _:
-                DotPulsarEventSource.Log.ConsumerCreated();
+                DotPulsarMeter.ConsumerCreated();
                 break;
             case ConsumerDisposed consumerDisposed:
                 Remove(consumerDisposed.CorrelationId);
-                DotPulsarEventSource.Log.ConsumerDisposed();
+                DotPulsarMeter.ConsumerDisposed();
                 break;
             case ProducerCreated _:
-                DotPulsarEventSource.Log.ProducerCreated();
+                DotPulsarMeter.ProducerCreated();
                 break;
             case ProducerDisposed producerDisposed:
                 Remove(producerDisposed.CorrelationId);
-                DotPulsarEventSource.Log.ProducerDisposed();
+                DotPulsarMeter.ProducerDisposed();
                 break;
             case ReaderCreated _:
-                DotPulsarEventSource.Log.ReaderCreated();
+                DotPulsarMeter.ReaderCreated();
                 break;
             case ReaderDisposed readerDisposed:
                 Remove(readerDisposed.CorrelationId);
-                DotPulsarEventSource.Log.ReaderDisposed();
+                DotPulsarMeter.ReaderDisposed();
                 break;
             default:
                 if (_processes.TryGetValue(e.CorrelationId, out var process))
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 9f4b1e7..c45d118 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -29,7 +29,8 @@
 public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
 {
     private readonly string _operationName;
-    private readonly KeyValuePair<string, object?>[] _tags;
+    private readonly KeyValuePair<string, object?>[] _activityTags;
+    private readonly KeyValuePair<string, object?>[] _meterTags;
     private readonly SequenceId _sequenceId;
     private readonly StateManager<ProducerState> _state;
     private readonly IConnectionPool _connectionPool;
@@ -57,13 +58,17 @@
         ICompressorFactory? compressorFactory)
     {
         _operationName = $"{options.Topic} send";
-        _tags = new KeyValuePair<string, object?>[]
+        _activityTags = new KeyValuePair<string, object?>[]
         {
                 new KeyValuePair<string, object?>("messaging.destination", options.Topic),
                 new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
                 new KeyValuePair<string, object?>("messaging.system", "pulsar"),
                 new KeyValuePair<string, object?>("messaging.url", serviceUrl),
         };
+        _meterTags = new KeyValuePair<string, object?>[]
+        {
+                new KeyValuePair<string, object?>("topic", options.Topic)
+        };
         _sequenceId = new SequenceId(options.InitialSequenceId);
         _state = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
         ServiceUrl = serviceUrl;
@@ -238,15 +243,21 @@
         if (autoAssignSequenceId)
             metadata.SequenceId = _sequenceId.FetchNext();
 
-        var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _tags);
+        var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _activityTags);
 
         try
         {
             var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
             var producer = _producers[partition];
             var data = _options.Schema.Encode(message);
+
+            var startTimestamp = DotPulsarMeter.MessageSentEnabled ? Stopwatch.GetTimestamp() : 0;
+
             var messageId = await producer.Send(metadata.Metadata, data, cancellationToken).ConfigureAwait(false);
 
+            if (startTimestamp != 0)
+                DotPulsarMeter.MessageSent(startTimestamp, _meterTags);
+
             if (activity is not null && activity.IsAllDataRequested)
             {
                 activity.SetMessageId(messageId);
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index 2cf4804..2ac035d 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -131,6 +131,12 @@
         }
         set
         {
+            if (key is null)
+                throw new ArgumentNullException(nameof(key), $"The '{nameof(key)}' cannot be null");
+
+            if (value is null)
+                throw new ArgumentNullException(nameof(value), $"The '{nameof(value)}' cannot be null");
+
             for (var i = 0; i < Metadata.Properties.Count; ++i)
             {
                 var prop = Metadata.Properties[i];
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 2f9acb5..2767568 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -48,7 +48,7 @@
         _exceptionHandler = exceptionHandler;
         ServiceUrl = serviceUrl;
         _isDisposed = 0;
-        DotPulsarEventSource.Log.ClientCreated();
+        DotPulsarMeter.ClientCreated();
     }
 
     /// <summary>
@@ -164,7 +164,7 @@
         if (_processManager is IAsyncDisposable disposable)
             await disposable.DisposeAsync().ConfigureAwait(false);
 
-        DotPulsarEventSource.Log.ClientDisposed();
+        DotPulsarMeter.ClientDisposed();
     }
 
     private void ThrowIfDisposed()