Fixed last nullable warning and moved all public interfaces to ValueTask
diff --git a/src/DotPulsar.Tests/Internal/StateManagerTests.cs b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
index b7a6590..270a811 100644
--- a/src/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -219,7 +219,7 @@
//Act
cts.Cancel();
- var exception = await Record.ExceptionAsync(() => task);
+ var exception = await Record.ExceptionAsync(() => task.AsTask());
//Assert
Assert.IsType<TaskCanceledException>(exception);
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 5c435f7..51920d4 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -13,27 +13,27 @@
/// <summary>
/// Acknowledge the consumption of a single message.
/// </summary>
- Task Acknowledge(Message message, CancellationToken cancellationToken = default);
+ ValueTask Acknowledge(Message message, CancellationToken cancellationToken = default);
/// <summary>
/// Acknowledge the consumption of a single message using the MessageId.
/// </summary>
- Task Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
+ ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
/// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
/// </summary>
- Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
+ ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
/// <summary>
/// Acknowledge the consumption of all the messages in the topic up to and including the provided MessageId.
/// </summary>
- Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
+ ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
/// Get the MessageId of the last message on the topic.
/// </summary>
- Task<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
+ ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
/// <summary>
/// Get an IAsyncEnumerable for consuming messages
@@ -43,11 +43,11 @@
/// <summary>
/// Reset the subscription associated with this consumer to a specific MessageId.
/// </summary>
- Task Seek(MessageId messageId, CancellationToken cancellationToken = default);
+ ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
/// Unsubscribe the consumer.
/// </summary>
- Task Unsubscribe(CancellationToken cancellationToken = default);
+ ValueTask Unsubscribe(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
index 447c38f..c6e6ef8 100644
--- a/src/DotPulsar/Abstractions/IMessageBuilder.cs
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -48,6 +48,6 @@
/// <summary>
/// Set the consumer name.
/// </summary>
- Task<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
+ ValueTask<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 659b5ff..0127a98 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -12,11 +12,11 @@
/// <summary>
/// Sends a message.
/// </summary>
- Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+ ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
/// <summary>
/// Sends a message and metadata.
/// </summary>
- Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+ ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Abstractions/IStateChanged.cs
index a9b9389..df52596 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Abstractions/IStateChanged.cs
@@ -17,7 +17,7 @@
/// <remarks>
/// If the state change to a final state, then all awaiting tasks will complete.
/// </remarks>
- Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken = default);
+ ValueTask<TState> StateChangedTo(TState state, CancellationToken cancellationToken = default);
/// <summary>
/// Wait for the state to change from a specific state.
@@ -28,7 +28,7 @@
/// <remarks>
/// If the state change to a final state, then all awaiting tasks will complete.
/// </remarks>
- Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken = default);
+ ValueTask<TState> StateChangedFrom(TState state, CancellationToken cancellationToken = default);
/// <summary>
/// Ask whether the current state is final, meaning that it will never change.
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerStream.cs b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
index 582632f..7e84ddd 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
@@ -6,6 +6,7 @@
{
public interface IProducerStream : IAsyncDisposable
{
+ Task<CommandSendReceipt> Send(ReadOnlyMemory<byte> payload);
Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload);
}
}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 4e21485..efd920a 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -37,8 +37,8 @@
_throwIfClosedOrFaulted = () => { };
}
- public async Task<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
- public async Task<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+ public async ValueTask<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+ public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
public bool IsFinalState() => _stateManager.IsFinalState();
public bool IsFinalState(ConsumerState state) => _stateManager.IsFinalState(state);
@@ -57,38 +57,38 @@
}
}
- public async Task Acknowledge(Message message, CancellationToken cancellationToken)
+ public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
=> await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken);
- public async Task Acknowledge(MessageId messageId, CancellationToken cancellationToken)
+ public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
=> await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken);
- public async Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
+ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
=> await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
- public async Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
+ public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
=> await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
- public async Task Unsubscribe(CancellationToken cancellationToken)
+ public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
_ = await _executor.Execute(() => Stream.Send(new CommandUnsubscribe()), cancellationToken);
HasClosed();
}
- public async Task Seek(MessageId messageId, CancellationToken cancellationToken)
+ public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
var seek = new CommandSeek { MessageId = messageId.Data };
_ = await _executor.Execute(() => Stream.Send(seek), cancellationToken);
return;
}
- public async Task<MessageId> GetLastMessageId(CancellationToken cancellationToken)
+ public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
{
var response = await _executor.Execute(() => Stream.Send(new CommandGetLastMessageId()), cancellationToken);
return new MessageId(response.LastMessageId);
}
- private async Task Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
+ private async ValueTask Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
{
await _executor.Execute(() =>
{
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index 43b8931..72cff30 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -58,6 +58,6 @@
return this;
}
- public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) => await _producer.Send(_metadata, data, cancellationToken);
+ public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) => await _producer.Send(_metadata, data, cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/NotReadyStream.cs b/src/DotPulsar/Internal/NotReadyStream.cs
index eeb0968..4c6dda0 100644
--- a/src/DotPulsar/Internal/NotReadyStream.cs
+++ b/src/DotPulsar/Internal/NotReadyStream.cs
@@ -21,6 +21,8 @@
public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command) => throw GetException();
+ public Task<CommandSendReceipt> Send(ReadOnlyMemory<byte> payload) => throw GetException();
+
public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload) => throw GetException();
private Exception GetException() => new StreamNotReadyException();
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index f2332d2..af0ab07 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -30,8 +30,8 @@
_throwIfClosedOrFaulted = () => { };
}
- public async Task<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
- public async Task<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+ public async ValueTask<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+ public async ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
public bool IsFinalState() => _stateManager.IsFinalState();
public bool IsFinalState(ProducerState state) => _stateManager.IsFinalState(state);
@@ -42,15 +42,15 @@
await _connectTask;
}
- public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => await Send(new MessageMetadata(), data, cancellationToken);
-
- public async Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => await Send(metadata.Metadata, data, cancellationToken);
-
- private async Task<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload, CancellationToken cancellationToken)
+ public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
- var response = await _executor.Execute(() => Stream.Send(metadata, payload), cancellationToken);
+ var response = await _executor.Execute(() => Stream.Send(data), cancellationToken);
+ return new MessageId(response.MessageId);
+ }
+
+ public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
+ {
+ var response = await _executor.Execute(() => Stream.Send(metadata.Metadata, data), cancellationToken);
return new MessageId(response.MessageId);
}
diff --git a/src/DotPulsar/Internal/ProducerStream.cs b/src/DotPulsar/Internal/ProducerStream.cs
index c8faba8..e51c1ca 100644
--- a/src/DotPulsar/Internal/ProducerStream.cs
+++ b/src/DotPulsar/Internal/ProducerStream.cs
@@ -8,6 +8,7 @@
{
public sealed class ProducerStream : IProducerStream
{
+ private readonly PulsarApi.MessageMetadata _cachedMetadata;
private readonly SendPackage _cachedSendPackage;
private readonly ulong _id;
private readonly string _name;
@@ -18,7 +19,8 @@
public ProducerStream(ulong id, string name, SequenceId sequenceId, Connection connection, IFaultStrategy faultStrategy, IProducerProxy proxy)
{
- _cachedSendPackage = new SendPackage(new CommandSend { ProducerId = id, NumMessages = 1 });
+ _cachedMetadata = new PulsarApi.MessageMetadata();
+ _cachedSendPackage = new SendPackage(new CommandSend { ProducerId = id, NumMessages = 1 }, _cachedMetadata);
_id = id;
_name = name;
_sequenceId = sequenceId;
@@ -39,6 +41,8 @@
}
}
+ public async Task<CommandSendReceipt> Send(ReadOnlyMemory<byte> payload) => await Send(_cachedMetadata, payload);
+
public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload)
{
try
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 979109c..d1cc22b 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -32,8 +32,8 @@
_throwIfClosedOrFaulted = () => { };
}
- public async Task<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
- public async Task<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+ public async ValueTask<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+ public async ValueTask<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
public bool IsFinalState() => _stateManager.IsFinalState();
public bool IsFinalState(ReaderState state) => _stateManager.IsFinalState(state);
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
index 7d9fbfd..17ed150 100644
--- a/src/DotPulsar/Internal/SendPackage.cs
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -5,7 +5,11 @@
{
public sealed class SendPackage
{
- public SendPackage(CommandSend command) => Command = command;
+ public SendPackage(CommandSend command, PulsarApi.MessageMetadata metadata)
+ {
+ Command = command;
+ Metadata = metadata;
+ }
public CommandSend Command { get; }
public PulsarApi.MessageMetadata Metadata { get; set; }
diff --git a/src/DotPulsar/Internal/StateManager.cs b/src/DotPulsar/Internal/StateManager.cs
index 2162fbc..b2b9963 100644
--- a/src/DotPulsar/Internal/StateManager.cs
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -39,23 +39,23 @@
}
}
- public Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken)
+ public ValueTask<TState> StateChangedTo(TState state, CancellationToken cancellationToken)
{
lock (_lock)
{
if (IsFinalState(CurrentState) || CurrentState.Equals(state))
- return Task.FromResult(CurrentState);
- return _stateTasks.CreateTaskFor(state, StateChanged.To, cancellationToken);
+ return new ValueTask<TState>(CurrentState);
+ return new ValueTask<TState>(_stateTasks.CreateTaskFor(state, StateChanged.To, cancellationToken));
}
}
- public Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken)
+ public ValueTask<TState> StateChangedFrom(TState state, CancellationToken cancellationToken)
{
lock (_lock)
{
if (IsFinalState(CurrentState) || !CurrentState.Equals(state))
- return Task.FromResult(CurrentState);
- return _stateTasks.CreateTaskFor(state, StateChanged.From, cancellationToken);
+ return new ValueTask<TState>(CurrentState);
+ return new ValueTask<TState>(_stateTasks.CreateTaskFor(state, StateChanged.From, cancellationToken));
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 5e30d22..ff28461 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -34,7 +34,7 @@
ThrowIfClosed();
var producer = new Producer(new ProducerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
_disposabels.AddFirst(producer);
- producer.StateChangedTo(ProducerState.Closed, default).ContinueWith(t => Remove(producer));
+ producer.StateChangedTo(ProducerState.Closed, default).AsTask().ContinueWith(t => Remove(producer));
return producer;
}
}
@@ -46,7 +46,7 @@
ThrowIfClosed();
var consumer = new Consumer(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy, options.SubscriptionType != SubscriptionType.Failover);
_disposabels.AddFirst(consumer);
- consumer.StateChangedTo(ConsumerState.Closed, default).ContinueWith(t => Remove(consumer));
+ consumer.StateChangedTo(ConsumerState.Closed, default).AsTask().ContinueWith(t => Remove(consumer));
return consumer;
}
}
@@ -58,7 +58,7 @@
ThrowIfClosed();
var reader = new Reader(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
_disposabels.AddFirst(reader);
- reader.StateChangedTo(ReaderState.Closed, default).ContinueWith(t => Remove(reader));
+ reader.StateChangedTo(ReaderState.Closed, default).AsTask().ContinueWith(t => Remove(reader));
return reader;
}
}