Adding metrics. Needs testing
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 5adaa5e..42b77af 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -23,7 +23,7 @@
<ItemGroup>
<PackageReference Include="HashDepot" Version="2.0.3" />
- <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.2" />
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.3" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.0.101" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.2" />
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 3692c28..ca548b8 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -51,7 +51,7 @@
const string operation = "process";
var operationName = $"{consumer.Topic} {operation}";
- var tags = new KeyValuePair<string, object?>[]
+ var activityTags = new KeyValuePair<string, object?>[]
{
new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
@@ -61,12 +61,17 @@
new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
};
+ var meterTags = new KeyValuePair<string, object?>[]
+ {
+ new KeyValuePair<string, object?>("topic", consumer.Topic),
+ new KeyValuePair<string, object?>("subscription", consumer.SubscriptionName)
+ };
+
while (!cancellationToken.IsCancellationRequested)
{
var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
- var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, tags);
-
+ var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, activityTags);
if (activity is not null && activity.IsAllDataRequested)
{
activity.SetMessageId(message.MessageId);
@@ -74,6 +79,8 @@
activity.SetStatus(ActivityStatusCode.Ok);
}
+ var startTimestamp = DotPulsarMeter.MessageProcessedEnabled ? Stopwatch.GetTimestamp() : 0;
+
try
{
await processor(message, cancellationToken).ConfigureAwait(false);
@@ -84,6 +91,9 @@
activity.AddException(exception);
}
+ if (startTimestamp != 0)
+ DotPulsarMeter.MessageProcessed(startTimestamp, meterTags);
+
activity?.Dispose();
await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index e1eccb9..62fb53d 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -160,7 +160,7 @@
{
var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _authentication);
- DotPulsarEventSource.Log.ConnectionCreated();
+ DotPulsarMeter.ConnectionCreated();
_connections[url] = connection;
_ = connection.ProcessIncommingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url));
var commandConnect = _commandConnect;
@@ -178,7 +178,7 @@
if (_connections.TryRemove(serviceUrl, out Connection? connection) && connection is not null)
{
await connection.DisposeAsync().ConfigureAwait(false);
- DotPulsarEventSource.Log.ConnectionDisposed();
+ DotPulsarMeter.ConnectionDisposed();
}
}
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index d34b27b..8e1b117 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using System;
+using System.Diagnostics;
public static class Constants
{
@@ -40,6 +41,7 @@
MetadataSizeOffset = 6;
MetadataOffset = 10;
ConversationId = "messaging.conversation_id";
+ TimestampToTicks = TimeSpan.TicksPerSecond / (double) Stopwatch.Frequency;
}
public static string ClientName { get; }
@@ -53,4 +55,5 @@
public static int MetadataSizeOffset { get; }
public static int MetadataOffset { get; }
public static string ConversationId { get; }
+ public static double TimestampToTicks { get; }
}
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 5946d39..4e24458 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -24,7 +24,7 @@
{
static DotPulsarActivitySource()
{
- ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
+ ActivitySource = new(Constants.ClientName, Constants.ClientVersion);
}
public static ActivitySource ActivitySource { get; }
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
deleted file mode 100644
index cf6c1fc..0000000
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal;
-
-#if NETSTANDARD2_0
-public sealed class DotPulsarEventSource
-{
- public static readonly DotPulsarEventSource Log = new();
-
- public void ClientCreated() { }
-
- public void ClientDisposed() { }
-
- public void ConnectionCreated() { }
-
- public void ConnectionDisposed() { }
-
- public void ConsumerCreated() { }
-
- public void ConsumerDisposed() { }
-
- public void ProducerCreated() { }
-
- public void ProducerDisposed() { }
-
- public void ReaderCreated() { }
-
- public void ReaderDisposed() { }
-}
-
-#else
-using System.Diagnostics.Tracing;
-using System.Threading;
-
-public sealed class DotPulsarEventSource : EventSource
-{
-#pragma warning disable IDE0052 // Remove unread private members
- private PollingCounter? _totalClientsCounter;
- private long _totalClients;
-
- private PollingCounter? _currentClientsCounter;
- private long _currentClients;
-
- private PollingCounter? _totalConnectionsCounter;
- private long _totalConnections;
-
- private PollingCounter? _currentConnectionsCounter;
- private long _currentConnections;
-
- private PollingCounter? _totalConsumersCounter;
- private long _totalConsumers;
-
- private PollingCounter? _currentConsumersCounter;
- private long _currentConsumers;
-
- private PollingCounter? _totalProducersCounter;
- private long _totalProducers;
-
- private PollingCounter? _currentProducersCounter;
- private long _currentProducers;
-
- private PollingCounter? _totalReadersCounter;
- private long _totalReaders;
-
- private PollingCounter? _currentReadersCounter;
- private long _currentReaders;
-
-#pragma warning restore IDE0052 // Remove unread private members
-
- public static readonly DotPulsarEventSource Log = new();
-
- public DotPulsarEventSource() : base("DotPulsar") { }
-
- public void ClientCreated()
- {
- Interlocked.Increment(ref _totalClients);
- Interlocked.Increment(ref _currentClients);
- }
-
- public void ClientDisposed()
- {
- Interlocked.Decrement(ref _currentClients);
- }
-
- public void ConnectionCreated()
- {
- Interlocked.Increment(ref _totalConnections);
- Interlocked.Increment(ref _currentConnections);
- }
-
- public void ConnectionDisposed()
- {
- Interlocked.Decrement(ref _currentConnections);
- }
-
- public void ConsumerCreated()
- {
- Interlocked.Increment(ref _totalConsumers);
- Interlocked.Increment(ref _currentConsumers);
- }
-
- public void ConsumerDisposed()
- {
- Interlocked.Decrement(ref _currentConsumers);
- }
-
- public void ProducerCreated()
- {
- Interlocked.Increment(ref _totalProducers);
- Interlocked.Increment(ref _currentProducers);
- }
-
- public void ProducerDisposed()
- {
- Interlocked.Decrement(ref _currentProducers);
- }
-
- public void ReaderCreated()
- {
- Interlocked.Increment(ref _totalReaders);
- Interlocked.Increment(ref _currentReaders);
- }
-
- public void ReaderDisposed()
- {
- Interlocked.Decrement(ref _currentReaders);
- }
-
- protected override void OnEventCommand(EventCommandEventArgs command)
- {
- if (command.Command != EventCommand.Enable)
- return;
-
- _totalClientsCounter ??= new PollingCounter("total-clients", this, () => Volatile.Read(ref _totalClients))
- {
- DisplayName = "Total number of clients"
- };
-
- _currentClientsCounter ??= new PollingCounter("current-clients", this, () => Volatile.Read(ref _currentClients))
- {
- DisplayName = "Current number of clients"
- };
-
- _totalConnectionsCounter ??= new PollingCounter("total-connections", this, () => Volatile.Read(ref _totalConnections))
- {
- DisplayName = "Total number of connections"
- };
-
- _currentConnectionsCounter ??= new PollingCounter("current-connections", this, () => Volatile.Read(ref _currentConnections))
- {
- DisplayName = "Current number of connections"
- };
-
- _totalConsumersCounter ??= new PollingCounter("total-consumers", this, () => Volatile.Read(ref _totalConsumers))
- {
- DisplayName = "Total number of consumers"
- };
-
- _currentConsumersCounter ??= new PollingCounter("current-consumers", this, () => Volatile.Read(ref _currentConsumers))
- {
- DisplayName = "Current number of consumers"
- };
-
- _totalProducersCounter ??= new PollingCounter("total-producers", this, () => Volatile.Read(ref _totalProducers))
- {
- DisplayName = "Total number of producers"
- };
-
- _currentProducersCounter ??= new PollingCounter("current-producers", this, () => Volatile.Read(ref _currentProducers))
- {
- DisplayName = "Current number of producers"
- };
-
- _totalReadersCounter ??= new PollingCounter("total-readers", this, () => Volatile.Read(ref _totalReaders))
- {
- DisplayName = "Total number of readers"
- };
-
- _currentReadersCounter ??= new PollingCounter("current-readers", this, () => Volatile.Read(ref _currentReaders))
- {
- DisplayName = "Current number of readers"
- };
- }
-}
-#endif
diff --git a/src/DotPulsar/Internal/DotPulsarMeter.cs b/src/DotPulsar/Internal/DotPulsarMeter.cs
new file mode 100644
index 0000000..db0e525
--- /dev/null
+++ b/src/DotPulsar/Internal/DotPulsarMeter.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using System.Threading;
+
+public static class DotPulsarMeter
+{
+#pragma warning disable IDE0079
+#pragma warning disable IDE0044
+ private static int _numberOfClients;
+ private static int _numberOfConnections;
+ private static int _numberOfReaders;
+ private static int _numberOfConsumers;
+ private static int _numberOfProducers;
+#pragma warning restore IDE0044
+#pragma warning restore IDE0079
+ private static readonly Histogram<double> _producerSendDuration;
+ private static readonly Histogram<double> _consumerProcessDuration;
+
+ static DotPulsarMeter()
+ {
+ Meter = new(Constants.ClientName, Constants.ClientVersion);
+ var numberOfClients = Meter.CreateObservableGauge("dotpulsar.client.count", GetNumberOfClients, "{clients}", "Number of clients");
+ var numberOfConnections = Meter.CreateObservableGauge("dotpulsar.connection.count", GetNumberOfConnections, "{connections}", "Number of connections");
+ var numberOfReaders = Meter.CreateObservableGauge("dotpulsar.reader.count", GetNumberOfReaders, "{readers}", "Number of readers");
+ var numberOfConsumers = Meter.CreateObservableGauge("dotpulsar.consumer.count", GetNumberOfConsumers, "{consumers}", "Number of consumers");
+ var numberOfProducers = Meter.CreateObservableGauge("dotpulsar.producer.count", GetNumberOfProducers, "{producers}", "Number of producers");
+ _producerSendDuration = Meter.CreateHistogram<double>("dotpulsar.producer.send.duration", "ms", "Measures the duration for sending a message");
+ _consumerProcessDuration = Meter.CreateHistogram<double>("dotpulsar.consumer.process.duration", "ms", "Measures the duration for processing a message");
+ }
+
+ public static Meter Meter { get; }
+
+ public static void ClientCreated() => Interlocked.Increment(ref _numberOfClients);
+ public static void ClientDisposed() => Interlocked.Decrement(ref _numberOfClients);
+ private static int GetNumberOfClients() => Volatile.Read(ref _numberOfClients);
+
+ public static void ConnectionCreated() => Interlocked.Increment(ref _numberOfConnections);
+ public static void ConnectionDisposed() => Interlocked.Decrement(ref _numberOfConnections);
+ private static int GetNumberOfConnections() => Volatile.Read(ref _numberOfConnections);
+
+ public static void ReaderCreated() => Interlocked.Increment(ref _numberOfReaders);
+ public static void ReaderDisposed() => Interlocked.Decrement(ref _numberOfReaders);
+ private static int GetNumberOfReaders() => Volatile.Read(ref _numberOfReaders);
+
+ public static void ConsumerCreated() => Interlocked.Increment(ref _numberOfConsumers);
+ public static void ConsumerDisposed() => Interlocked.Decrement(ref _numberOfConsumers);
+ private static int GetNumberOfConsumers() => Volatile.Read(ref _numberOfConsumers);
+
+ public static void ProducerCreated() => Interlocked.Increment(ref _numberOfProducers);
+ public static void ProducerDisposed() => Interlocked.Decrement(ref _numberOfProducers);
+ private static int GetNumberOfProducers() => Volatile.Read(ref _numberOfProducers);
+
+
+ public static bool MessageSentEnabled => _producerSendDuration.Enabled;
+ public static void MessageSent(long startTimestamp, KeyValuePair<string, object?>[] tags) =>
+ _producerSendDuration.Record(GetMillisecondsTillNow(startTimestamp), tags);
+
+ public static bool MessageProcessedEnabled => _consumerProcessDuration.Enabled;
+ public static void MessageProcessed(long startTimestamp, KeyValuePair<string, object?>[] tags)
+ => _consumerProcessDuration.Record(GetMillisecondsTillNow(startTimestamp), tags);
+
+ private static double GetMillisecondsTillNow(long startTimestamp)
+ {
+ var timestampDelta = Stopwatch.GetTimestamp() - startTimestamp;
+ var ticks = (long) (Constants.TimestampToTicks * timestampDelta);
+ return new TimeSpan(ticks).TotalMilliseconds;
+ }
+}
diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs
index 4ceae48..2c9777e 100644
--- a/src/DotPulsar/Internal/ProcessManager.cs
+++ b/src/DotPulsar/Internal/ProcessManager.cs
@@ -54,25 +54,25 @@
switch (e)
{
case ConsumerCreated _:
- DotPulsarEventSource.Log.ConsumerCreated();
+ DotPulsarMeter.ConsumerCreated();
break;
case ConsumerDisposed consumerDisposed:
Remove(consumerDisposed.CorrelationId);
- DotPulsarEventSource.Log.ConsumerDisposed();
+ DotPulsarMeter.ConsumerDisposed();
break;
case ProducerCreated _:
- DotPulsarEventSource.Log.ProducerCreated();
+ DotPulsarMeter.ProducerCreated();
break;
case ProducerDisposed producerDisposed:
Remove(producerDisposed.CorrelationId);
- DotPulsarEventSource.Log.ProducerDisposed();
+ DotPulsarMeter.ProducerDisposed();
break;
case ReaderCreated _:
- DotPulsarEventSource.Log.ReaderCreated();
+ DotPulsarMeter.ReaderCreated();
break;
case ReaderDisposed readerDisposed:
Remove(readerDisposed.CorrelationId);
- DotPulsarEventSource.Log.ReaderDisposed();
+ DotPulsarMeter.ReaderDisposed();
break;
default:
if (_processes.TryGetValue(e.CorrelationId, out var process))
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 9f4b1e7..c45d118 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -29,7 +29,8 @@
public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
{
private readonly string _operationName;
- private readonly KeyValuePair<string, object?>[] _tags;
+ private readonly KeyValuePair<string, object?>[] _activityTags;
+ private readonly KeyValuePair<string, object?>[] _meterTags;
private readonly SequenceId _sequenceId;
private readonly StateManager<ProducerState> _state;
private readonly IConnectionPool _connectionPool;
@@ -57,13 +58,17 @@
ICompressorFactory? compressorFactory)
{
_operationName = $"{options.Topic} send";
- _tags = new KeyValuePair<string, object?>[]
+ _activityTags = new KeyValuePair<string, object?>[]
{
new KeyValuePair<string, object?>("messaging.destination", options.Topic),
new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
new KeyValuePair<string, object?>("messaging.system", "pulsar"),
new KeyValuePair<string, object?>("messaging.url", serviceUrl),
};
+ _meterTags = new KeyValuePair<string, object?>[]
+ {
+ new KeyValuePair<string, object?>("topic", options.Topic)
+ };
_sequenceId = new SequenceId(options.InitialSequenceId);
_state = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
ServiceUrl = serviceUrl;
@@ -238,15 +243,21 @@
if (autoAssignSequenceId)
metadata.SequenceId = _sequenceId.FetchNext();
- var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _tags);
+ var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _activityTags);
try
{
var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
var producer = _producers[partition];
var data = _options.Schema.Encode(message);
+
+ var startTimestamp = DotPulsarMeter.MessageSentEnabled ? Stopwatch.GetTimestamp() : 0;
+
var messageId = await producer.Send(metadata.Metadata, data, cancellationToken).ConfigureAwait(false);
+ if (startTimestamp != 0)
+ DotPulsarMeter.MessageSent(startTimestamp, _meterTags);
+
if (activity is not null && activity.IsAllDataRequested)
{
activity.SetMessageId(messageId);
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index 2cf4804..2ac035d 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -131,6 +131,12 @@
}
set
{
+ if (key is null)
+ throw new ArgumentNullException(nameof(key), $"The '{nameof(key)}' cannot be null");
+
+ if (value is null)
+ throw new ArgumentNullException(nameof(value), $"The '{nameof(value)}' cannot be null");
+
for (var i = 0; i < Metadata.Properties.Count; ++i)
{
var prop = Metadata.Properties[i];
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 2f9acb5..2767568 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -48,7 +48,7 @@
_exceptionHandler = exceptionHandler;
ServiceUrl = serviceUrl;
_isDisposed = 0;
- DotPulsarEventSource.Log.ClientCreated();
+ DotPulsarMeter.ClientCreated();
}
/// <summary>
@@ -164,7 +164,7 @@
if (_processManager is IAsyncDisposable disposable)
await disposable.DisposeAsync().ConfigureAwait(false);
- DotPulsarEventSource.Log.ClientDisposed();
+ DotPulsarMeter.ClientDisposed();
}
private void ThrowIfDisposed()