Implementing seeking on the message publish time from both the consumer and reader
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index ef060f4..48fe671 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -66,7 +66,7 @@
bool IsFinalState(ConsumerState state);
/// <summary>
- /// Get an IAsyncEnumerable for consuming messages
+ /// Get an IAsyncEnumerable for consuming messages.
/// </summary>
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
@@ -76,6 +76,21 @@
ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
+ /// Reset the subscription associated with this consumer to a specific message publish time using unix time in milliseconds.
+ /// </summary>
+ ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this consumer to a specific message publish time using an UTC DateTime.
+ /// </summary>
+ ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this consumer to a specific message publish time using a DateTimeOffset.
+ /// </summary>
+ ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
/// <returns>
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index a44ea9c..7b9502a 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -46,6 +46,26 @@
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
/// <summary>
+ /// Reset the subscription associated with this reader to a specific MessageId.
+ /// </summary>
+ ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this reader to a specific message publish time using unix time in milliseconds.
+ /// </summary>
+ ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this reader to a specific message publish time using an UTC DateTime.
+ /// </summary>
+ ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the subscription associated with this reader to a specific message publish time using a DateTimeOffset.
+ /// </summary>
+ ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
/// <returns>
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index 64c2289..a63762b 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -14,12 +14,14 @@
namespace DotPulsar.Internal.Abstractions
{
+ using DotPulsar.Internal.PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;
public interface IReaderChannel : IAsyncDisposable
{
+ Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
ValueTask<Message> Receive(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 33e807f..dad2416 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -124,11 +124,31 @@
{
ThrowIfDisposed();
- var seek = new CommandSeek
- {
- MessageId = messageId.Data
- };
+ var seek = new CommandSeek { MessageId = messageId.Data };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+ public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = publishTime };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
_ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
}
@@ -170,11 +190,7 @@
var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
-
- await _executor.Execute(() =>
- {
- return _channel.Send(redeliverUnacknowledgedMessages, cancellationToken);
- }, cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => _channel.Send(redeliverUnacknowledgedMessages, cancellationToken), cancellationToken).ConfigureAwait(false);
}
internal async ValueTask SetChannel(IConsumerChannel channel)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 82f0e91..3dccb73 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -17,6 +17,7 @@
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using DotPulsar.Internal.PulsarApi;
using Events;
using System;
using System.Collections.Generic;
@@ -80,6 +81,38 @@
yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
}
+ public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessageId = messageId.Data };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = publishTime };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)