Fix single partition router test (#78)

* Fix single partition router test.

* small fix.
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 17788a5..9650efa 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -45,6 +45,11 @@
         IProducerBuilder<TMessage> Topic(string topic);
 
         /// <summary>
+        /// Set the message router for this producer. The default is RoundRobinPartitionRouter.
+        /// </summary>
+        IProducerBuilder<TMessage> MessageRouter(IMessageRouter messageRouter);
+
+        /// <summary>
         /// Create the producer.
         /// </summary>
         IProducer<TMessage> Create();
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index bcbaf69..742454c 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -22,7 +22,6 @@
     using System;
     using System.Collections.Generic;
     using System.Diagnostics;
-    using System.Linq;
     using System.Threading.Tasks;
     using Xunit;
     using Xunit.Abstractions;
@@ -74,14 +73,25 @@
             string topicName = $"single-partitioned-{Guid.NewGuid():N}";
             const string content = "test-message";
             const int partitions = 3;
+            const int msgCount = 3;
             var consumers = new List<IConsumer<string>>();
 
             await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
 
             //Act
-            await using var producer = client.NewProducer(Schema.String)
-                .Topic(topicName)
-                .Create();
+            for (var i = 0; i < partitions; ++i)
+            {
+                await using var producer = client.NewProducer(Schema.String)
+                    .Topic(topicName)
+                    .MessageRouter(new SinglePartitionRouter(i))
+                    .Create();
+
+                for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+                {
+                    await producer.Send($"{content}-{i}-{msgIndex}");
+                    _testOutputHelper.WriteLine($"Sent a message: {content}-{i}-{msgIndex}");
+                }
+            }
 
             for (var i = 0; i < partitions; ++i)
             {
@@ -92,11 +102,12 @@
                     .Create());
             }
 
-            await producer.Send(content);
-            _testOutputHelper.WriteLine($"Sent a message: {content}");
+            var msg = await consumers[1].GetLastMessageId();
 
             //Assert
-            (await Task.WhenAny(consumers.Select(c => c.Receive().AsTask()).ToList())).Result.Value().Should().Be(content);
+            for (var i = 0; i < partitions; ++i)
+            for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+                (await consumers[i].Receive()).Value().Should().Be($"{content}-{i}-{msgIndex}");
         }
 
         [Fact]