Fixed last nullable warning and moved all public interfaces to ValueTask
diff --git a/src/DotPulsar.Tests/Internal/StateManagerTests.cs b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
index b7a6590..270a811 100644
--- a/src/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/src/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -219,7 +219,7 @@
 
             //Act
             cts.Cancel();
-            var exception = await Record.ExceptionAsync(() => task);
+            var exception = await Record.ExceptionAsync(() => task.AsTask());
 
             //Assert
             Assert.IsType<TaskCanceledException>(exception);
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 5c435f7..51920d4 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -13,27 +13,27 @@
         /// <summary>
         /// Acknowledge the consumption of a single message.
         /// </summary>
-        Task Acknowledge(Message message, CancellationToken cancellationToken = default);
+        ValueTask Acknowledge(Message message, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Acknowledge the consumption of a single message using the MessageId.
         /// </summary>
-        Task Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
+        ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
         /// </summary>
-        Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
+        ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Acknowledge the consumption of all the messages in the topic up to and including the provided MessageId.
         /// </summary>
-        Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
+        ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Get the MessageId of the last message on the topic.
         /// </summary>
-        Task<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
+        ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Get an IAsyncEnumerable for consuming messages
@@ -43,11 +43,11 @@
         /// <summary>
         /// Reset the subscription associated with this consumer to a specific MessageId.
         /// </summary>
-        Task Seek(MessageId messageId, CancellationToken cancellationToken = default);
+        ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Unsubscribe the consumer.
         /// </summary>
-        Task Unsubscribe(CancellationToken cancellationToken = default);
+        ValueTask Unsubscribe(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Abstractions/IMessageBuilder.cs b/src/DotPulsar/Abstractions/IMessageBuilder.cs
index 447c38f..c6e6ef8 100644
--- a/src/DotPulsar/Abstractions/IMessageBuilder.cs
+++ b/src/DotPulsar/Abstractions/IMessageBuilder.cs
@@ -48,6 +48,6 @@
         /// <summary>
         /// Set the consumer name.
         /// </summary>
-        Task<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
+        ValueTask<MessageId> Send(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 659b5ff..0127a98 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -12,11 +12,11 @@
         /// <summary>
         /// Sends a message.
         /// </summary>
-        Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+        ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Sends a message and metadata.
         /// </summary>
-        Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
+        ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Abstractions/IStateChanged.cs
index a9b9389..df52596 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Abstractions/IStateChanged.cs
@@ -17,7 +17,7 @@
         /// <remarks>
         /// If the state change to a final state, then all awaiting tasks will complete.
         /// </remarks>
-        Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken = default);
+        ValueTask<TState> StateChangedTo(TState state, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Wait for the state to change from a specific state.
@@ -28,7 +28,7 @@
         /// <remarks>
         /// If the state change to a final state, then all awaiting tasks will complete.
         /// </remarks>
-        Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken = default);
+        ValueTask<TState> StateChangedFrom(TState state, CancellationToken cancellationToken = default);
 
         /// <summary>
         /// Ask whether the current state is final, meaning that it will never change.
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerStream.cs b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
index 582632f..7e84ddd 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerStream.cs
@@ -6,6 +6,7 @@
 {
     public interface IProducerStream : IAsyncDisposable
     {
+        Task<CommandSendReceipt> Send(ReadOnlyMemory<byte> payload);
         Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload);
     }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 4e21485..efd920a 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -37,8 +37,8 @@
             _throwIfClosedOrFaulted = () => { };
         }
 
-        public async Task<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
-        public async Task<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+        public async ValueTask<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+        public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
         public bool IsFinalState() => _stateManager.IsFinalState();
         public bool IsFinalState(ConsumerState state) => _stateManager.IsFinalState(state);
 
@@ -57,38 +57,38 @@
             }
         }
 
-        public async Task Acknowledge(Message message, CancellationToken cancellationToken)
+        public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
             => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken);
 
-        public async Task Acknowledge(MessageId messageId, CancellationToken cancellationToken)
+        public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken);
 
-        public async Task AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
+        public async ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
             => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
 
-        public async Task AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
+        public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken);
 
