blob: 7937d9335a39cff5ccc79ce7e4cc600697d93f0d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Proto = Apache.Rocketmq.V2;
using Org.Apache.Rocketmq.Error;
namespace Org.Apache.Rocketmq
{
/// <summary>
/// A <see cref="Message"/> prepared for publishing. On construction it checks the body size
/// against the publishing settings, obtains a message id and resolves the message type.
/// </summary>
public class PublishingMessage : Message
{
    /// <summary>
    /// Message type resolved by the constructor from the message group, the delivery
    /// timestamp and the transaction flag.
    /// </summary>
    public MessageType MessageType { get; }

    /// <summary>
    /// Message id obtained from <c>MessageIdGenerator</c> when this instance is constructed.
    /// </summary>
    internal string MessageId { get; }

    /// <summary>
    /// Wraps <paramref name="message"/> for publishing.
    /// </summary>
    /// <param name="message">The message to publish; also passed to the base constructor.</param>
    /// <param name="publishingSettings">Supplies the maximum allowed body size in bytes.</param>
    /// <param name="txEnabled">Whether the message is published as a transactional message.</param>
    /// <exception cref="IOException">
    /// The body length is greater than the maximum body size from <paramref name="publishingSettings"/>.
    /// </exception>
    /// <exception cref="InternalErrorException">
    /// <paramref name="txEnabled"/> is true while a message group or a delivery timestamp is set.
    /// </exception>
    public PublishingMessage(Message message, PublishingSettings publishingSettings, bool txEnabled)
        : base(message)
    {
        // Check the body size first, so that no message id is generated for a rejected message.
        var bodySizeLimit = publishingSettings.GetMaxBodySizeBytes();
        if (message.Body.Length > bodySizeLimit)
        {
            throw new IOException($"Message body size exceed the threshold, max size={bodySizeLimit} bytes");
        }

        MessageId = MessageIdGenerator.GetInstance().Next();

        var hasMessageGroup = !string.IsNullOrEmpty(message.MessageGroup);
        var hasDeliveryTimestamp = message.DeliveryTimestamp.HasValue;

        if (txEnabled)
        {
            // TRANSACTION: must carry neither a message group nor a delivery timestamp.
            if (hasMessageGroup || hasDeliveryTimestamp)
            {
                throw new InternalErrorException(
                    "Transactional message should not set messageGroup or deliveryTimestamp");
            }

            MessageType = MessageType.Transaction;
        }
        else if (hasMessageGroup)
        {
            // FIFO: a message group takes precedence over a delivery timestamp.
            MessageType = MessageType.Fifo;
        }
        else if (hasDeliveryTimestamp)
        {
            // DELAY: only a delivery timestamp is set.
            MessageType = MessageType.Delay;
        }
        else
        {
            // NORMAL: neither a message group nor a delivery timestamp.
            MessageType = MessageType.Normal;
        }
    }

    /// <summary>
    /// Converts this message into its protobuf representation.
    /// </summary>
    /// <param name="queueId">Queue id written into the system properties.</param>
    /// <returns>
    /// A protobuf message carrying the topic, body, system properties and user properties.
    /// </returns>
    public Proto::Message ToProtobuf(int queueId)
    {
        var sysProps = new Proto.SystemProperties();
        sysProps.Keys.Add(Keys);
        sysProps.MessageId = MessageId;
        // The born timestamp is taken at conversion time, in UTC.
        sysProps.BornTimestamp = Timestamp.FromDateTime(DateTime.UtcNow);
        sysProps.BornHost = Utilities.GetHostName();
        sysProps.BodyEncoding = EncodingHelper.ToProtobuf(MqEncoding.Identity);
        sysProps.QueueId = queueId;
        sysProps.MessageType = MessageTypeHelper.ToProtobuf(MessageType);

        // Optional fields are assigned only when a value is present.
        if (Tag != null)
        {
            sysProps.Tag = Tag;
        }

        if (DeliveryTimestamp.HasValue)
        {
            var deliveryUtc = DeliveryTimestamp.Value.ToUniversalTime();
            sysProps.DeliveryTimestamp = Timestamp.FromDateTime(deliveryUtc);
        }

        if (MessageGroup != null)
        {
            sysProps.MessageGroup = MessageGroup;
        }

        var topic = new Proto.Resource();
        topic.Name = Topic;

        var protoMessage = new Proto.Message();
        protoMessage.Topic = topic;
        protoMessage.Body = ByteString.CopyFrom(Body);
        protoMessage.SystemProperties = sysProps;
        protoMessage.UserProperties.Add(Properties);
        return protoMessage;
    }
}
}