Implementing seeking on the message publish time from both the consumer and reader
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index ef060f4..48fe671 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -66,7 +66,7 @@
         bool IsFinalState(ConsumerState state);
 
         /// <summary>
-        /// Get an IAsyncEnumerable for consuming messages
+        /// Get an IAsyncEnumerable for consuming messages.
         /// </summary>
         IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
 
@@ -76,6 +76,21 @@
         ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
+        /// Reset the subscription associated with this consumer to a specific message publish time using unix time in milliseconds.
+        /// </summary>
+        ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Reset the subscription associated with this consumer to a specific message publish time using an UTC DateTime.
+        /// </summary>
+        ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Reset the subscription associated with this consumer to a specific message publish time using a DateTimeOffset.
+        /// </summary>
+        ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// Wait for the state to change to a specific state.
         /// </summary>
         /// <returns>
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index a44ea9c..7b9502a 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -46,6 +46,26 @@
         IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
 
         /// <summary>
+        /// Reset the subscription associated with this reader to a specific MessageId.
+        /// </summary>
+        ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Reset the subscription associated with this reader to a specific message publish time using unix time in milliseconds.
+        /// </summary>
+        ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Reset the subscription associated with this reader to a specific message publish time using an UTC DateTime.
+        /// </summary>
+        ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Reset the subscription associated with this reader to a specific message publish time using a DateTimeOffset.
+        /// </summary>
+        ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// Wait for the state to change to a specific state.
         /// </summary>
         /// <returns>
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index 64c2289..a63762b 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -14,12 +14,14 @@
 
 namespace DotPulsar.Internal.Abstractions
 {
+    using DotPulsar.Internal.PulsarApi;
     using System;
     using System.Threading;
     using System.Threading.Tasks;
 
     public interface IReaderChannel : IAsyncDisposable
     {
+        Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         ValueTask<Message> Receive(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 33e807f..dad2416 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -124,11 +124,31 @@
         {
             ThrowIfDisposed();
 
-            var seek = new CommandSeek
-            {
-                MessageId = messageId.Data
-            };
+            var seek = new CommandSeek { MessageId = messageId.Data };
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
 
+        public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var seek = new CommandSeek { MessagePublishTime = publishTime };
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
             _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
@@ -170,11 +190,7 @@
 
             var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
             redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
-
-            await _executor.Execute(() =>
-            {
-                return _channel.Send(redeliverUnacknowledgedMessages, cancellationToken);
-            }, cancellationToken).ConfigureAwait(false);
+            await _executor.Execute(() => _channel.Send(redeliverUnacknowledgedMessages, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         internal async ValueTask SetChannel(IConsumerChannel channel)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 82f0e91..3dccb73 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -17,6 +17,7 @@
     using Abstractions;
     using DotPulsar.Abstractions;
     using DotPulsar.Exceptions;
+    using DotPulsar.Internal.PulsarApi;
     using Events;
     using System;
     using System.Collections.Generic;
@@ -80,6 +81,38 @@
                 yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
+        public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var seek = new CommandSeek { MessageId = messageId.Data };
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var seek = new CommandSeek { MessagePublishTime = publishTime };
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
+            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
         public async ValueTask DisposeAsync()
         {
             if (Interlocked.Exchange(ref _isDisposed, 1) != 0)