-        public async Task Unsubscribe(CancellationToken cancellationToken)
+        public async ValueTask Unsubscribe(CancellationToken cancellationToken)
         {
             _ = await _executor.Execute(() => Stream.Send(new CommandUnsubscribe()), cancellationToken);
             HasClosed();
         }
 
-        public async Task Seek(MessageId messageId, CancellationToken cancellationToken)
+        public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
         {
             var seek = new CommandSeek { MessageId = messageId.Data };
             _ = await _executor.Execute(() => Stream.Send(seek), cancellationToken);
             return;
         }
 
-        public async Task<MessageId> GetLastMessageId(CancellationToken cancellationToken)
+        public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
         {
             var response = await _executor.Execute(() => Stream.Send(new CommandGetLastMessageId()), cancellationToken);
             return new MessageId(response.LastMessageId);
         }
 
-        private async Task Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
+        private async ValueTask Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
         {
             await _executor.Execute(() =>
             {
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index 43b8931..72cff30 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -58,6 +58,6 @@
             return this;
         }
 
-        public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) => await _producer.Send(_metadata, data, cancellationToken);
+        public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) => await _producer.Send(_metadata, data, cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/NotReadyStream.cs b/src/DotPulsar/Internal/NotReadyStream.cs
index eeb0968..4c6dda0 100644
--- a/src/DotPulsar/Internal/NotReadyStream.cs
+++ b/src/DotPulsar/Internal/NotReadyStream.cs
@@ -21,6 +21,8 @@
 
         public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command) => throw GetException();
 
+        public Task<CommandSendReceipt> Send(ReadOnlyMemory<byte> payload) => throw GetException();
+
         public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload) => throw GetException();
 
         private Exception GetException() => new StreamNotReadyException();
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index f2332d2..af0ab07 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -30,8 +30,8 @@
             _throwIfClosedOrFaulted = () => { };
         }
 
-        public async Task<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
-        public async Task<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+        public async ValueTask<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+        public async ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
         public bool IsFinalState() => _stateManager.IsFinalState();
         public bool IsFinalState(ProducerState state) => _stateManager.IsFinalState(state);
 
@@ -42,15 +42,15 @@
             await _connectTask;
         }
 
-        public async Task<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
-            => await Send(new MessageMetadata(), data, cancellationToken);
-
-        public async Task<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
-            => await Send(metadata.Metadata, data, cancellationToken);
-
-        private async Task<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload, CancellationToken cancellationToken)
+        public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
         {
-            var response = await _executor.Execute(() => Stream.Send(metadata, payload), cancellationToken);
+            var response = await _executor.Execute(() => Stream.Send(data), cancellationToken);
+            return new MessageId(response.MessageId);
+        }
+
+        public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
+        {
+            var response = await _executor.Execute(() => Stream.Send(metadata.Metadata, data), cancellationToken);
             return new MessageId(response.MessageId);
         }
 
diff --git a/src/DotPulsar/Internal/ProducerStream.cs b/src/DotPulsar/Internal/ProducerStream.cs
index c8faba8..e51c1ca 100644
--- a/src/DotPulsar/Internal/ProducerStream.cs
+++ b/src/DotPulsar/Internal/ProducerStream.cs
@@ -8,6 +8,7 @@
 {
     public sealed class ProducerStream : IProducerStream
     {
+        private readonly PulsarApi.MessageMetadata _cachedMetadata;
         private readonly SendPackage _cachedSendPackage;
         private readonly ulong _id;
         private readonly string _name;
@@ -18,7 +19,8 @@
 
         public ProducerStream(ulong id, string name, SequenceId sequenceId, Connection connection, IFaultStrategy faultStrategy, IProducerProxy proxy)
         {
-            _cachedSendPackage = new SendPackage(new CommandSend { ProducerId = id, NumMessages = 1 });
+            _cachedMetadata = new PulsarApi.MessageMetadata();
+            _cachedSendPackage = new SendPackage(new CommandSend { ProducerId = id, NumMessages = 1 }, _cachedMetadata);
             _id = id;
             _name = name;
             _sequenceId = sequenceId;
@@ -39,6 +41,8 @@
             }
         }
 
+        public async Task<CommandSendReceipt> Send(ReadOnlyMemory<byte> payload) => await Send(_cachedMetadata, payload);
+
         public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlyMemory<byte> payload)
         {
             try
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 979109c..d1cc22b 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -32,8 +32,8 @@
             _throwIfClosedOrFaulted = () => { };
         }
 
