Minor cleanup
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index 4daf83e..31140b2 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -27,6 +27,6 @@
Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
ValueTask<Message> Receive(CancellationToken cancellationToken);
- ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+ ValueTask ClosedByClient(CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index dd806f4..b214237 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -23,6 +23,6 @@
public interface IProducerChannel : IAsyncDisposable
{
Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
- ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+ ValueTask ClosedByClient(CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index a692c18..55135ea 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -23,7 +23,7 @@
{
Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
- ValueTask<Message> Receive(CancellationToken cancellationToken = default);
- ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+ ValueTask<Message> Receive(CancellationToken cancellationToken);
+ ValueTask ClosedByClient(CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 2282711..c6e9dfc 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -83,7 +83,7 @@
return;
_eventRegister.Register(new ConsumerDisposed(_correlationId, this));
- await _channel.ClosedByClient().ConfigureAwait(false);
+ await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
@@ -111,7 +111,13 @@
=> await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
- => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken).ConfigureAwait(false);
+ {
+ ThrowIfDisposed();
+
+ var command = new CommandRedeliverUnacknowledgedMessages();
+ command.MessageIds.AddRange(messageIds.Select(m => m.Data));
+ await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
=> await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);
@@ -121,7 +127,12 @@
ThrowIfDisposed();
var unsubscribe = new CommandUnsubscribe();
- _ = await _executor.Execute(() => _channel.Send(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => Unsubscribe(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask Unsubscribe(CommandUnsubscribe command, CancellationToken cancellationToken)
+ {
+ _ = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
@@ -161,7 +172,12 @@
ThrowIfDisposed();
var getLastMessageId = new CommandGetLastMessageId();
- var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() => GetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ {
+ var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
return new MessageId(response.LastMessageId);
}
@@ -179,10 +195,7 @@
try
{
- await _executor.Execute(() =>
- {
- return _channel.Send(commandAck, cancellationToken);
- }, cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => Acknowledge(commandAck, cancellationToken), cancellationToken).ConfigureAwait(false);
}
finally
{
@@ -190,14 +203,11 @@
}
}
- private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
+ private async ValueTask Acknowledge(CommandAck command, CancellationToken cancellationToken)
+ => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
- redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
- await _executor.Execute(() => _channel.Send(redeliverUnacknowledgedMessages, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
+ private async ValueTask RedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+ => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
internal async ValueTask SetChannel(IConsumerChannel channel)
{
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 1ac6886..2347eee 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -84,7 +84,7 @@
return;
_eventRegister.Register(new ProducerDisposed(_correlationId, this));
- await _channel.ClosedByClient().ConfigureAwait(false);
+ await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
@@ -97,12 +97,12 @@
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
+
var metadata = _messageMetadataPool.Get();
try
{
metadata.SequenceId = _sequenceId.FetchNext();
- var response = await _executor.Execute(() => _channel.Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- return new MessageId(response.MessageId);
+ return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
}
finally
{
@@ -126,8 +126,7 @@
try
{
- var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- return new MessageId(response.MessageId);
+ return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
}
finally
{
@@ -136,6 +135,12 @@
}
}
+ private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+ {
+ var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
+ return new MessageId(response.MessageId);
+ }
+
internal async ValueTask SetChannel(IProducerChannel channel)
{
if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index eeb95f5..571c5b6 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -73,10 +73,17 @@
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
- public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default)
+ public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
{
+ ThrowIfDisposed();
+
var getLastMessageId = new CommandGetLastMessageId();
- var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() => GetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ {
+ var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
return new MessageId(response.LastMessageId);
}
@@ -129,7 +136,7 @@
return;
_eventRegister.Register(new ReaderDisposed(_correlationId, this));
- await _channel.ClosedByClient().ConfigureAwait(false);
+ await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}