Implementing (OpenTelemetry) tracing
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e90c41d..2964b04 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,20 @@
All notable changes to this project will be documented in this file.
-The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+## Added
+
+- [Tracing](https://github.com/apache/pulsar-dotpulsar/wiki/Tracing) support following the [guidelines](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md) from the [OpenTelemetry](https://opentelemetry.io/) project
+ - Sending a message will create a producer trace and add tracing metadata to the message
+ - The 'Process' extension method for IConsumer\<TMessage\> is no longer experimental and will create a consumer trace
+
+### Changed
+
+- **Breaking**: Sending a message without metadata is now an extension method and therefore no longer part of the ISend\<TMessage\> (and thereby IProducer\<TMessage\>) interface
+- IMessageRouter: ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions) -> ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
## [1.1.2] - 2021-07-05
diff --git a/src/DotPulsar/Abstractions/IMessageRouter.cs b/src/DotPulsar/Abstractions/IMessageRouter.cs
index b6ad294..d0d873c 100644
--- a/src/DotPulsar/Abstractions/IMessageRouter.cs
+++ b/src/DotPulsar/Abstractions/IMessageRouter.cs
@@ -22,6 +22,6 @@
/// <summary>
/// Choose a partition.
/// </summary>
- int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions);
+ int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions);
}
}
diff --git a/src/DotPulsar/Abstractions/ISend.cs b/src/DotPulsar/Abstractions/ISend.cs
index 642f61d..affe029 100644
--- a/src/DotPulsar/Abstractions/ISend.cs
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -23,11 +23,6 @@
public interface ISend<TMessage>
{
/// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default);
-
- /// <summary>
/// Sends a message with metadata.
/// </summary>
ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index f9a78a0..88f12cd 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -16,9 +16,9 @@
{
using DotPulsar.Abstractions;
using DotPulsar.Internal;
+ using DotPulsar.Internal.Extensions;
using System;
using System.Collections.Generic;
- using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -40,7 +40,7 @@
=> await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
/// <summary>
- /// Process and auto-acknowledge a message. This is experimental.
+ /// Process and auto-acknowledge a message.
/// </summary>
public static async ValueTask Process<TMessage>(
this IConsumer<TMessage> consumer,
@@ -50,7 +50,7 @@
const string operation = "process";
var operationName = $"{consumer.Topic} {operation}";
- var tags = new List<KeyValuePair<string, object?>>
+ var tags = new KeyValuePair<string, object?>[]
{
new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
@@ -64,13 +64,13 @@
{
var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
- var activity = StartActivity(message, operationName, tags);
+ var activity = DotPulsarActivitySource.StartConsumerActivity(message, operationName, tags);
if (activity is not null && activity.IsAllDataRequested)
{
- activity.SetTag("messaging.message_id", message.MessageId.ToString());
- activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
- activity.SetTag("otel.status_code", "OK");
+ activity.SetMessageId(message.MessageId);
+ activity.SetPayloadSize(message.Data.Length);
+ activity.SetStatusCode("OK");
}
try
@@ -80,21 +80,7 @@
catch (Exception exception)
{
if (activity is not null && activity.IsAllDataRequested)
- {
- activity.SetTag("otel.status_code", "ERROR");
-
- var exceptionTags = new ActivityTagsCollection
- {
- { "exception.type", exception.GetType().FullName },
- { "exception.stacktrace", exception.ToString() }
- };
-
- if (!string.IsNullOrWhiteSpace(exception.Message))
- exceptionTags.Add("exception.message", exception.Message);
-
- var activityEvent = new ActivityEvent("exception", default, exceptionTags);
- activity.AddEvent(activityEvent);
- }
+ activity.AddException(exception);
}
activity?.Dispose();
@@ -103,31 +89,6 @@
}
}
- private static Activity? StartActivity(IMessage message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
- {
- if (!DotPulsarActivitySource.ActivitySource.HasListeners())
- return null;
-
- var properties = message.Properties;
-
- if (properties.TryGetValue("traceparent", out var traceparent)) // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
- {
- var tracestate = properties.ContainsKey("tracestate") ? properties["tracestrate"] : null;
- if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
- return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
- }
-
- var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
-
- if (activity is not null && activity.IsAllDataRequested)
- {
- foreach (var tag in tags)
- activity.SetTag(tag.Key, tag.Value);
- }
-
- return activity;
- }
-
/// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
diff --git a/src/DotPulsar/Internal/SendExtensions.cs b/src/DotPulsar/Extensions/SendExtensions.cs
similarity index 63%
rename from src/DotPulsar/Internal/SendExtensions.cs
rename to src/DotPulsar/Extensions/SendExtensions.cs
index 535408b..479fe69 100644
--- a/src/DotPulsar/Internal/SendExtensions.cs
+++ b/src/DotPulsar/Extensions/SendExtensions.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Extensions
{
using Abstractions;
+ using Microsoft.Extensions.ObjectPool;
using System;
using System.Buffers;
using System.Threading;
@@ -25,17 +26,25 @@
/// </summary>
public static class SendExtensions
{
+ private static readonly ObjectPool<MessageMetadata> _messageMetadataPool;
+
+ static SendExtensions()
+ {
+ var messageMetadataPolicy = new DefaultPooledObjectPolicy<MessageMetadata>();
+ _messageMetadataPool = new DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
+ }
+
/// <summary>
/// Sends a message.
/// </summary>
public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, byte[] data, CancellationToken cancellationToken = default)
- => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+ => await Send(sender, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
/// <summary>
/// Sends a message.
/// </summary>
public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
- => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+ => await Send(sender, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
/// <summary>
/// Sends a message with metadata.
@@ -48,5 +57,23 @@
/// </summary>
public static async ValueTask<MessageId> Send(this ISend<ReadOnlySequence<byte>> sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
=> await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message without metadata.
+ /// </summary>
+ public static async ValueTask<MessageId> Send<TMessage>(this ISend<TMessage> sender, TMessage message, CancellationToken cancellationToken = default)
+ {
+ var metadata = _messageMetadataPool.Get();
+
+ try
+ {
+ return await sender.Send(metadata, message, cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ metadata.Metadata.Properties.Clear();
+ _messageMetadataPool.Return(metadata);
+ }
+ }
}
}
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 09c8464..87e2c3e 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -14,15 +14,70 @@
namespace DotPulsar.Internal
{
+ using DotPulsar.Abstractions;
+ using System.Collections.Generic;
using System.Diagnostics;
public static class DotPulsarActivitySource
{
+ private const string _traceParent = "traceparent";
+ private const string _traceState = "tracestate";
+
static DotPulsarActivitySource()
{
ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
}
public static ActivitySource ActivitySource { get; }
+
+ public static Activity? StartConsumerActivity(IMessage message, string operationName, KeyValuePair<string, object?>[] tags)
+ {
+ if (!ActivitySource.HasListeners())
+ return null;
+
+ var properties = message.Properties;
+
+ if (properties.TryGetValue(_traceParent, out var traceparent))
+ {
+ var tracestate = properties.ContainsKey(_traceState) ? properties[_traceState] : null;
+ if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
+ return ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
+ }
+
+ var activity = ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
+
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ for (var i = 0; i < tags.Length; ++i)
+ {
+ var tag = tags[i];
+ activity.SetTag(tag.Key, tag.Value);
+ }
+ }
+
+ return activity;
+ }
+
+ public static Activity? StartProducerActivity(MessageMetadata metadata, string operationName, KeyValuePair<string, object?>[] tags)
+ {
+ if (!ActivitySource.HasListeners())
+ return null;
+
+ var activity = ActivitySource.StartActivity(operationName, ActivityKind.Producer);
+
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ metadata[_traceParent] = activity.TraceId.ToHexString();
+ metadata[_traceState] = activity.TraceStateString;
+
+ for (var i = 0; i < tags.Length; ++i)
+ {
+ var tag = tags[i];
+ activity.SetTag(tag.Key, tag.Value);
+ }
+ }
+
+ return activity;
+ }
}
}
diff --git a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
new file mode 100644
index 0000000..850e93e
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Extensions
+{
+ using System;
+ using System.Diagnostics;
+
+ public static class ActivityExtensions
+ {
+ private const string _exceptionEventName = "exception";
+ private const string _exceptionStackTrace = "exception.stacktrace";
+ private const string _exceptionType = "exception.type";
+ private const string _exceptionMessage = "exception.message";
+ private const string _messageId = "messaging.message_id";
+ private const string _payloadSize = "messaging.message_payload_size_bytes";
+ private const string _statusCode = "otel.status_code";
+
+ public static void AddException(this Activity activity, Exception exception)
+ {
+ activity.SetStatusCode("ERROR");
+
+ var exceptionTags = new ActivityTagsCollection
+ {
+ { _exceptionType, exception.GetType().FullName },
+ { _exceptionStackTrace, exception.ToString() }
+ };
+
+ if (!string.IsNullOrWhiteSpace(exception.Message))
+ exceptionTags.Add(_exceptionMessage, exception.Message);
+
+ var activityEvent = new ActivityEvent(_exceptionEventName, default, exceptionTags);
+ activity.AddEvent(activityEvent);
+ }
+
+ public static void SetMessageId(this Activity activity, MessageId messageId)
+ => activity.SetTag(_messageId, messageId.ToString());
+
+ public static void SetStatusCode(this Activity activity, string statusCode)
+ => activity.SetTag(_statusCode, statusCode);
+
+ public static void SetPayloadSize(this Activity activity, long payloadSize)
+ => activity.SetTag(_payloadSize, payloadSize);
+ }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index a84d7c9..425d02d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -19,21 +19,24 @@
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using DotPulsar.Internal.Extensions;
- using DotPulsar.Internal.PulsarApi;
using System;
using System.Collections.Concurrent;
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
{
+ private readonly string _operationName;
+ private readonly KeyValuePair<string, object?>[] _tags;
+ private readonly SequenceId _sequenceId;
private readonly StateManager<ProducerState> _state;
private readonly IConnectionPool _connectionPool;
private readonly IHandleException _exceptionHandler;
private readonly ICompressorFactory? _compressorFactory;
private readonly ProducerOptions<TMessage> _options;
private readonly ProcessManager _processManager;
- private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers;
+ private readonly ConcurrentDictionary<int, SubProducer<TMessage>> _producers;
private readonly IMessageRouter _messageRouter;
private readonly CancellationTokenSource _cts;
private readonly IExecute _executor;
@@ -52,6 +55,15 @@
IConnectionPool connectionPool,
ICompressorFactory? compressorFactory)
{
+ _operationName = $"{options.Topic} send";
+ _tags = new KeyValuePair<string, object?>[]
+ {
+ new KeyValuePair<string, object?>("messaging.destination", options.Topic),
+ new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+ new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+ new KeyValuePair<string, object?>("messaging.url", serviceUrl),
+ };
+ _sequenceId = new SequenceId(options.InitialSequenceId);
_state = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
ServiceUrl = serviceUrl;
Topic = options.Topic;
@@ -64,7 +76,7 @@
_messageRouter = options.MessageRouter;
_cts = new CancellationTokenSource();
_executor = new Executor(Guid.Empty, this, _exceptionHandler);
- _producers = new ConcurrentDictionary<int, IProducer<TMessage>>();
+ _producers = new ConcurrentDictionary<int, SubProducer<TMessage>>();
_ = Setup();
}
@@ -149,12 +161,11 @@
var correlationId = Guid.NewGuid();
var producerName = _options.ProducerName;
var schema = _options.Schema;
- var initialSequenceId = _options.InitialSequenceId;
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, _compressorFactory);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var initialChannel = new NotReadyChannel<TMessage>();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
- var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
+ var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, _processManager, initialChannel, executor, stateManager, factory, schema);
var process = new ProducerProcess(correlationId, stateManager, producer);
_processManager.Add(process);
process.Start();
@@ -164,12 +175,12 @@
private async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
{
var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
- var commandPartitionedMetadata = new CommandPartitionedTopicMetadata { Topic = topic };
+ var commandPartitionedMetadata = new PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
- response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+ response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
- if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+ if (response.PartitionMetadataResponse.Response == PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
response.PartitionMetadataResponse.Throw();
return response.PartitionMetadataResponse.Partitions;
@@ -203,10 +214,8 @@
}
}
- private async ValueTask<int> ChoosePartitions(DotPulsar.MessageMetadata? metadata, CancellationToken cancellationToken)
+ private async ValueTask<int> ChoosePartitions(MessageMetadata metadata, CancellationToken cancellationToken)
{
- ThrowIfDisposed();
-
if (_producerCount == 0)
{
_ = await _state.StateChangedFrom(ProducerState.Disconnected, cancellationToken).ConfigureAwait(false);
@@ -220,16 +229,46 @@
return _messageRouter.ChoosePartition(metadata, _producerCount);
}
- public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
+ public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
{
- var partition = await ChoosePartitions(null, cancellationToken).ConfigureAwait(false);
- return await _producers[partition].Send(message, cancellationToken).ConfigureAwait(false);
- }
+ ThrowIfDisposed();
- public async ValueTask<MessageId> Send(DotPulsar.MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
- {
- var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
- return await _producers[partition].Send(metadata, message, cancellationToken).ConfigureAwait(false);
+ var autoAssignSequenceId = metadata.SequenceId == 0;
+ if (autoAssignSequenceId)
+ metadata.SequenceId = _sequenceId.FetchNext();
+
+ var activity = DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _tags);
+
+ try
+ {
+ var partition = await ChoosePartitions(metadata, cancellationToken).ConfigureAwait(false);
+ var producer = _producers[partition];
+ var data = _options.Schema.Encode(message);
+ var messageId = await producer.Send(metadata.Metadata, data, cancellationToken).ConfigureAwait(false);
+
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ activity.SetMessageId(messageId);
+ activity.SetPayloadSize(data.Length);
+ activity.SetStatusCode("OK");
+ }
+
+ return messageId;
+ }
+ catch (Exception exception)
+ {
+ if (activity is not null && activity.IsAllDataRequested)
+ activity.AddException(exception);
+
+ throw;
+ }
+ finally
+ {
+ activity?.Dispose();
+
+ if (autoAssignSequenceId)
+ metadata.SequenceId = 0;
+ }
}
private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index 78022d0..95fa217 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -16,10 +16,8 @@
{
using Abstractions;
using DotPulsar.Abstractions;
- using DotPulsar.Exceptions;
using DotPulsar.Internal.Extensions;
using Events;
- using Microsoft.Extensions.ObjectPool;
using System;
using System.Buffers;
using System.Threading;
@@ -27,7 +25,6 @@
public sealed class SubProducer<TMessage> : IEstablishNewChannel, IProducer<TMessage>
{
- private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private IProducerChannel _channel;
@@ -35,7 +32,6 @@
private readonly IStateChanged<ProducerState> _state;
private readonly IProducerChannelFactory _factory;
private readonly ISchema<TMessage> _schema;
- private readonly SequenceId _sequenceId;
private int _isDisposed;
public Uri ServiceUrl { get; }
@@ -45,7 +41,6 @@
Guid correlationId,
Uri serviceUrl,
string topic,
- ulong initialSequenceId,
IRegisterEvent registerEvent,
IProducerChannel initialChannel,
IExecute executor,
@@ -53,12 +48,9 @@
IProducerChannelFactory factory,
ISchema<TMessage> schema)
{
- var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
- _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
_correlationId = correlationId;
ServiceUrl = serviceUrl;
Topic = topic;
- _sequenceId = new SequenceId(initialSequenceId);
_eventRegister = registerEvent;
_channel = initialChannel;
_executor = executor;
@@ -91,48 +83,14 @@
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
- => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
- => await Send(metadata, _schema.Encode(message), cancellationToken).ConfigureAwait(false);
+ => await _executor.Execute(() => InternalSend(metadata.Metadata, _schema.Encode(message), cancellationToken), cancellationToken).ConfigureAwait(false);
- public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
+ public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+ => await _executor.Execute(() => InternalSend(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- var metadata = _messageMetadataPool.Get();
- try
- {
- metadata.SequenceId = _sequenceId.FetchNext();
- return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- _messageMetadataPool.Return(metadata);
- }
- }
-
- public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var autoAssignSequenceId = metadata.SequenceId == 0;
- if (autoAssignSequenceId)
- metadata.SequenceId = _sequenceId.FetchNext();
-
- try
- {
- return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- if (autoAssignSequenceId)
- metadata.SequenceId = 0;
- }
- }
-
- private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+ private async ValueTask<MessageId> InternalSend(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
return response.MessageId.ToMessageId();
@@ -148,11 +106,5 @@
if (oldChannel is not null)
await oldChannel.DisposeAsync().ConfigureAwait(false);
}
-
- private void ThrowIfDisposed()
- {
- if (_isDisposed != 0)
- throw new ProducerDisposedException(typeof(Producer<TMessage>).FullName!);
- }
}
}
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
index 0a972bb..6176d40 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -38,9 +38,9 @@
/// <summary>
/// Choose a partition in round robin routing mode
/// </summary>
- public int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions)
+ public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
{
- var keyBytes = messageMetadata?.KeyBytes;
+ var keyBytes = messageMetadata.KeyBytes;
if (keyBytes is not null)
return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index e9ecfbc..cc2562b 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -46,9 +46,9 @@
/// <summary>
/// Choose a partition in single partition routing mode
/// </summary>
- public int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions)
+ public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
{
- var keyBytes = messageMetadata?.KeyBytes;
+ var keyBytes = messageMetadata.KeyBytes;
if (keyBytes is not null)
return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;