Minor cleanup and updated NuGet package
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 9c2ef0e..42a706b 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -71,7 +71,7 @@
{
ProducerState.Connected => "is connected",
ProducerState.Disconnected => "is disconnected",
- ProducerState.PartiallyConnected => "has partially connected",
+ ProducerState.PartiallyConnected => "is partially connected",
ProducerState.Closed => "has closed",
ProducerState.Faulted => "has faulted",
_ => $"has an unknown state '{stateChanged.ProducerState}'"
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 9cce4c3..0613e86 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -23,7 +23,7 @@
<ItemGroup>
<PackageReference Include="HashDepot" Version="2.0.3" />
- <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.6" />
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.7" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.0.101" />
<PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index ffa9295..9cea7cc 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -192,7 +192,7 @@
};
public static BaseCommand AsBaseCommand(this CommandPartitionedTopicMetadata command)
- => new BaseCommand
+ => new()
{
CommandType = BaseCommand.Type.PartitionedMetadata, PartitionMetadata = command
};
diff --git a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
index 3e558fc..f2bac5f 100644
--- a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal.Extensions
{
using System;
+ using System.Text;
using Metadata = PulsarApi.MessageMetadata;
public static class MessageMetadataExtensions
@@ -47,7 +48,15 @@
// Key
public static byte[]? GetKeyAsBytes(this Metadata metadata)
- => metadata.PartitionKeyB64Encoded ? Convert.FromBase64String(metadata.PartitionKey) : null;
+ {
+ if (metadata.PartitionKey is null)
+ return null;
+
+ if (metadata.PartitionKeyB64Encoded)
+ return Convert.FromBase64String(metadata.PartitionKey);
+
+ return Encoding.UTF8.GetBytes(metadata.PartitionKey);
+ }
public static void SetKey(this Metadata metadata, string? key)
{
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 95a7bdc..7470fe9 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -85,7 +85,8 @@
StateChangedHandler = _stateChangedHandler
};
- if (_messageRouter != null) options.MessageRouter = _messageRouter;
+ if (_messageRouter is not null)
+ options.MessageRouter = _messageRouter;
return _pulsarClient.CreateProducer<TMessage>(options);
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index e7230fe..36834df 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -79,8 +79,6 @@
case ProducerState.Faulted:
_stateManager.SetState(ProducerState.Faulted);
break;
- case ProducerState.PartiallyConnected: break;
- default: throw new ArgumentOutOfRangeException();
}
break;
diff --git a/src/DotPulsar/Internal/PulsarClientFactory.cs b/src/DotPulsar/Internal/PulsarClientFactory.cs
index fc58aa9..e703f73 100644
--- a/src/DotPulsar/Internal/PulsarClientFactory.cs
+++ b/src/DotPulsar/Internal/PulsarClientFactory.cs
@@ -21,8 +21,6 @@
public sealed class PulsarClientFactory
{
public static PulsarClient CreatePulsarClient(IConnectionPool connectionPool, ProcessManager processManager, IHandleException exceptionHandler, Uri serviceUrl)
- {
- return new PulsarClient(connectionPool, processManager, exceptionHandler, serviceUrl);
- }
+ => new(connectionPool, processManager, exceptionHandler, serviceUrl);
}
}
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 3e8cd0e..1444198 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -74,7 +74,7 @@
public string Topic { get; set; }
/// <summary>
- /// Set the message router. The default router is Round Robin partition router.
+ /// Set the message router. The default router is the Round Robin partition router.
/// </summary>
public IMessageRouter MessageRouter { get; set; }
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 5415485..453ba26 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -105,9 +105,7 @@
ICompressorFactory? compressorFactory = null;
if (partitionIndex.HasValue)
- {
topic = $"{topic}-partition-{partitionIndex}";
- }
if (options.CompressionType != CompressionType.None)
{
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
index 05595d6..a7da420 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -16,7 +16,6 @@
{
using Abstractions;
using HashDepot;
- using System.Text;
using System.Threading;
/// <summary>
@@ -31,14 +30,13 @@
private int _partitionIndex = -1;
/// <summary>
- /// Choose a partition in round robin routig mode
+ /// Choose a partition in round robin routing mode
/// </summary>
public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
{
- if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key))
- {
- return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount;
- }
+ var keyBytes = messageMetadata?.KeyBytes;
+ if (keyBytes is not null)
+ return (int) MurmurHash3.Hash32(keyBytes, 0) % partitionsCount;
return Interlocked.Increment(ref _partitionIndex) % partitionsCount;
}
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index 3a2b563..69a82f5 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -17,7 +17,6 @@
using Abstractions;
using HashDepot;
using System;
- using System.Text;
/// <summary>
/// If no key is provided, the producer will randomly pick one single partition and publish all the messages
@@ -38,13 +37,11 @@
/// </summary>
public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
{
- if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key))
- {
- return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount;
- }
-
+ var keyBytes = messageMetadata?.KeyBytes;
+ if (keyBytes is not null)
+ return (int) MurmurHash3.Hash32(keyBytes, 0) % partitionsCount;
+
_partitionIndex ??= new Random().Next(0, partitionsCount);
-
return _partitionIndex.Value;
}
}