Simplify the producer channel
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 9fe2eb1..b793181 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -22,7 +22,6 @@
public interface IProducerChannel : IAsyncDisposable
{
- Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index da2aacd..8f4ed1a 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -18,6 +18,7 @@
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Events;
+ using Microsoft.Extensions.ObjectPool;
using System;
using System.Buffers;
using System.Threading;
@@ -25,6 +26,7 @@
public sealed class Producer : IProducer
{
+ private readonly ObjectPool<PulsarApi.MessageMetadata> _messageMetadataPool;
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private IProducerChannel _channel;
@@ -44,6 +46,8 @@
IExecute executor,
IStateChanged<ProducerState> state)
{
+ var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
+ _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
_correlationId = correlationId;
Topic = topic;
_sequenceId = new SequenceId(initialSequenceId);
@@ -93,9 +97,17 @@
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var sequenceId = _sequenceId.FetchNext();
- var response = await _executor.Execute(() => _channel.Send(sequenceId, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- return new MessageId(response.MessageId);
+ var metadata = _messageMetadataPool.Get();
+ try
+ {
+ metadata.SequenceId = _sequenceId.FetchNext();
+ var response = await _executor.Execute(() => _channel.Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
+ return new MessageId(response.MessageId);
+ }
+ finally
+ {
+ _messageMetadataPool.Return(metadata);
+ }
}
public ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index e7e062b..05390a9 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -25,7 +25,6 @@
public sealed class ProducerChannel : IProducerChannel
{
- private readonly ObjectPool<MessageMetadata> _messageMetadataPool;
private readonly ObjectPool<SendPackage> _sendPackagePool;
private readonly ulong _id;
private readonly string _name;
@@ -33,8 +32,6 @@
public ProducerChannel(ulong id, string name, IConnection connection)
{
- var messageMetadataPolicy = new DefaultPooledObjectPolicy<MessageMetadata>();
- _messageMetadataPool = new DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
_sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
_id = id;
@@ -55,50 +52,28 @@
}
}
- public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
- {
- var metadata = _messageMetadataPool.Get();
- metadata.ProducerName = _name;
- metadata.SequenceId = sequenceId;
- try
- {
- return SendPackage(metadata, payload, cancellationToken);
- }
- finally
- {
- _messageMetadataPool.Return(metadata);
- }
- }
-
- public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
- {
- metadata.ProducerName = _name;
- return SendPackage(metadata, payload, cancellationToken);
- }
-
- private async Task<CommandSendReceipt> SendPackage(
- MessageMetadata metadata,
- ReadOnlySequence<byte> payload,
- CancellationToken cancellationToken)
+ public async Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
var sendPackage = _sendPackagePool.Get();
- if (sendPackage.Command is null)
- {
- sendPackage.Command = new CommandSend
- {
- ProducerId = _id,
- NumMessages = 1
- };
- }
-
- sendPackage.Command.SequenceId = metadata.SequenceId;
- sendPackage.Metadata = metadata;
- sendPackage.Payload = payload;
- metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); // TODO Benchmark against StopWatch
-
try
{
+ metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ metadata.ProducerName = _name;
+
+ if (sendPackage.Command is null)
+ {
+ sendPackage.Command = new CommandSend
+ {
+ ProducerId = _id,
+ NumMessages = 1
+ };
+ }
+
+ sendPackage.Command.SequenceId = metadata.SequenceId;
+ sendPackage.Metadata = metadata;
+ sendPackage.Payload = payload;
+
var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
return response.SendReceipt;