Support reading/consuming batched messages and make ready for 0.6.0
diff --git a/src/DotPulsar.Tests/DotPulsar.Tests.csproj b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
index 3dff614..57ac1e6 100644
--- a/src/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/src/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -1,7 +1,7 @@
 <Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
-    <TargetFramework>netcoreapp3.0</TargetFramework>
+    <TargetFrameworks>netcoreapp3.0;netcoreapp22</TargetFrameworks>
     <IsPackable>false</IsPackable>
   </PropertyGroup>
 
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 397d98e..e50959b 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
 
   <PropertyGroup>
     <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
-    <Version>0.5.0</Version>
+    <Version>0.6.0</Version>
     <AssemblyVersion>$(Version)</AssemblyVersion>
     <FileVersion>$(Version)</FileVersion>
     <Authors>DanskeCommodities;dblank</Authors>
@@ -11,7 +11,7 @@
     <Title>DotPulsar</Title>
     <PackageTags>Apache;Pulsar</PackageTags>
     <PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
-    <PackageReleaseNotes>Beta release - Support sending ReadOnlySequence of bytes</PackageReleaseNotes>
+    <PackageReleaseNotes>Beta release - Support KeyShared subscription and consuming batched messages</PackageReleaseNotes>
     <Description>.NET/C# client library for Apache Pulsar</Description>
     <GenerateDocumentationFile>true</GenerateDocumentationFile>
     <PublishRepositoryUrl>true</PublishRepositoryUrl>
