Redeliver unacknowledged messages (#47)

* Dockerise samples

* Added basecommand extension for RedeliverUnacknowledgedMessages

* Added send overloads to IConsumerChannel and IConnection for CommandRedeliverUnacknowledgedMessages

* Connection impelmentation

* Added implementation for Send CommandRedeliverUnacknowledgedMessages in consumerchannel

* Implementation for Send CommandRedeliverUnacknowledgedMessages in NotReadyChannel

* Added public API for RedeliverUnacknowledgedMessages with given message Ids or none

* Reverted changes to samples

* Indent

* IEnumerable for consumer API

* Use IEnumerable in consumer impl

* New enumerable empty rather than list

* ConfigureAwait's for Consumer.RedeliverUnacknowledgedMessages
diff --git a/.gitignore b/.gitignore
index 35c74e6..abe8c84 100644
--- a/.gitignore
+++ b/.gitignore
@@ -287,3 +287,7 @@
 Network Trash Folder
 Temporary Items
 .apdisk
+
+# VSCode
+.devcontainer
+.vscode
\ No newline at end of file
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index e818a9c..ef060f4 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -106,5 +106,15 @@
         /// Unsubscribe the consumer.
         /// </summary>
         ValueTask Unsubscribe(CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
+        /// </summary>
+        ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken);
+
+        /// <summary>
+        /// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
+        /// </summary>
+        ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 1adbe8f..f4abd8b 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -30,6 +30,7 @@
         Task Send(CommandPong command, CancellationToken cancellationToken);
         Task Send(CommandAck command, CancellationToken cancellationToken);
         Task Send(CommandFlow command, CancellationToken cancellationToken);
+        Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
 
         Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
         Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index d398cd8..a86ff5a 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -22,6 +22,7 @@
     public interface IConsumerChannel : IAsyncDisposable
     {
         Task Send(CommandAck command, CancellationToken cancellationToken);
+        Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
         Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 2582e73..989aa36 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -97,6 +97,9 @@
         public Task Send(CommandFlow command, CancellationToken cancellationToken)
             => Send(command.AsBaseCommand(), cancellationToken);
 
+        public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+            => Send(command.AsBaseCommand(), cancellationToken);
+
         public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 80e6369..fe5fce3 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -21,6 +21,7 @@
     using PulsarApi;
     using System;
     using System.Collections.Generic;
+    using System.Linq;
     using System.Runtime.CompilerServices;
     using System.Threading;
     using System.Threading.Tasks;
@@ -31,6 +32,7 @@
         private readonly IRegisterEvent _eventRegister;
         private IConsumerChannel _channel;
         private readonly CommandAck _cachedCommandAck;
+        private readonly CommandRedeliverUnacknowledgedMessages _cachedCommandRedeliverUnacknowledgedMessages;
         private readonly IExecute _executor;
         private readonly IStateChanged<ConsumerState> _state;
         private int _isDisposed;
@@ -52,6 +54,7 @@
             _executor = executor;
             _state = state;
             _cachedCommandAck = new CommandAck();
+            _cachedCommandRedeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
             _isDisposed = 0;
 
             _eventRegister.Register(new ConsumerCreated(_correlationId, this));
@@ -104,6 +107,12 @@
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
+        public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);
+
         public async ValueTask Unsubscribe(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
@@ -145,6 +154,17 @@
             }, cancellationToken).ConfigureAwait(false);
         }
 
+        private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            await _executor.Execute(() =>
+            {
+                _cachedCommandRedeliverUnacknowledgedMessages.MessageIds.Clear();
+                _cachedCommandAck.MessageIds.AddRange(messageIds);
+                return _channel.Send(_cachedCommandRedeliverUnacknowledgedMessages, cancellationToken);
+            }, cancellationToken).ConfigureAwait(false);
+        }
         internal async ValueTask SetChannel(IConsumerChannel channel)
         {
             if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index e0768c6..c08649d 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -105,6 +105,12 @@
             await _connection.Send(command, cancellationToken).ConfigureAwait(false);
         }
 
+        public async Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+        {
+            command.ConsumerId = _id;
+            await _connection.Send(command, cancellationToken).ConfigureAwait(false);
+        }
+
         public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index f72fa31..4b182dc 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -166,5 +166,12 @@
                 CommandType = BaseCommand.Type.Seek,
                 Seek = command
             };
+        
+        public static BaseCommand AsBaseCommand(this CommandRedeliverUnacknowledgedMessages command)
+            => new BaseCommand
+            {
+                CommandType = BaseCommand.Type.RedeliverUnacknowledgedMessages,
+                RedeliverUnacknowledgedMessages = command
+            };
     }
 }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index aec3f34..93f572a 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -39,6 +39,9 @@
         public Task Send(CommandAck command, CancellationToken cancellationToken)
             => throw GetException();
 
+        public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+            => throw GetException();
+
         public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
             => throw GetException();