Only hash the key if the array is not empty
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
index 6176d40..8a932ec 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -41,7 +41,7 @@
public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
{
var keyBytes = messageMetadata.KeyBytes;
- if (keyBytes is not null)
+ if (keyBytes is not null && keyBytes.Length > 0)
return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
return Interlocked.Increment(ref _partitionIndex) % numberOfPartitions;
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index cc2562b..e117653 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -49,7 +49,7 @@
public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
{
var keyBytes = messageMetadata.KeyBytes;
- if (keyBytes is not null)
+ if (keyBytes is not null && keyBytes.Length > 0)
return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
if (_partitionIndex == -1)
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index 742454c..6384ed9 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -69,30 +69,16 @@
public async Task SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition()
{
//Arrange
- await using var client = PulsarClient.Builder().ServiceUrl(_pulsarService.GetBrokerUri()).Build();
- string topicName = $"single-partitioned-{Guid.NewGuid():N}";
+ var serviceUrl = _pulsarService.GetBrokerUri();
const string content = "test-message";
const int partitions = 3;
const int msgCount = 3;
- var consumers = new List<IConsumer<string>>();
-
+ var topicName = $"single-partitioned-{Guid.NewGuid():N}";
await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
+ await using var client = PulsarClient.Builder().ServiceUrl(serviceUrl).Build();
//Act
- for (var i = 0; i < partitions; ++i)
- {
- await using var producer = client.NewProducer(Schema.String)
- .Topic(topicName)
- .MessageRouter(new SinglePartitionRouter(i))
- .Create();
-
- for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
- {
- await producer.Send($"{content}-{i}-{msgIndex}");
- _testOutputHelper.WriteLine($"Sent a message: {content}-{i}-{msgIndex}");
- }
- }
-
+ var consumers = new List<IConsumer<string>>();
for (var i = 0; i < partitions; ++i)
{
consumers.Add(client.NewConsumer(Schema.String)
@@ -102,12 +88,32 @@
.Create());
}
- var msg = await consumers[1].GetLastMessageId();
+ for (var i = 0; i < partitions; ++i)
+ {
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topicName)
+ .MessageRouter(new SinglePartitionRouter(i))
+ .Create();
+
+ for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+ {
+ var message = $"{content}-{i}-{msgIndex}";
+ _ = await producer.Send(message);
+ _testOutputHelper.WriteLine($"Sent a message: {message}");
+ }
+ }
//Assert
for (var i = 0; i < partitions; ++i)
- for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
- (await consumers[i].Receive()).Value().Should().Be($"{content}-{i}-{msgIndex}");
+ {
+ var consumer = consumers[i];
+
+ for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+ {
+ var message = await consumer.Receive();
+ message.Value().Should().Be($"{content}-{i}-{msgIndex}");
+ }
+ }
}
[Fact]