Making it easier to create messages for testing.
diff --git a/src/DotPulsar/Internal/ActionExceptionHandler.cs b/src/DotPulsar/Internal/ActionExceptionHandler.cs
index 685b672..0327153 100644
--- a/src/DotPulsar/Internal/ActionExceptionHandler.cs
+++ b/src/DotPulsar/Internal/ActionExceptionHandler.cs
@@ -1,4 +1,18 @@
-namespace DotPulsar.Internal
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
{
using DotPulsar.Abstractions;
using System;
diff --git a/src/DotPulsar/Internal/BatchHandler.cs b/src/DotPulsar/Internal/BatchHandler.cs
index 3da59f3..9523627 100644
--- a/src/DotPulsar/Internal/BatchHandler.cs
+++ b/src/DotPulsar/Internal/BatchHandler.cs
@@ -51,7 +51,7 @@
var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
index += singleMetadataSize;
var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
- var message = new Message(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
+ var message = MessageFactory.Create(singleMessageId, redeliveryCount, metadata, singleMetadata, data.Slice(index, singleMetadata.PayloadSize));
_messages.Enqueue(message);
index += (uint) singleMetadata.PayloadSize;
}
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 3a632e9..eef5d8d 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -88,7 +88,7 @@
return metadata.ShouldSerializeNumMessagesInBatch()
? _batchHandler.Add(messageId, redeliveryCount, metadata, data)
- : new Message(new MessageId(messageId), redeliveryCount, metadata, null, data);
+ : MessageFactory.Create(new MessageId(messageId), redeliveryCount, metadata, data);
}
}
}
diff --git a/src/DotPulsar/Internal/MessageFactory.cs b/src/DotPulsar/Internal/MessageFactory.cs
new file mode 100644
index 0000000..87bf110
--- /dev/null
+++ b/src/DotPulsar/Internal/MessageFactory.cs
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using DotPulsar.Internal.PulsarApi;
+ using System.Buffers;
+ using System.Collections.Generic;
+
+ public static class MessageFactory
+ {
+ private static readonly Dictionary<string, string> _empty;
+
+ static MessageFactory() => _empty = new Dictionary<string, string>();
+
+ private static IReadOnlyDictionary<string, string> FromKeyValueList(List<KeyValue> keyValues)
+ {
+ if (keyValues.Count == 0)
+ return _empty;
+
+ var dictionary = new Dictionary<string, string>(keyValues.Count);
+
+ for (var i = 0; i < keyValues.Count; ++i)
+ {
+ var keyValue = keyValues[i];
+ dictionary.Add(keyValue.Key, keyValue.Value);
+ }
+
+ return dictionary;
+ }
+
+ public static Message Create(
+ MessageId messageId,
+ uint redeliveryCount,
+ MessageMetadata metadata,
+ ReadOnlySequence<byte> data)
+ {
+ return new Message(
+ messageId: messageId,
+ data: data,
+ producerName: metadata.ProducerName,
+ sequenceId: metadata.SequenceId,
+ redeliveryCount: redeliveryCount,
+ eventTime: metadata.EventTime,
+ publishTime: metadata.PublishTime,
+ properties: FromKeyValueList(metadata.Properties),
+ hasBase64EncodedKey: metadata.PartitionKeyB64Encoded,
+ key: metadata.PartitionKey,
+ orderingKey: metadata.OrderingKey);
+ }
+
+ public static Message Create(
+ MessageId messageId,
+ uint redeliveryCount,
+ MessageMetadata metadata,
+ SingleMessageMetadata singleMetadata,
+ ReadOnlySequence<byte> data)
+ {
+ return new Message(
+ messageId: messageId,
+ data: data,
+ producerName: metadata.ProducerName,
+ sequenceId: singleMetadata.SequenceId,
+ redeliveryCount: redeliveryCount,
+ eventTime: singleMetadata.EventTime,
+ publishTime: metadata.PublishTime,
+ properties: FromKeyValueList(singleMetadata.Properties),
+ hasBase64EncodedKey: singleMetadata.PartitionKeyB64Encoded,
+ key: singleMetadata.PartitionKey,
+ orderingKey: singleMetadata.OrderingKey);
+ }
+
+ /// <summary>
+ /// Intended for testing.
+ /// </summary>
+ public static Message Create(
+ MessageId messageId,
+ ReadOnlySequence<byte> data,
+ string producerName,
+ ulong sequenceId,
+ uint redeliveryCount,
+ ulong eventTime,
+ ulong publishTime,
+ IReadOnlyDictionary<string, string> properties,
+ bool hasBase64EncodedKey,
+ string? key,
+ byte[]? orderingKey) => new Message(messageId, data, producerName, sequenceId, redeliveryCount, eventTime, publishTime, properties, hasBase64EncodedKey, key, orderingKey);
+ }
+}
diff --git a/src/DotPulsar/Message.cs b/src/DotPulsar/Message.cs
index c921711..812ed8f 100644
--- a/src/DotPulsar/Message.cs
+++ b/src/DotPulsar/Message.cs
@@ -14,51 +14,39 @@
namespace DotPulsar
{
- using Internal.PulsarApi;
using System;
using System.Buffers;
using System.Collections.Generic;
- using System.Linq;
/// <summary>
/// The message received by consumers and readers.
/// </summary>
public sealed class Message
{
- private readonly List<KeyValue> _keyValues;
- private IReadOnlyDictionary<string, string>? _properties;
-
internal Message(
MessageId messageId,
+ ReadOnlySequence<byte> data,
+ string producerName,
+ ulong sequenceId,
uint redeliveryCount,
- Internal.PulsarApi.MessageMetadata metadata,
- SingleMessageMetadata? singleMetadata,
- ReadOnlySequence<byte> data)
+ ulong eventTime,
+ ulong publishTime,
+ IReadOnlyDictionary<string, string> properties,
+ bool hasBase64EncodedKey,
+ string? key,
+ byte[]? orderingKey)
{
MessageId = messageId;
- RedeliveryCount = redeliveryCount;
- ProducerName = metadata.ProducerName;
- PublishTime = metadata.PublishTime;
Data = data;
-
- if (singleMetadata is null)
- {
- EventTime = metadata.EventTime;
- HasBase64EncodedKey = metadata.PartitionKeyB64Encoded;
- Key = metadata.PartitionKey;
- SequenceId = metadata.SequenceId;
- OrderingKey = metadata.OrderingKey;
- _keyValues = metadata.Properties;
- }
- else
- {
- EventTime = singleMetadata.EventTime;
- HasBase64EncodedKey = singleMetadata.PartitionKeyB64Encoded;
- Key = singleMetadata.PartitionKey;
- OrderingKey = singleMetadata.OrderingKey;
- SequenceId = singleMetadata.SequenceId;
- _keyValues = singleMetadata.Properties;
- }
+ ProducerName = producerName;
+ SequenceId = sequenceId;
+ RedeliveryCount = redeliveryCount;
+ EventTime = eventTime;
+ PublishTime = publishTime;
+ Properties = properties;
+ HasBase64EncodedKey = hasBase64EncodedKey;
+ Key = key;
+ OrderingKey = orderingKey;
}
/// <summary>
@@ -144,6 +132,6 @@
/// <summary>
/// The properties of the message.
/// </summary>
- public IReadOnlyDictionary<string, string> Properties => _properties ??= _keyValues.ToDictionary(p => p.Key, p => p.Value);
+ public IReadOnlyDictionary<string, string> Properties { get; }
}
}