-        public async Task<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
-        public async Task<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
+        public async ValueTask<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedTo(state, cancellationToken);
+        public async ValueTask<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken) => await _stateManager.StateChangedFrom(state, cancellationToken);
         public bool IsFinalState() => _stateManager.IsFinalState();
         public bool IsFinalState(ReaderState state) => _stateManager.IsFinalState(state);
 
diff --git a/src/DotPulsar/Internal/SendPackage.cs b/src/DotPulsar/Internal/SendPackage.cs
index 7d9fbfd..17ed150 100644
--- a/src/DotPulsar/Internal/SendPackage.cs
+++ b/src/DotPulsar/Internal/SendPackage.cs
@@ -5,7 +5,11 @@
 {
     public sealed class SendPackage
     {
-        public SendPackage(CommandSend command) => Command = command;
+        public SendPackage(CommandSend command, PulsarApi.MessageMetadata metadata)
+        {
+            Command = command;
+            Metadata = metadata;
+        }
 
         public CommandSend Command { get; }
         public PulsarApi.MessageMetadata Metadata { get; set; }
diff --git a/src/DotPulsar/Internal/StateManager.cs b/src/DotPulsar/Internal/StateManager.cs
index 2162fbc..b2b9963 100644
--- a/src/DotPulsar/Internal/StateManager.cs
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -39,23 +39,23 @@
             }
         }
 
-        public Task<TState> StateChangedTo(TState state, CancellationToken cancellationToken)
+        public ValueTask<TState> StateChangedTo(TState state, CancellationToken cancellationToken)
         {
             lock (_lock)
             {
                 if (IsFinalState(CurrentState) || CurrentState.Equals(state))
-                    return Task.FromResult(CurrentState);
-                return _stateTasks.CreateTaskFor(state, StateChanged.To, cancellationToken);
+                    return new ValueTask<TState>(CurrentState);
+                return new ValueTask<TState>(_stateTasks.CreateTaskFor(state, StateChanged.To, cancellationToken));
             }
         }
 
-        public Task<TState> StateChangedFrom(TState state, CancellationToken cancellationToken)
+        public ValueTask<TState> StateChangedFrom(TState state, CancellationToken cancellationToken)
         {
             lock (_lock)
             {
                 if (IsFinalState(CurrentState) || !CurrentState.Equals(state))
-                    return Task.FromResult(CurrentState);
-                return _stateTasks.CreateTaskFor(state, StateChanged.From, cancellationToken);
+                    return new ValueTask<TState>(CurrentState);
+                return new ValueTask<TState>(_stateTasks.CreateTaskFor(state, StateChanged.From, cancellationToken));
             }
         }
 
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 5e30d22..ff28461 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -34,7 +34,7 @@
                 ThrowIfClosed();
                 var producer = new Producer(new ProducerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
                 _disposabels.AddFirst(producer);
-                producer.StateChangedTo(ProducerState.Closed, default).ContinueWith(t => Remove(producer));
+                producer.StateChangedTo(ProducerState.Closed, default).AsTask().ContinueWith(t => Remove(producer));
                 return producer;
             }
         }
@@ -46,7 +46,7 @@
                 ThrowIfClosed();
                 var consumer = new Consumer(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy, options.SubscriptionType != SubscriptionType.Failover);
                 _disposabels.AddFirst(consumer);
-                consumer.StateChangedTo(ConsumerState.Closed, default).ContinueWith(t => Remove(consumer));
+                consumer.StateChangedTo(ConsumerState.Closed, default).AsTask().ContinueWith(t => Remove(consumer));
                 return consumer;
             }
         }
@@ -58,7 +58,7 @@
                 ThrowIfClosed();
                 var reader = new Reader(new ConsumerStreamFactory(_connectionPool, options, _faultStrategy), _faultStrategy);
                 _disposabels.AddFirst(reader);
-                reader.StateChangedTo(ReaderState.Closed, default).ContinueWith(t => Remove(reader));
+                reader.StateChangedTo(ReaderState.Closed, default).AsTask().ContinueWith(t => Remove(reader));
                 return reader;
             }
         }