Added extension methods for handling the conversation id and made ready for release 2.0.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b48d335..b5755bc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,7 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [2.0.0] - 2021-11-12
## Added
@@ -12,12 +12,15 @@
- [Tracing](https://github.com/apache/pulsar-dotpulsar/wiki/Tracing) support following the [guidelines](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md) from the [OpenTelemetry](https://opentelemetry.io/) project
- Sending a message will create a producer trace and add tracing metadata to the message
- The 'Process' extension method for IConsumer\<TMessage\> is no longer experimental and will create a consumer trace
+ - The 'GetConversationId' extension method for IMessage has been added
+ - The 'SetConversationId' and 'GetConversationId' extension methods for MessageMetadata have been added
- The client will send a 'ping' if there has been no activity on the connection for 30 seconds. This default can be changed by setting the 'KeepAliveInterval' on the IPulsarClientBuilder
### Changed
- **Breaking**: Sending a message without metadata is now an extension method and therefore no longer part of the ISend\<TMessage\> (and thereby IProducer\<TMessage\>) interface
- IMessageRouter: ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions) -> ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
+- The default behavior for the IOException has changed from 'Rethrow' to 'Retry'
- The default behavior for the MetadataException has changed from 'Retry' to 'Rethrow', meaning that it will fault the consumer, reader, and/or producer
## [1.1.2] - 2021-07-05
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index bedfefc..474afe3 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0;net6.0</TargetFrameworks>
- <Version>1.1.2</Version>
+ <Version>2.0.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/src/DotPulsar/Extensions/MessageExtensions.cs b/src/DotPulsar/Extensions/MessageExtensions.cs
new file mode 100644
index 0000000..236f193
--- /dev/null
+++ b/src/DotPulsar/Extensions/MessageExtensions.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Internal;
+
+/// <summary>
+/// Extension methods for getting the conversation id from an IMessage.
+/// </summary>
+public static class MessageExtensions
+{
+    /// <summary>
+    /// Looks up the conversation id among the message's properties.
+    /// </summary>
+    /// <param name="message">The message whose properties are searched.</param>
+    /// <returns>The value stored under the conversation id key, or null when that key is absent.</returns>
+    public static string? GetConversationId(this IMessage message)
+    {
+        var found = message.Properties.TryGetValue(Constants.ConversationId, out var id);
+        return found ? id : null;
+    }
+}
diff --git a/src/DotPulsar/Extensions/MessageMetadataExtensions.cs b/src/DotPulsar/Extensions/MessageMetadataExtensions.cs
new file mode 100644
index 0000000..896ea5d
--- /dev/null
+++ b/src/DotPulsar/Extensions/MessageMetadataExtensions.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions;
+
+using DotPulsar.Internal;
+
+/// <summary>
+/// Extension methods for getting and setting the conversation id on MessageMetadata.
+/// </summary>
+public static class MessageMetadataExtensions
+{
+    /// <summary>Stores the given conversation id on the metadata under the conversation id key.</summary>
+    public static void SetConversationId(this MessageMetadata messageMetadata, string conversationId)
+    {
+        messageMetadata[Constants.ConversationId] = conversationId;
+    }
+
+    /// <summary>Reads the value held by the metadata under the conversation id key; may be null.</summary>
+    public static string? GetConversationId(this MessageMetadata messageMetadata)
+    {
+        return messageMetadata[Constants.ConversationId];
+    }
+}
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index 0630812..d34b27b 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -39,6 +39,7 @@
MagicNumber = new byte[] { 0x0e, 0x01 };
MetadataSizeOffset = 6;
MetadataOffset = 10;
+ ConversationId = "messaging.conversation_id";
}
public static string ClientName { get; }
@@ -51,4 +52,5 @@
public static byte[] MagicNumber { get; }
public static int MetadataSizeOffset { get; }
public static int MetadataOffset { get; }
+ public static string ConversationId { get; }
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index f143ead..a42eeef 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -18,6 +18,7 @@
using DotPulsar.Exceptions;
using Exceptions;
using System;
+using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
@@ -53,6 +54,7 @@
AsyncQueueDisposedException _ => FaultAction.Retry,
OperationCanceledException _ => cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry,
DotPulsarException _ => FaultAction.Rethrow,
+ IOException _ => FaultAction.Retry,
SocketException socketException => socketException.SocketErrorCode switch
{
SocketError.HostNotFound => FaultAction.Rethrow,
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 641d927..5946d39 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -15,13 +15,13 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using DotPulsar.Internal.Extensions;
using System.Collections.Generic;
using System.Diagnostics;
public static class DotPulsarActivitySource
{
- private const string _conversationId = "conversation_id";
-
static DotPulsarActivitySource()
{
ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
@@ -34,23 +34,7 @@
if (!ActivitySource.HasListeners())
return null;
- var activity = ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
-
- if (activity is not null && activity.IsAllDataRequested)
- {
- for (var i = 0; i < tags.Length; ++i)
- {
- var tag = tags[i];
- activity.SetTag(tag.Key, tag.Value);
- }
-
-
- var properties = message.Properties;
- if (properties.TryGetValue(_conversationId, out var conversationId))
- activity.SetTag(_conversationId, conversationId);
- }
-
- return activity;
+ return StartActivity(operationName, ActivityKind.Consumer, tags, message.GetConversationId());
}
public static Activity? StartProducerActivity(MessageMetadata metadata, string operationName, KeyValuePair<string, object?>[] tags)
@@ -58,7 +42,12 @@
if (!ActivitySource.HasListeners())
return null;
- var activity = ActivitySource.StartActivity(operationName, ActivityKind.Producer);
+ return StartActivity(operationName, ActivityKind.Producer, tags, metadata.GetConversationId());
+ }
+
+ private static Activity? StartActivity(string operationName, ActivityKind kind, KeyValuePair<string, object?>[] tags, string? conversationId)
+ {
+ var activity = ActivitySource.StartActivity(operationName, kind);
if (activity is not null && activity.IsAllDataRequested)
{
@@ -68,9 +57,8 @@
activity.SetTag(tag.Key, tag.Value);
}
- var conversationId = metadata[_conversationId];
if (conversationId is not null)
- activity.SetTag(_conversationId, conversationId);
+ activity.SetConversationId(conversationId);
}
return activity;
diff --git a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
index bb8e16a..77cfa67 100644
--- a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
@@ -46,6 +46,9 @@
public static void SetMessageId(this Activity activity, MessageId messageId)
=> activity.SetTag(_messageId, messageId.ToString());
+ /// <summary>
+ /// Tags the activity with the given conversation id, using Constants.ConversationId as the tag key.
+ /// </summary>
+ public static void SetConversationId(this Activity activity, string conversationId)
+ => activity.SetTag(Constants.ConversationId, conversationId);
+
public static void SetPayloadSize(this Activity activity, long payloadSize)
=> activity.SetTag(_payloadSize, payloadSize);
}