Support reading/consuming batched messages and make ready for 0.6.0
diff --git a/src/DotPulsar.Tests/DotPulsar.Tests.csproj b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
index 3dff614..57ac1e6 100644
--- a/src/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
- <TargetFramework>netcoreapp3.0</TargetFramework>
+ <TargetFrameworks>netcoreapp3.0;netcoreapp22</TargetFrameworks>
<IsPackable>false</IsPackable>
</PropertyGroup>
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 397d98e..e50959b 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
- <Version>0.5.0</Version>
+ <Version>0.6.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>DanskeCommodities;dblank</Authors>
@@ -11,7 +11,7 @@
<Title>DotPulsar</Title>
<PackageTags>Apache;Pulsar</PackageTags>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
- <PackageReleaseNotes>Beta release - Support sending ReadOnlySequence of bytes</PackageReleaseNotes>
+ <PackageReleaseNotes>Beta release - Support KeyShared subscription and consuming batched messages</PackageReleaseNotes>
<Description>.NET/C# client library for Apache Pulsar</Description>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
diff --git a/src/DotPulsar/Exceptions/ChecksumException.cs b/src/DotPulsar/Exceptions/ChecksumException.cs
index 4068590..4bb382a 100644
--- a/src/DotPulsar/Exceptions/ChecksumException.cs
+++ b/src/DotPulsar/Exceptions/ChecksumException.cs
@@ -3,7 +3,5 @@
public sealed class ChecksumException : DotPulsarException
{
public ChecksumException(string message) : base(message) { }
-
- public ChecksumException(uint expectedChecksum, uint actualChecksum) : base($"Checksum mismatch. Excepted {expectedChecksum} but was actually {actualChecksum}") { }
}
}
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
new file mode 100644
index 0000000..840676e
--- /dev/null
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -0,0 +1,98 @@
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System.Buffers;
+using System.Collections;
+using System.Collections.Generic;
+
+namespace DotPulsar.Internal
+{
+ public sealed class BatchHandler
+ {
+ private readonly bool _trackBatches;
+ private readonly Queue<Message> _messages;
+ private readonly LinkedList<Batch> _batches;
+
+ public BatchHandler(bool trackBatches)
+ {
+ _trackBatches = trackBatches;
+ _messages = new Queue<Message>();
+ _batches = new LinkedList<Batch>();
+ }
+
+ public Message Add(MessageIdData messageId, PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data)
+ {
+ if (_trackBatches)
+ _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
+
+ long index = 0;
+ for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+ {
+ var singleMetadataSize = data.ReadUInt32(index, true);
+ index += 4;
+ var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
+ index += singleMetadataSize;
+ var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
+ var message = new Message(singleMessageId, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
+ _messages.Enqueue(message);
+ index += (uint)singleMetadata.PayloadSize;
+ }
+
+ return _messages.Dequeue();
+ }
+
+ public Message? GetNext() => _messages.Count == 0 ? null : _messages.Dequeue();
+
+ public void Clear()
+ {
+ _messages.Clear();
+ _batches.Clear();
+ }
+
+ public MessageIdData? Acknowledge(MessageIdData messageId)
+ {
+ foreach (var batch in _batches)
+ {
+ if (messageId.LedgerId != batch.MessageId.LedgerId ||
+ messageId.EntryId != batch.MessageId.EntryId ||
+ messageId.Partition != batch.MessageId.Partition)
+ continue;
+
+ batch.Acknowledge(messageId.BatchIndex);
+ if (batch.IsAcknowledged())
+ {
+ _batches.Remove(batch);
+ return batch.MessageId;
+ }
+ break;
+ }
+
+ return null;
+ }
+
+ private sealed class Batch
+ {
+ private readonly BitArray _acknowledgementIndex;
+
+ public Batch(MessageIdData messageId, int numberOfMessages)
+ {
+ MessageId = messageId;
+ _acknowledgementIndex = new BitArray(numberOfMessages, false);
+ }
+
+ public MessageIdData MessageId { get; }
+
+ public void Acknowledge(int batchIndex) => _acknowledgementIndex.Set(batchIndex, true);
+
+ public bool IsAcknowledged()
+ {
+ for (var i = 0; i < _acknowledgementIndex.Length; i++)
+ {
+ if (!_acknowledgementIndex[i])
+ return false;
+ }
+
+ return true;
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 402e624..4c7bd29 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -156,7 +156,7 @@
switch (command.CommandType)
{
case BaseCommand.Type.Message:
- _consumerManager.Incoming(new MessagePackage(command.Message, sequence.Slice(commandSize)));
+ _consumerManager.Incoming(command.Message, sequence.Slice(commandSize));
return;
case BaseCommand.Type.CloseConsumer:
_consumerManager.Incoming(command.CloseConsumer);
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index 3199b15..e303ef0 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -13,6 +13,7 @@
PulsarSslScheme = "pulsar+ssl";
DefaultPulsarPort = 6650;
DefaultPulsarSSLPort = 6651;
+ MagicNumber = new byte[] { 0x0e, 0x01 };
}
public static string ClientVersion { get; }
@@ -21,5 +22,6 @@
public static string PulsarSslScheme { get; }
public static int DefaultPulsarPort { get; }
public static int DefaultPulsarSSLPort { get; }
+ public static byte[] MagicNumber { get; }
}
}
diff --git a/src/DotPulsar/Internal/ConsumerManager.cs b/src/DotPulsar/Internal/ConsumerManager.cs
index 597894c..c237608 100644
--- a/src/DotPulsar/Internal/ConsumerManager.cs
+++ b/src/DotPulsar/Internal/ConsumerManager.cs
@@ -1,6 +1,7 @@
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System;
+using System.Buffers;
namespace DotPulsar.Internal
{
@@ -22,11 +23,10 @@
}
}
- public void Incoming(MessagePackage package)
+ public void Incoming(CommandMessage message, ReadOnlySequence<byte> data)
{
- var consumerId = package.Command.ConsumerId;
- var proxy = _proxies[consumerId];
- proxy?.Enqueue(package);
+ var proxy = _proxies[message.ConsumerId];
+ proxy?.Enqueue(new MessagePackage(message.MessageId, data));
}
public void Incoming(CommandCloseConsumer command) => RemoveConsumer(command.ConsumerId);
diff --git a/src/DotPulsar/Internal/ConsumerStream.cs b/src/DotPulsar/Internal/ConsumerStream.cs
index 776c3df..b617d3b 100644
--- a/src/DotPulsar/Internal/ConsumerStream.cs
+++ b/src/DotPulsar/Internal/ConsumerStream.cs
@@ -1,5 +1,4 @@
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
using System;
@@ -15,61 +14,75 @@
private readonly Connection _connection;
private readonly IFaultStrategy _faultStrategy;
private readonly IConsumerProxy _proxy;
- private readonly CommandFlow _commandFlow;
+ private readonly BatchHandler _batchHandler;
+ private readonly CommandFlow _cachedCommandFlow;
private uint _sendWhenZero;
- private bool _firstBatch;
+ private bool _firstFlow;
- public ConsumerStream(ulong id, uint messagePrefetchCount, IDequeue<MessagePackage> dequeue, Connection connection, IFaultStrategy faultStrategy, IConsumerProxy proxy)
+ public ConsumerStream(
+ ulong id,
+ uint messagePrefetchCount,
+ IDequeue<MessagePackage> dequeue,
+ Connection connection,
+ IFaultStrategy faultStrategy,
+ IConsumerProxy proxy,
+ BatchHandler batchHandler)
{
_id = id;
_dequeue = dequeue;
_connection = connection;
_faultStrategy = faultStrategy;
_proxy = proxy;
- _commandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount };
+ _batchHandler = batchHandler;
+ _cachedCommandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount };
_sendWhenZero = 0;
- _firstBatch = true;
+ _firstFlow = true;
}
public async ValueTask<Message> Receive(CancellationToken cancellationToken)
{
while (true)
{
- if (_sendWhenZero == 0) //TODO should sending the flow command be handled on other thread and thereby not slow down the consumer?
- {
- await _connection.Send(_commandFlow);
+ if (_sendWhenZero == 0)
+ await SendFlow();
- if (_firstBatch)
- {
- _commandFlow.MessagePermits = (uint)Math.Ceiling(_commandFlow.MessagePermits * 0.5);
- _firstBatch = false;
- }
-
- _sendWhenZero = _commandFlow.MessagePermits;
- }
-
- var messagePackage = await _dequeue.Dequeue(cancellationToken);
_sendWhenZero--;
- try
- {
- return Serializer.Deserialize(messagePackage);
- }
- catch (ChecksumException)
- {
- var ack = new CommandAck
- {
- Type = CommandAck.AckType.Individual,
- validation_error = CommandAck.ValidationError.ChecksumMismatch
- };
- ack.MessageIds.Add(messagePackage.Command.MessageId);
- await Send(ack);
- }
+ var message = _batchHandler.GetNext();
+ if (message != null)
+ return message;
+
+ var messagePackage = await _dequeue.Dequeue(cancellationToken);
+
+ if (!await Validate(messagePackage))
+ continue;
+
+ var messageId = messagePackage.MessageId;
+ var data = messagePackage.Data;
+
+ var metadataSize = data.ReadUInt32(6, true);
+ var metadata = Serializer.Deserialize<PulsarApi.MessageMetadata>(data.Slice(10, metadataSize));
+ data = data.Slice(10 + metadataSize);
+
+ if (metadata.NumMessagesInBatch == 1)
+ return new Message(new MessageId(messageId), metadata, null, data);
+
+ return _batchHandler.Add(messageId, metadata, data);
}
}
public async Task Send(CommandAck command)
{
+ var messageId = command.MessageIds[0];
+ if (messageId.BatchIndex != -1)
+ {
+ var batchMessageId = _batchHandler.Acknowledge(messageId);
+ if (batchMessageId is null)
+ return;
+
+ command.MessageIds[0] = batchMessageId;
+ }
+
try
{
command.ConsumerId = _id;
@@ -105,6 +118,7 @@
command.ConsumerId = _id;
var response = await _connection.Send(command);
response.Expect(BaseCommand.Type.Success);
+ _batchHandler.Clear();
return response.Success;
}
catch (Exception exception)
@@ -147,5 +161,38 @@
if (_faultStrategy.DetermineFaultAction(exception) == FaultAction.Relookup)
_proxy.Disconnected();
}
+
+ private async ValueTask SendFlow()
+ {
+ await _connection.Send(_cachedCommandFlow); //TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
+
+ if (_firstFlow)
+ {
+ _cachedCommandFlow.MessagePermits = (uint)Math.Ceiling(_cachedCommandFlow.MessagePermits * 0.5);
+ _firstFlow = false;
+ }
+
+ _sendWhenZero = _cachedCommandFlow.MessagePermits;
+ }
+
+ private async ValueTask<bool> Validate(MessagePackage messagePackage)
+ {
+ var magicNumberMatches = messagePackage.Data.StartsWith(Constants.MagicNumber);
+ var expectedChecksum = messagePackage.Data.ReadUInt32(2, true);
+ var actualChecksum = Crc32C.Calculate(messagePackage.Data.Slice(6));
+ if (!magicNumberMatches || expectedChecksum != actualChecksum)
+ {
+ var ack = new CommandAck
+ {
+ Type = CommandAck.AckType.Individual,
+ validation_error = CommandAck.ValidationError.ChecksumMismatch
+ };
+ ack.MessageIds.Add(messagePackage.MessageId);
+ await Send(ack);
+ return false;
+ }
+
+ return true;
+ }
}
}
diff --git a/src/DotPulsar/Internal/ConsumerStreamFactory.cs b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
index a95e499..ba62d6f 100644
--- a/src/DotPulsar/Internal/ConsumerStreamFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
@@ -12,12 +12,14 @@
private readonly IFaultStrategy _faultStrategy;
private readonly CommandSubscribe _subscribe;
private readonly uint _messagePrefetchCount;
+ private readonly BatchHandler _batchHandler;
public ConsumerStreamFactory(ConnectionPool connectionManager, ConsumerOptions options, IFaultStrategy faultStrategy)
{
_connectionTool = connectionManager;
_faultStrategy = faultStrategy;
_messagePrefetchCount = options.MessagePrefetchCount;
+ _batchHandler = new BatchHandler(true);
_subscribe = new CommandSubscribe
{
@@ -36,6 +38,7 @@
_connectionTool = connectionManager;
_faultStrategy = faultStrategy;
_messagePrefetchCount = options.MessagePrefetchCount;
+ _batchHandler = new BatchHandler(false);
_subscribe = new CommandSubscribe
{
@@ -56,7 +59,7 @@
{
var connection = await _connectionTool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
var response = await connection.Send(_subscribe, proxy);
- return new ConsumerStream(response.ConsumerId, _messagePrefetchCount, proxy, connection, _faultStrategy, proxy);
+ return new ConsumerStream(response.ConsumerId, _messagePrefetchCount, proxy, connection, _faultStrategy, proxy, _batchHandler);
}
catch (OperationCanceledException)
{
diff --git a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
index 4da0cdd..3e54624 100644
--- a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
@@ -17,12 +17,6 @@
public static void SetEventTime(this Metadata metadata, DateTimeOffset timestamp)
=> metadata.EventTime = (ulong)timestamp.ToUnixTimeMilliseconds();
- public static DateTimeOffset GetPublishTimeAsDateTimeOffset(this Metadata metadata)
- => DateTimeOffset.FromUnixTimeMilliseconds((long)metadata.PublishTime);
-
- public static void SetPublishTime(this Metadata metadata, DateTimeOffset timestamp)
- => metadata.PublishTime = (ulong)timestamp.ToUnixTimeMilliseconds();
-
public static byte[]? GetKeyAsBytes(this Metadata metadata)
=> metadata.PartitionKeyB64Encoded ? Convert.FromBase64String(metadata.PartitionKey) : null;
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
index e477974..df4aaca 100644
--- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -30,7 +30,7 @@
return false;
}
- public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, int start, bool isBigEndian)
+ public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, long start, bool isBigEndian)
{
if (sequence.Length < (4 + start))
throw new ArgumentOutOfRangeException(nameof(start), start, "Sequence must be at least 4 bytes long from 'start' to end");
@@ -48,7 +48,7 @@
}
var span = memory.Span;
- for (var i = start; i < span.Length; ++i, ++read)
+ for (var i = (int)start; i < span.Length; ++i, ++read)
{
switch (read)
{
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
index 2aaf738..39864ba 100644
--- a/src/DotPulsar/Internal/MessagePackage.cs
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -3,15 +3,15 @@
namespace DotPulsar.Internal
{
- public sealed class MessagePackage
+ public struct MessagePackage
{
- public MessagePackage(CommandMessage command, ReadOnlySequence<byte> data)
+ public MessagePackage(MessageIdData messageId, ReadOnlySequence<byte> data)
{
- Command = command;
+ MessageId = messageId;
Data = data;
}
- public CommandMessage Command { get; }
+ public MessageIdData MessageId { get; }
public ReadOnlySequence<byte> Data { get; }
}
}
diff --git a/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
index 5a1a834..fa273c9 100644
--- a/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
+++ b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
@@ -359,6 +359,16 @@
public bool ShouldSerializeOrderingKey() => __pbn__OrderingKey != null;
public void ResetOrderingKey() => __pbn__OrderingKey = null;
private byte[] __pbn__OrderingKey;
+
+ [global::ProtoBuf.ProtoMember(8, Name = @"sequence_id")]
+ public ulong SequenceId
+ {
+ get { return __pbn__SequenceId.GetValueOrDefault(); }
+ set { __pbn__SequenceId = value; }
+ }
+ public bool ShouldSerializeSequenceId() => __pbn__SequenceId != null;
+ public void ResetSequenceId() => __pbn__SequenceId = null;
+ private ulong? __pbn__SequenceId;
}
[global::ProtoBuf.ProtoContract()]
diff --git a/src/DotPulsar/Internal/Serializer.cs b/src/DotPulsar/Internal/Serializer.cs
index 866ccce..ddffc69 100644
--- a/src/DotPulsar/Internal/Serializer.cs
+++ b/src/DotPulsar/Internal/Serializer.cs
@@ -1,6 +1,4 @@
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
+using DotPulsar.Internal.PulsarApi;
using System;
using System.Buffers;
using System.IO;
@@ -9,30 +7,12 @@
{
public static class Serializer
{
- private static readonly byte[] MagicNumber = new byte[] { 0x0e, 0x01 };
-
public static T Deserialize<T>(ReadOnlySequence<byte> sequence)
{
using var ms = new MemoryStream(sequence.ToArray()); //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
return ProtoBuf.Serializer.Deserialize<T>(ms);
}
- public static Message Deserialize(MessagePackage package)
- {
- var sequence = package.Data;
- var magicNumberMatches = sequence.StartsWith(MagicNumber);
- if (!magicNumberMatches)
- throw new ChecksumException("Magic number don't match");
- var expectedChecksum = sequence.ReadUInt32(2, true);
- var actualChecksum = Crc32C.Calculate(sequence.Slice(6));
- if (expectedChecksum != actualChecksum)
- throw new ChecksumException(expectedChecksum, actualChecksum);
- var metaSize = sequence.ReadUInt32(6, true);
- var meta = Deserialize<PulsarApi.MessageMetadata>(sequence.Slice(10, metaSize));
- var data = sequence.Slice(10 + metaSize);
- return new Message(new MessageId(package.Command.MessageId), meta, data);
- }
-
public static ReadOnlySequence<byte> Serialize(BaseCommand command)
{
var commandBytes = Serialize<BaseCommand>(command);
@@ -58,7 +38,7 @@
var checksum = Crc32C.Calculate(sb.Build());
return sb.Prepend(ToBigEndianBytes(checksum))
- .Prepend(MagicNumber)
+ .Prepend(Constants.MagicNumber)
.Prepend(commandBytes)
.Prepend(commandSizeBytes)
.Prepend(ToBigEndianBytes((uint)sb.Length))
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index 5447e65..7ca62be 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -1,5 +1,4 @@
-using DotPulsar.Internal.Extensions;
-using System;
+using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
@@ -8,41 +7,61 @@
{
public sealed class Message
{
- private readonly Internal.PulsarApi.MessageMetadata _messageMetadata;
+ private readonly List<Internal.PulsarApi.KeyValue> _keyVaues;
private IReadOnlyDictionary<string, string>? _properties;
- internal Message(MessageId messageId, Internal.PulsarApi.MessageMetadata messageMetadata, ReadOnlySequence<byte> data)
+ internal Message(
+ MessageId messageId,
+ Internal.PulsarApi.MessageMetadata metadata,
+ Internal.PulsarApi.SingleMessageMetadata? singleMetadata,
+ ReadOnlySequence<byte> data)
{
MessageId = messageId;
- _messageMetadata = messageMetadata;
+ ProducerName = metadata.ProducerName;
+ PublishTime = metadata.PublishTime;
Data = data;
+
+ if (singleMetadata is null)
+ {
+ EventTime = metadata.EventTime;
+ HasBase64EncodedKey = metadata.PartitionKeyB64Encoded;
+ Key = metadata.PartitionKey;
+ SequenceId = metadata.SequenceId;
+ OrderingKey = metadata.OrderingKey;
+ _keyVaues = metadata.Properties;
+ }
+ else
+ {
+ EventTime = singleMetadata.EventTime;
+ HasBase64EncodedKey = singleMetadata.PartitionKeyB64Encoded;
+ Key = singleMetadata.PartitionKey;
+ OrderingKey = singleMetadata.OrderingKey;
+ SequenceId = singleMetadata.SequenceId;
+ _keyVaues = singleMetadata.Properties;
+ }
}
public MessageId MessageId { get; }
public ReadOnlySequence<byte> Data { get; }
- public string ProducerName => _messageMetadata.ProducerName;
- public ulong SequenceId => _messageMetadata.SequenceId;
+ public string ProducerName { get; }
+ public ulong SequenceId { get; }
- public bool HasEventTime => _messageMetadata.EventTime != 0;
- public ulong EventTime => _messageMetadata.EventTime;
- public DateTimeOffset EventTimeAsDateTimeOffset => _messageMetadata.GetEventTimeAsDateTimeOffset();
+ public bool HasEventTime => EventTime != 0;
+ public ulong EventTime { get; }
+ public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)EventTime);
- public bool HasBase64EncodedKey => _messageMetadata.PartitionKeyB64Encoded;
- public bool HasKey => _messageMetadata.PartitionKey != null;
- public string? Key => _messageMetadata.PartitionKey;
- public byte[]? KeyBytes => _messageMetadata.GetKeyAsBytes();
+ public bool HasBase64EncodedKey { get; }
+ public bool HasKey => Key != null;
+ public string? Key { get; }
+ public byte[]? KeyBytes => HasBase64EncodedKey ? Convert.FromBase64String(Key) : null;
- public bool HasOrderingKey => _messageMetadata.OrderingKey != null;
- public byte[]? OrderingKey => _messageMetadata.OrderingKey;
+ public bool HasOrderingKey => OrderingKey != null;
+ public byte[]? OrderingKey { get; }
- public ulong PublishTime => _messageMetadata.PublishTime;
- public DateTimeOffset PublishTimeAsDateTimeOffset => _messageMetadata.GetPublishTimeAsDateTimeOffset();
+ public ulong PublishTime { get; }
+ public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)PublishTime);
-
- public IReadOnlyDictionary<string, string> Properties
- {
- get => _properties ??= _messageMetadata.Properties.ToDictionary(p => p.Key, p => p.Value);
- }
+ public IReadOnlyDictionary<string, string> Properties => _properties ??= _keyVaues.ToDictionary(p => p.Key, p => p.Value);
}
}