Fix single partition router test (#78)
* Fix single partition router test.
* small fix.
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 17788a5..9650efa 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -45,6 +45,11 @@
IProducerBuilder<TMessage> Topic(string topic);
/// <summary>
+ /// Set the message router for this producer. The default is RoundRobinPartitionRouter.
+ /// </summary>
+ IProducerBuilder<TMessage> MessageRouter(IMessageRouter messageRouter);
+
+ /// <summary>
/// Create the producer.
/// </summary>
IProducer<TMessage> Create();
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index bcbaf69..742454c 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -22,7 +22,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
- using System.Linq;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
@@ -74,14 +73,25 @@
string topicName = $"single-partitioned-{Guid.NewGuid():N}";
const string content = "test-message";
const int partitions = 3;
+ const int msgCount = 3;
var consumers = new List<IConsumer<string>>();
await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
//Act
- await using var producer = client.NewProducer(Schema.String)
- .Topic(topicName)
- .Create();
+ for (var i = 0; i < partitions; ++i)
+ {
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topicName)
+ .MessageRouter(new SinglePartitionRouter(i))
+ .Create();
+
+ for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+ {
+ await producer.Send($"{content}-{i}-{msgIndex}");
+ _testOutputHelper.WriteLine($"Sent a message: {content}-{i}-{msgIndex}");
+ }
+ }
for (var i = 0; i < partitions; ++i)
{
@@ -92,11 +102,12 @@
.Create());
}
- await producer.Send(content);
- _testOutputHelper.WriteLine($"Sent a message: {content}");
+ var msg = await consumers[1].GetLastMessageId();
//Assert
- (await Task.WhenAny(consumers.Select(c => c.Receive().AsTask()).ToList())).Result.Value().Should().Be(content);
+ for (var i = 0; i < partitions; ++i)
+ for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+ (await consumers[i].Receive()).Value().Should().Be($"{content}-{i}-{msgIndex}");
}
[Fact]