Expose 'RedeliveryCount' as requested in #41
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index 8209956..cb6ab13 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -33,7 +33,7 @@
_batches = new LinkedList<Batch>();
}
- public Message Add(MessageIdData messageId, MessageMetadata metadata, ReadOnlySequence<byte> data)
+ public Message Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
{
if (_trackBatches)
_batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
@@ -47,7 +47,7 @@
var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
index += singleMetadataSize;
var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
- var message = new Message(singleMessageId, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
+ var message = new Message(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
_messages.Enqueue(message);
index += (uint) singleMetadata.PayloadSize;
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index 07c1d5a..d2eda4d 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -130,7 +130,7 @@
=> _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
- => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, data));
+ => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, command.RedeliveryCount, data));
public void Dispose()
{
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index cd0ba24..e0768c6 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -76,13 +76,14 @@
}
var metadataSize = messagePackage.GetMetadataSize();
+ var redeliveryCount = messagePackage.RedeliveryCount;
var data = messagePackage.ExtractData(metadataSize);
var metadata = messagePackage.ExtractMetadata(metadataSize);
var messageId = messagePackage.MessageId;
return metadata.NumMessagesInBatch == 1
- ? new Message(new MessageId(messageId), metadata, null, data)
- : _batchHandler.Add(messageId, metadata, data);
+ ? new Message(new MessageId(messageId), redeliveryCount, metadata, null, data)
+ : _batchHandler.Add(messageId, redeliveryCount, metadata, data);
}
}
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
index b953635..ef0bb32 100644
--- a/src/DotPulsar/Internal/MessagePackage.cs
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -19,13 +19,15 @@
public readonly struct MessagePackage
{
- public MessagePackage(MessageIdData messageId, ReadOnlySequence<byte> data)
+ public MessagePackage(MessageIdData messageId, uint redeliveryCount, ReadOnlySequence<byte> data)
{
MessageId = messageId;
+ RedeliveryCount = redeliveryCount;
Data = data;
}
public MessageIdData MessageId { get; }
+ public uint RedeliveryCount { get; }
public ReadOnlySequence<byte> Data { get; }
}
}
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index 0e4dfac..cc462c0 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -27,11 +27,13 @@
internal Message(
MessageId messageId,
+ uint redeliveryCount,
Internal.PulsarApi.MessageMetadata metadata,
SingleMessageMetadata? singleMetadata,
ReadOnlySequence<byte> data)
{
MessageId = messageId;
+ RedeliveryCount = redeliveryCount;
ProducerName = metadata.ProducerName;
PublishTime = metadata.PublishTime;
Data = data;
@@ -60,6 +62,7 @@
public ReadOnlySequence<byte> Data { get; }
public string ProducerName { get; }
public ulong SequenceId { get; }
+ public uint RedeliveryCount { get; }
public bool HasEventTime => EventTime != 0;
public ulong EventTime { get; }