Remove locking from the producers, consumers and readers.
Use object pooling to improve performance of multi-threaded usage of the producers.
Make consumers more thread-safe when calling Messages() and other methods.
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index a3d5c47..19c4995 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -21,6 +21,7 @@
</PropertyGroup>
<ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.5" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="2.4.6" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.2" />
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index cb6ab13..3da59f3 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -22,12 +22,14 @@
public sealed class BatchHandler
{
+ private readonly object _lock;
private readonly bool _trackBatches;
private readonly Queue<Message> _messages;
private readonly LinkedList<Batch> _batches;
public BatchHandler(bool trackBatches)
{
+ _lock = new object();
_trackBatches = trackBatches;
_messages = new Queue<Message>();
_batches = new LinkedList<Batch>();
@@ -35,56 +37,68 @@
public Message Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
{
- if (_trackBatches)
- _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
-
- long index = 0;
-
- for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+ lock (_lock)
{
- var singleMetadataSize = data.ReadUInt32(index, true);
- index += 4;
- var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
- index += singleMetadataSize;
- var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
- var message = new Message(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
- _messages.Enqueue(message);
- index += (uint) singleMetadata.PayloadSize;
- }
+ if (_trackBatches)
+ _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
- return _messages.Dequeue();
+ long index = 0;
+
+ for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+ {
+ var singleMetadataSize = data.ReadUInt32(index, true);
+ index += 4;
+ var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
+ index += singleMetadataSize;
+ var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
+ var message = new Message(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
+ _messages.Enqueue(message);
+ index += (uint) singleMetadata.PayloadSize;
+ }
+
+ return _messages.Dequeue();
+ }
}
public Message? GetNext()
- => _messages.Count == 0 ? null : _messages.Dequeue();
+ {
+ lock (_lock)
+ return _messages.Count == 0 ? null : _messages.Dequeue();
+ }
public void Clear()
{
- _messages.Clear();
- _batches.Clear();
+ lock (_lock)
+ {
+ _messages.Clear();
+ _batches.Clear();
+ }
}
public MessageIdData? Acknowledge(MessageIdData messageId)
{
- foreach (var batch in _batches)
+ lock (_lock)
{
- if (messageId.LedgerId != batch.MessageId.LedgerId ||
- messageId.EntryId != batch.MessageId.EntryId ||
- messageId.Partition != batch.MessageId.Partition)
- continue;
-
- batch.Acknowledge(messageId.BatchIndex);
-
- if (batch.IsAcknowledged())
+ foreach (var batch in _batches)
{
- _batches.Remove(batch);
- return batch.MessageId;
+ if (messageId.LedgerId != batch.MessageId.LedgerId ||
+ messageId.EntryId != batch.MessageId.EntryId ||
+ messageId.Partition != batch.MessageId.Partition)
+ continue;
+
+ batch.Acknowledge(messageId.BatchIndex);
+
+ if (batch.IsAcknowledged())
+ {
+ _batches.Remove(batch);
+ return batch.MessageId;
+ }
+
+ break;
}
- break;
+ return null;
}
-
- return null;
}
private sealed class Batch
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 989aa36..d444586 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,6 +18,7 @@
using Exceptions;
using Extensions;
using PulsarApi;
+ using System;
using System.Threading;
using System.Threading.Tasks;
@@ -170,6 +171,12 @@
{
ThrowIfDisposed();
+ if (command.Command is null)
+ throw new ArgumentNullException(nameof(command.Command));
+
+ if (command.Metadata is null)
+ throw new ArgumentNullException(nameof(command.Metadata));
+
Task<BaseCommand>? response;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index fe5fce3..c6162d2 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -18,6 +18,7 @@
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Events;
+ using Microsoft.Extensions.ObjectPool;
using PulsarApi;
using System;
using System.Collections.Generic;
@@ -31,8 +32,7 @@
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private IConsumerChannel _channel;
- private readonly CommandAck _cachedCommandAck;
- private readonly CommandRedeliverUnacknowledgedMessages _cachedCommandRedeliverUnacknowledgedMessages;
+ private readonly ObjectPool<CommandAck> _commandAckPool;
private readonly IExecute _executor;
private readonly IStateChanged<ConsumerState> _state;
private int _isDisposed;
@@ -53,8 +53,7 @@
_channel = initialChannel;
_executor = executor;
_state = state;
- _cachedCommandAck = new CommandAck();
- _cachedCommandRedeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
+ _commandAckPool = new DefaultObjectPool<CommandAck>(new DefaultPooledObjectPolicy<CommandAck>());
_isDisposed = 0;
_eventRegister.Register(new ConsumerCreated(_correlationId, this));
@@ -117,7 +116,8 @@
{
ThrowIfDisposed();
- _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe(), cancellationToken), cancellationToken).ConfigureAwait(false);
+ var unsubscribe = new CommandUnsubscribe();
+ _ = await _executor.Execute(() => _channel.Send(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
@@ -136,7 +136,8 @@
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken).ConfigureAwait(false);
+ var getLastMessageId = new CommandGetLastMessageId();
+ var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
return new MessageId(response.LastMessageId);
}
@@ -145,26 +146,37 @@
{
ThrowIfDisposed();
- await _executor.Execute(() =>
+ var commandAck = _commandAckPool.Get();
+ commandAck.Type = ackType;
+ commandAck.MessageIds.Clear();
+ commandAck.MessageIds.Add(messageIdData);
+
+ try
{
- _cachedCommandAck.Type = ackType;
- _cachedCommandAck.MessageIds.Clear();
- _cachedCommandAck.MessageIds.Add(messageIdData);
- return _channel.Send(_cachedCommandAck, cancellationToken);
- }, cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() =>
+ {
+ return _channel.Send(commandAck, cancellationToken);
+ }, cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ _commandAckPool.Return(commandAck);
+ }
}
private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
{
ThrowIfDisposed();
+ var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
+ redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
+
await _executor.Execute(() =>
{
- _cachedCommandRedeliverUnacknowledgedMessages.MessageIds.Clear();
- _cachedCommandAck.MessageIds.AddRange(messageIds);
- return _channel.Send(_cachedCommandRedeliverUnacknowledgedMessages, cancellationToken);
+ return _channel.Send(redeliverUnacknowledgedMessages, cancellationToken);
}, cancellationToken).ConfigureAwait(false);
}
+
internal async ValueTask SetChannel(IConsumerChannel channel)
{
if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index c08649d..a84e574 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -28,6 +28,7 @@
private readonly IConnection _connection;
private readonly BatchHandler _batchHandler;
private readonly CommandFlow _cachedCommandFlow;
+ private readonly AsyncLock _lock;
private uint _sendWhenZero;
private bool _firstFlow;
@@ -43,6 +44,8 @@
_connection = connection;
_batchHandler = batchHandler;
+ _lock = new AsyncLock();
+
_cachedCommandFlow = new CommandFlow
{
ConsumerId = id,
@@ -55,35 +58,38 @@
public async ValueTask<Message> Receive(CancellationToken cancellationToken)
{
- while (true)
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- if (_sendWhenZero == 0)
- await SendFlow(cancellationToken).ConfigureAwait(false);
-
- _sendWhenZero--;
-
- var message = _batchHandler.GetNext();
-
- if (message != null)
- return message;
-
- var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
-
- if (!messagePackage.IsValid())
+ while (true)
{
- await RejectPackage(messagePackage, cancellationToken).ConfigureAwait(false);
- continue;
+ if (_sendWhenZero == 0)
+ await SendFlow(cancellationToken).ConfigureAwait(false);
+
+ _sendWhenZero--;
+
+ var message = _batchHandler.GetNext();
+
+ if (message != null)
+ return message;
+
+ var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
+
+ if (!messagePackage.IsValid())
+ {
+ await RejectPackage(messagePackage, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
+
+ var metadataSize = messagePackage.GetMetadataSize();
+ var redeliveryCount = messagePackage.RedeliveryCount;
+ var data = messagePackage.ExtractData(metadataSize);
+ var metadata = messagePackage.ExtractMetadata(metadataSize);
+ var messageId = messagePackage.MessageId;
+
+ return metadata.NumMessagesInBatch == 1
+ ? new Message(new MessageId(messageId), redeliveryCount, metadata, null, data)
+ : _batchHandler.Add(messageId, redeliveryCount, metadata, data);
}
-
- var metadataSize = messagePackage.GetMetadataSize();
- var redeliveryCount = messagePackage.RedeliveryCount;
- var data = messagePackage.ExtractData(metadataSize);
- var metadata = messagePackage.ExtractMetadata(metadataSize);
- var messageId = messagePackage.MessageId;
-
- return metadata.NumMessagesInBatch == 1
- ? new Message(new MessageId(messageId), redeliveryCount, metadata, null, data)
- : _batchHandler.Add(messageId, redeliveryCount, metadata, data);
}
}
@@ -141,8 +147,9 @@
try
{
_queue.Dispose();
-
- await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None).ConfigureAwait(false);
+ await _lock.DisposeAsync();
+ var closeConsumer = new CommandCloseConsumer { ConsumerId = _id };
+ await _connection.Send(closeConsumer, CancellationToken.None).ConfigureAwait(false);
}
catch
{
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 3303d01..efe5bc0 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -91,7 +91,6 @@
{
ThrowIfDisposed();
var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken).ConfigureAwait(false);
-
return new MessageId(response.MessageId);
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 4ae8503..824ed87 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -16,6 +16,7 @@
{
using Abstractions;
using Extensions;
+ using Microsoft.Extensions.ObjectPool;
using PulsarApi;
using System;
using System.Buffers;
@@ -25,20 +26,18 @@
public sealed class ProducerChannel : IProducerChannel
{
private readonly MessageMetadata _cachedMetadata;
- private readonly SendPackage _cachedSendPackage;
+ private readonly ObjectPool<SendPackage> _sendPackagePool;
private readonly ulong _id;
+ private readonly string _name;
private readonly SequenceId _sequenceId;
private readonly IConnection _connection;
public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
{
_cachedMetadata = new MessageMetadata { ProducerName = name };
-
- var commandSend = new CommandSend { ProducerId = id, NumMessages = 1 };
-
- _cachedSendPackage = new SendPackage(commandSend, _cachedMetadata);
-
+ _sendPackagePool = new DefaultObjectPool<SendPackage>(new DefaultPooledObjectPolicy<SendPackage>());
_id = id;
+ _name = name;
_sequenceId = sequenceId;
_connection = connection;
}
@@ -47,7 +46,8 @@
{
try
{
- await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None).ConfigureAwait(false);
+ var closeProducer = new CommandCloseProducer { ProducerId = _id };
+ await _connection.Send(closeProducer, CancellationToken.None).ConfigureAwait(false);
}
catch
{
@@ -56,37 +56,47 @@
}
public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
- {
- _cachedSendPackage.Metadata = _cachedMetadata;
- _cachedSendPackage.Payload = payload;
-
- return SendPackage(true, cancellationToken);
- }
+ => SendPackage(_cachedMetadata, payload, true, cancellationToken);
public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
- metadata.ProducerName = _cachedMetadata.ProducerName;
- _cachedSendPackage.Metadata = metadata;
- _cachedSendPackage.Payload = payload;
-
- return SendPackage(metadata.SequenceId == 0, cancellationToken);
+ metadata.ProducerName = _name;
+ return SendPackage(metadata, payload, metadata.SequenceId == 0, cancellationToken);
}
- private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId, CancellationToken cancellationToken)
+ private async Task<CommandSendReceipt> SendPackage(
+ MessageMetadata metadata,
+ ReadOnlySequence<byte> payload,
+ bool autoAssignSequenceId,
+ CancellationToken cancellationToken)
{
+ var sendPackage = _sendPackagePool.Get();
+
+ if (sendPackage.Command is null)
+ {
+ sendPackage.Command = new CommandSend
+ {
+ ProducerId = _id,
+ NumMessages = 1
+ };
+ }
+
+ sendPackage.Metadata = metadata;
+ sendPackage.Payload = payload;
+
try
{
- _cachedSendPackage.Metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (autoAssignSequenceId)
{
- _cachedSendPackage.Command.SequenceId = _sequenceId.Current;
- _cachedSendPackage.Metadata.SequenceId = _sequenceId.Current;
+ sendPackage.Command.SequenceId = _sequenceId.Current;
+ sendPackage.Metadata.SequenceId = _sequenceId.Current;
}
else
- _cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
+ sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
- var response = await _connection.Send(_cachedSendPackage, cancellationToken).ConfigureAwait(false);
+ var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
if (autoAssignSequenceId)
@@ -98,7 +108,9 @@
{
// Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
if (autoAssignSequenceId)
- _cachedSendPackage.Metadata.SequenceId = 0;
+ sendPackage.Metadata.SequenceId = 0;
+
+ _sendPackagePool.Return(sendPackage);
}
}
}
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
index 297b248..f1b87db 100644
--- a/src/DotPulsar/Internal/SendPackage.cs
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -19,14 +19,8 @@
public sealed class SendPackage
{
- public SendPackage(CommandSend command, MessageMetadata metadata)
- {
- Command = command;
- Metadata = metadata;
- }
-
- public CommandSend Command { get; }
- public MessageMetadata Metadata { get; set; }
+ public CommandSend? Command { get; set; }
+ public MessageMetadata? Metadata { get; set; }
public ReadOnlySequence<byte> Payload { get; set; }
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 290ac74..260f3ce 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -57,7 +57,7 @@
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
- var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
+ var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
var process = new ProducerProcess(correlationId, stateManager, factory, producer);
_processManager.Add(process);
process.Start();
@@ -75,7 +75,7 @@
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
- var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
+ var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
@@ -92,7 +92,7 @@
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
- var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager);
+ var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
var process = new ReaderProcess(correlationId, stateManager, factory, reader);
_processManager.Add(process);
process.Start();
diff --git a/tests/docker-compose-standalone-tests.yml b/tests/docker-compose-standalone-tests.yml
index d591b22..06d0a06 100644
--- a/tests/docker-compose-standalone-tests.yml
+++ b/tests/docker-compose-standalone-tests.yml
@@ -1,10 +1,10 @@
-version: '3.5'
+version: '3.5'
services:
pulsar:
container_name: pulsar-stresstests
- image: 'apachepulsar/pulsar:2.5.0'
+ image: 'apachepulsar/pulsar:2.6.0'
ports:
- '54546:8080'
- '54545:6650'