diff --git a/src/DotPulsar/Exceptions/ChecksumException.cs b/src/DotPulsar/Exceptions/ChecksumException.cs
index 4068590..4bb382a 100644
--- a/src/DotPulsar/Exceptions/ChecksumException.cs
+++ b/src/DotPulsar/Exceptions/ChecksumException.cs
@@ -3,7 +3,5 @@
     public sealed class ChecksumException : DotPulsarException
     {
         public ChecksumException(string message) : base(message) { }
-
-        public ChecksumException(uint expectedChecksum, uint actualChecksum) : base($"Checksum mismatch. Excepted {expectedChecksum} but was actually {actualChecksum}") { }
     }
 }
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
new file mode 100644
index 0000000..840676e
--- /dev/null
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -0,0 +1,98 @@
+using DotPulsar.Internal.Extensions;
+using DotPulsar.Internal.PulsarApi;
+using System.Buffers;
+using System.Collections;
+using System.Collections.Generic;
+
+namespace DotPulsar.Internal
+{
+    public sealed class BatchHandler
+    {
+        private readonly bool _trackBatches;
+        private readonly Queue<Message> _messages;
+        private readonly LinkedList<Batch> _batches;
+
+        public BatchHandler(bool trackBatches)
+        {
+            _trackBatches = trackBatches;
+            _messages = new Queue<Message>();
+            _batches = new LinkedList<Batch>();
+        }
+
+        public Message Add(MessageIdData messageId, PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data)
+        {
+            if (_trackBatches)
+                _batches.AddLast(new Batch(messageId, metadata.NumMessagesInBatch));
+
+            long index = 0;
+            for (var i = 0; i < metadata.NumMessagesInBatch; ++i)
+            {
+                var singleMetadataSize = data.ReadUInt32(index, true);
+                index += 4;
+                var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
+                index += singleMetadataSize;
+                var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
+                var message = new Message(singleMessageId, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
+                _messages.Enqueue(message);
+                index += (uint)singleMetadata.PayloadSize;
+            }
+
+            return _messages.Dequeue();
+        }
+
+        public Message? GetNext() => _messages.Count == 0 ? null : _messages.Dequeue();
+
+        public void Clear()
+        {
+            _messages.Clear();
+            _batches.Clear();
+        }
+
+        public MessageIdData? Acknowledge(MessageIdData messageId)
+        {
+            foreach (var batch in _batches)
+            {
+                if (messageId.LedgerId != batch.MessageId.LedgerId ||
+                    messageId.EntryId != batch.MessageId.EntryId ||
+                    messageId.Partition != batch.MessageId.Partition)
+                    continue;
+
+                batch.Acknowledge(messageId.BatchIndex);
+                if (batch.IsAcknowledged())
+                {
+                    _batches.Remove(batch);
+                    return batch.MessageId;
+                }
+                break;
+            }
+
+            return null;
+        }
+
+        private sealed class Batch
+        {
+            private readonly BitArray _acknowledgementIndex;
+
+            public Batch(MessageIdData messageId, int numberOfMessages)
+            {
+                MessageId = messageId;
+                _acknowledgementIndex = new BitArray(numberOfMessages, false);
+            }
+
+            public MessageIdData MessageId { get; }
+
+            public void Acknowledge(int batchIndex) => _acknowledgementIndex.Set(batchIndex, true);
+
+            public bool IsAcknowledged()
+            {
+                for (var i = 0; i < _acknowledgementIndex.Length; i++)
+                {
+                    if (!_acknowledgementIndex[i])
+                        return false;
+                }
+
+                return true;
+            }
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 402e624..4c7bd29 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -156,7 +156,7 @@
             switch (command.CommandType)
             {
                 case BaseCommand.Type.Message:
-                    _consumerManager.Incoming(new MessagePackage(command.Message, sequence.Slice(commandSize)));
+                    _consumerManager.Incoming(command.Message, sequence.Slice(commandSize));
                     return;
                 case BaseCommand.Type.CloseConsumer:
                     _consumerManager.Incoming(command.CloseConsumer);
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index 3199b15..e303ef0 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -13,6 +13,7 @@
             PulsarSslScheme = "pulsar+ssl";
             DefaultPulsarPort = 6650;
             DefaultPulsarSSLPort = 6651;
+            MagicNumber = new byte[] { 0x0e, 0x01 };
         }
 
         public static string ClientVersion { get; }
@@ -21,5 +22,6 @@
         public static string PulsarSslScheme { get; }
         public static int DefaultPulsarPort { get; }
         public static int DefaultPulsarSSLPort { get; }
+        public static byte[] MagicNumber { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerManager.cs b/src/DotPulsar/Internal/ConsumerManager.cs
index 597894c..c237608 100644
--- a/src/DotPulsar/Internal/ConsumerManager.cs
+++ b/src/DotPulsar/Internal/ConsumerManager.cs
@@ -1,6 +1,7 @@
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.PulsarApi;
 using System;
+using System.Buffers;
 
 namespace DotPulsar.Internal
 {
@@ -22,11 +23,10 @@
             }
         }
 
-        public void Incoming(MessagePackage package)
+        public void Incoming(CommandMessage message, ReadOnlySequence<byte> data)
         {
-            var consumerId = package.Command.ConsumerId;
-            var proxy = _proxies[consumerId];
-            proxy?.Enqueue(package);
+            var proxy = _proxies[message.ConsumerId];
+            proxy?.Enqueue(new MessagePackage(message.MessageId, data));
         }
 
         public void Incoming(CommandCloseConsumer command) => RemoveConsumer(command.ConsumerId);
diff --git a/src/DotPulsar/Internal/ConsumerStream.cs b/src/DotPulsar/Internal/ConsumerStream.cs
index 776c3df..b617d3b 100644
--- a/src/DotPulsar/Internal/ConsumerStream.cs
+++ b/src/DotPulsar/Internal/ConsumerStream.cs
@@ -1,5 +1,4 @@
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 using System;
@@ -15,61 +14,75 @@
         private readonly Connection _connection;
         private readonly IFaultStrategy _faultStrategy;
         private readonly IConsumerProxy _proxy;
-        private readonly CommandFlow _commandFlow;
+        private readonly BatchHandler _batchHandler;
+        private readonly CommandFlow _cachedCommandFlow;
         private uint _sendWhenZero;
-        private bool _firstBatch;
+        private bool _firstFlow;
 
-        public ConsumerStream(ulong id, uint messagePrefetchCount, IDequeue<MessagePackage> dequeue, Connection connection, IFaultStrategy faultStrategy, IConsumerProxy proxy)
+        public ConsumerStream(
+            ulong id,
+            uint messagePrefetchCount,
+            IDequeue<MessagePackage> dequeue,
+            Connection connection,
+            IFaultStrategy faultStrategy,
+            IConsumerProxy proxy,
+            BatchHandler batchHandler)
         {
             _id = id;
             _dequeue = dequeue;
             _connection = connection;
             _faultStrategy = faultStrategy;
             _proxy = proxy;
-            _commandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount };
+            _batchHandler = batchHandler;
+            _cachedCommandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount };
             _sendWhenZero = 0;
-            _firstBatch = true;
+            _firstFlow = true;
         }
 
         public async ValueTask<Message> Receive(CancellationToken cancellationToken)
         {
             while (true)
             {
-                if (_sendWhenZero == 0) //TODO should sending the flow command be handled on other thread and thereby not slow down the consumer?
-                {
-                    await _connection.Send(_commandFlow);
+                if (_sendWhenZero == 0)
+                    await SendFlow();
 
-                    if (_firstBatch)
-                    {
-                        _commandFlow.MessagePermits = (uint)Math.Ceiling(_commandFlow.MessagePermits * 0.5);
-                        _firstBatch = false;
-                    }
-
-                    _sendWhenZero = _commandFlow.MessagePermits;
-                }
-
-                var messagePackage = await _dequeue.Dequeue(cancellationToken);
                 _sendWhenZero--;
 
-                try
-                {
-                    return Serializer.Deserialize(messagePackage);
-                }
-                catch (ChecksumException)
-                {
-                    var ack = new CommandAck
-                    {
-                        Type = CommandAck.AckType.Individual,
-                        validation_error = CommandAck.ValidationError.ChecksumMismatch
-                    };
-                    ack.MessageIds.Add(messagePackage.Command.MessageId);
-                    await Send(ack);
-                }
+                var message = _batchHandler.GetNext();
+                if (message != null)
+                    return message;
+
+                var messagePackage = await _dequeue.Dequeue(cancellationToken);
+
+                if (!await Validate(messagePackage))
+                    continue;
+
+                var messageId = messagePackage.MessageId;
+                var data = messagePackage.Data;
+
+                var metadataSize = data.ReadUInt32(6, true);
+                var metadata = Serializer.Deserialize<PulsarApi.MessageMetadata>(data.Slice(10, metadataSize));
+                data = data.Slice(10 + metadataSize);
+
+                if (metadata.NumMessagesInBatch == 1)
+                    return new Message(new MessageId(messageId), metadata, null, data);
+
+                return _batchHandler.Add(messageId, metadata, data);
             }
         }
 
         public async Task Send(CommandAck command)
         {
+            var messageId = command.MessageIds[0];
+            if (messageId.BatchIndex != -1)
+            {
+                var batchMessageId = _batchHandler.Acknowledge(messageId);
+                if (batchMessageId is null)
+                    return;
+
+                command.MessageIds[0] = batchMessageId;
+            }
+
             try
             {
                 command.ConsumerId = _id;
@@ -105,6 +118,7 @@
                 command.ConsumerId = _id;
                 var response = await _connection.Send(command);
                 response.Expect(BaseCommand.Type.Success);
+                _batchHandler.Clear();
                 return response.Success;
             }
             catch (Exception exception)
@@ -147,5 +161,38 @@
             if (_faultStrategy.DetermineFaultAction(exception) == FaultAction.Relookup)
                 _proxy.Disconnected();
         }
+
+        private async ValueTask SendFlow()
+        {
+            await _connection.Send(_cachedCommandFlow); //TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
+
+            if (_firstFlow)
+            {
+                _cachedCommandFlow.MessagePermits = (uint)Math.Ceiling(_cachedCommandFlow.MessagePermits * 0.5);
+                _firstFlow = false;
+            }
+
+            _sendWhenZero = _cachedCommandFlow.MessagePermits;
+        }
+
+        private async ValueTask<bool> Validate(MessagePackage messagePackage)
+        {
+            var magicNumberMatches = messagePackage.Data.StartsWith(Constants.MagicNumber);
+            var expectedChecksum = messagePackage.Data.ReadUInt32(2, true);
+            var actualChecksum = Crc32C.Calculate(messagePackage.Data.Slice(6));
+            if (!magicNumberMatches || expectedChecksum != actualChecksum)
+            {
+                var ack = new CommandAck
+                {
+                    Type = CommandAck.AckType.Individual,
+                    validation_error = CommandAck.ValidationError.ChecksumMismatch
+                };
+                ack.MessageIds.Add(messagePackage.MessageId);
+                await Send(ack);
+                return false;
+            }
+
+            return true;
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerStreamFactory.cs b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
index a95e499..ba62d6f 100644
--- a/src/DotPulsar/Internal/ConsumerStreamFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerStreamFactory.cs
@@ -12,12 +12,14 @@
         private readonly IFaultStrategy _faultStrategy;
         private readonly CommandSubscribe _subscribe;
         private readonly uint _messagePrefetchCount;
+        private readonly BatchHandler _batchHandler;
 
         public ConsumerStreamFactory(ConnectionPool connectionManager, ConsumerOptions options, IFaultStrategy faultStrategy)
         {
             _connectionTool = connectionManager;
             _faultStrategy = faultStrategy;
             _messagePrefetchCount = options.MessagePrefetchCount;
+            _batchHandler = new BatchHandler(true);
 
             _subscribe = new CommandSubscribe
             {
@@ -36,6 +38,7 @@
             _connectionTool = connectionManager;
             _faultStrategy = faultStrategy;
             _messagePrefetchCount = options.MessagePrefetchCount;
+            _batchHandler = new BatchHandler(false);
 
             _subscribe = new CommandSubscribe
             {
@@ -56,7 +59,7 @@
                 {
                     var connection = await _connectionTool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
                     var response = await connection.Send(_subscribe, proxy);
-                    return new ConsumerStream(response.ConsumerId, _messagePrefetchCount, proxy, connection, _faultStrategy, proxy);
+                    return new ConsumerStream(response.ConsumerId, _messagePrefetchCount, proxy, connection, _faultStrategy, proxy, _batchHandler);
                 }
                 catch (OperationCanceledException)
                 {
diff --git a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
index 4da0cdd..3e54624 100644
--- a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs
@@ -17,12 +17,6 @@
         public static void SetEventTime(this Metadata metadata, DateTimeOffset timestamp)
             => metadata.EventTime = (ulong)timestamp.ToUnixTimeMilliseconds();
 
-        public static DateTimeOffset GetPublishTimeAsDateTimeOffset(this Metadata metadata)
-            => DateTimeOffset.FromUnixTimeMilliseconds((long)metadata.PublishTime);
-
-        public static void SetPublishTime(this Metadata metadata, DateTimeOffset timestamp)
-            => metadata.PublishTime = (ulong)timestamp.ToUnixTimeMilliseconds();
-
         public static byte[]? GetKeyAsBytes(this Metadata metadata)
             => metadata.PartitionKeyB64Encoded ? Convert.FromBase64String(metadata.PartitionKey) : null;
 
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
index e477974..df4aaca 100644
--- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -30,7 +30,7 @@
             return false;
         }
 
-        public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, int start, bool isBigEndian)
+        public static uint ReadUInt32(this ReadOnlySequence<byte> sequence, long start, bool isBigEndian)
         {
             if (sequence.Length < (4 + start))
                 throw new ArgumentOutOfRangeException(nameof(start), start, "Sequence must be at least 4 bytes long from 'start' to end");
@@ -48,7 +48,7 @@
                 }
 
                 var span = memory.Span;
-                for (var i = start; i < span.Length; ++i, ++read)
+                for (var i = (int)start; i < span.Length; ++i, ++read)
                 {
                     switch (read)
                     {
diff --git a/src/DotPulsar/Internal/MessagePackage.cs b/src/DotPulsar/Internal/MessagePackage.cs
index 2aaf738..39864ba 100644
--- a/src/DotPulsar/Internal/MessagePackage.cs
+++ b/src/DotPulsar/Internal/MessagePackage.cs
@@ -3,15 +3,15 @@
 
 namespace DotPulsar.Internal
 {
-    public sealed class MessagePackage
+    public struct MessagePackage
     {
-        public MessagePackage(CommandMessage command, ReadOnlySequence<byte> data)
+        public MessagePackage(MessageIdData messageId, ReadOnlySequence<byte> data)
         {
-            Command = command;
+            MessageId = messageId;
             Data = data;
         }
 
-        public CommandMessage Command { get; }
+        public MessageIdData MessageId { get; }
         public ReadOnlySequence<byte> Data { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
index 5a1a834..fa273c9 100644
--- a/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
+++ b/src/DotPulsar/Internal/PulsarApi/GeneratedCode.cs
@@ -359,6 +359,16 @@
         public bool ShouldSerializeOrderingKey() => __pbn__OrderingKey != null;
         public void ResetOrderingKey() => __pbn__OrderingKey = null;
         private byte[] __pbn__OrderingKey;
+
+        [global::ProtoBuf.ProtoMember(8, Name = @"sequence_id")]
+        public ulong SequenceId
+        {
+            get { return __pbn__SequenceId.GetValueOrDefault(); }
+            set { __pbn__SequenceId = value; }
+        }
+        public bool ShouldSerializeSequenceId() => __pbn__SequenceId != null;
+        public void ResetSequenceId() => __pbn__SequenceId = null;
+        private ulong? __pbn__SequenceId;
     }
 
     [global::ProtoBuf.ProtoContract()]
diff --git a/src/DotPulsar/Internal/Serializer.cs b/src/DotPulsar/Internal/Serializer.cs
index 866ccce..ddffc69 100644
--- a/src/DotPulsar/Internal/Serializer.cs
+++ b/src/DotPulsar/Internal/Serializer.cs
@@ -1,6 +1,4 @@
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
+using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Buffers;
 using System.IO;
@@ -9,30 +7,12 @@
 {
     public static class Serializer
     {
-        private static readonly byte[] MagicNumber = new byte[] { 0x0e, 0x01 };
-
         public static T Deserialize<T>(ReadOnlySequence<byte> sequence)
         {
             using var ms = new MemoryStream(sequence.ToArray()); //TODO Fix this when protobuf-net start supporting sequences or .NET supports creating a stream from a sequence
             return ProtoBuf.Serializer.Deserialize<T>(ms);
         }
 
-        public static Message Deserialize(MessagePackage package)
-        {
-            var sequence = package.Data;
-            var magicNumberMatches = sequence.StartsWith(MagicNumber);
-            if (!magicNumberMatches)
-                throw new ChecksumException("Magic number don't match");
-            var expectedChecksum = sequence.ReadUInt32(2, true);
-            var actualChecksum = Crc32C.Calculate(sequence.Slice(6));
-            if (expectedChecksum != actualChecksum)
-                throw new ChecksumException(expectedChecksum, actualChecksum);
-            var metaSize = sequence.ReadUInt32(6, true);
-            var meta = Deserialize<PulsarApi.MessageMetadata>(sequence.Slice(10, metaSize));
-            var data = sequence.Slice(10 + metaSize);
-            return new Message(new MessageId(package.Command.MessageId), meta, data);
-        }
-
         public static ReadOnlySequence<byte> Serialize(BaseCommand command)
         {
             var commandBytes = Serialize<BaseCommand>(command);
@@ -58,7 +38,7 @@
             var checksum = Crc32C.Calculate(sb.Build());
 
             return sb.Prepend(ToBigEndianBytes(checksum))
-                .Prepend(MagicNumber)
+                .Prepend(Constants.MagicNumber)
                 .Prepend(commandBytes)
                 .Prepend(commandSizeBytes)
                 .Prepend(ToBigEndianBytes((uint)sb.Length))
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index 5447e65..7ca62be 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -1,5 +1,4 @@
-using DotPulsar.Internal.Extensions;
-using System;
+using System;
 using System.Buffers;
 using System.Collections.Generic;
 using System.Linq;
@@ -8,41 +7,61 @@
 {
     public sealed class Message
     {
-        private readonly Internal.PulsarApi.MessageMetadata _messageMetadata;
+        private readonly List<Internal.PulsarApi.KeyValue> _keyVaues;
         private IReadOnlyDictionary<string, string>? _properties;
 
-        internal Message(MessageId messageId, Internal.PulsarApi.MessageMetadata messageMetadata, ReadOnlySequence<byte> data)
+        internal Message(
+            MessageId messageId,
+            Internal.PulsarApi.MessageMetadata metadata,
+            Internal.PulsarApi.SingleMessageMetadata? singleMetadata,
+            ReadOnlySequence<byte> data)
         {
             MessageId = messageId;
-            _messageMetadata = messageMetadata;
+            ProducerName = metadata.ProducerName;
+            PublishTime = metadata.PublishTime;
             Data = data;
+
+            if (singleMetadata is null)
+            {
+                EventTime = metadata.EventTime;
+                HasBase64EncodedKey = metadata.PartitionKeyB64Encoded;
+                Key = metadata.PartitionKey;
+                SequenceId = metadata.SequenceId;
+                OrderingKey = metadata.OrderingKey;
+                _keyVaues = metadata.Properties;
+            }
+            else
+            {
+                EventTime = singleMetadata.EventTime;
+                HasBase64EncodedKey = singleMetadata.PartitionKeyB64Encoded;
+                Key = singleMetadata.PartitionKey;
+                OrderingKey = singleMetadata.OrderingKey;
+                SequenceId = singleMetadata.SequenceId;
+                _keyVaues = singleMetadata.Properties;
+            }
         }
 
         public MessageId MessageId { get; }
         public ReadOnlySequence<byte> Data { get; }
-        public string ProducerName => _messageMetadata.ProducerName;
-        public ulong SequenceId => _messageMetadata.SequenceId;
+        public string ProducerName { get; }
+        public ulong SequenceId { get; }
 
-        public bool HasEventTime => _messageMetadata.EventTime != 0;
-        public ulong EventTime => _messageMetadata.EventTime;
-        public DateTimeOffset EventTimeAsDateTimeOffset => _messageMetadata.GetEventTimeAsDateTimeOffset();
+        public bool HasEventTime => EventTime != 0;
+        public ulong EventTime { get; }
+        public DateTimeOffset EventTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)EventTime);
 
-        public bool HasBase64EncodedKey => _messageMetadata.PartitionKeyB64Encoded;
-        public bool HasKey => _messageMetadata.PartitionKey != null;
-        public string? Key => _messageMetadata.PartitionKey;
-        public byte[]? KeyBytes => _messageMetadata.GetKeyAsBytes();
+        public bool HasBase64EncodedKey { get; }
+        public bool HasKey => Key != null;
+        public string? Key { get; }
+        public byte[]? KeyBytes => HasBase64EncodedKey ? Convert.FromBase64String(Key) : null;
 
-        public bool HasOrderingKey => _messageMetadata.OrderingKey != null;
-        public byte[]? OrderingKey => _messageMetadata.OrderingKey;
+        public bool HasOrderingKey => OrderingKey != null;
+        public byte[]? OrderingKey { get; }
 
 
-        public ulong PublishTime => _messageMetadata.PublishTime;
-        public DateTimeOffset PublishTimeAsDateTimeOffset => _messageMetadata.GetPublishTimeAsDateTimeOffset();
+        public ulong PublishTime { get; }
+        public DateTimeOffset PublishTimeAsDateTimeOffset => DateTimeOffset.FromUnixTimeMilliseconds((long)PublishTime);
 
-
-        public IReadOnlyDictionary<string, string> Properties
-        {
-            get => _properties ??= _messageMetadata.Properties.ToDictionary(p => p.Key, p => p.Value);
-        }
+        public IReadOnlyDictionary<string, string> Properties => _properties ??= _keyVaues.ToDictionary(p => p.Key, p => p.Value);
     }
 }