blob: 5c51cdcf7de0f4caa033b9f94cca9d73bd99315d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading.Tasks;
using rmq = Apache.Rocketmq.V2;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using NLog;
using OpenTelemetry;
using OpenTelemetry.Exporter;
using OpenTelemetry.Metrics;
namespace Org.Apache.Rocketmq
{
/// <summary>
/// Client that publishes messages to brokers. It keeps one publish load balancer per
/// topic and reports send latency / send failure metrics through an OTLP exporter.
/// </summary>
public class Producer : Client, IProducer
{
    /// <summary>
    /// Creates a producer and registers its metric instruments on the client's meter.
    /// </summary>
    /// <param name="accessPoint">Access point handed to the base client.</param>
    /// <param name="resourceNamespace">Resource namespace handed to the base client.</param>
    public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
    {
        _loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
        _sendFailureTotal = MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
        _sendLatency = MetricMeter.CreateHistogram<double>(SendLatencyName,
            description: "Measure the duration of publishing messages to brokers",
            unit: "milliseconds");
    }

    /// <summary>
    /// Starts the base client and then builds the meter provider that exports metrics
    /// over OTLP/gRPC to the access point once per minute.
    /// </summary>
    public override async Task Start()
    {
        await base.Start();
        // More initialization
        // TODO: Add authentication header
        // NOTE(review): the meter is subscribed by literal name here while the view below
        // matches on MeterName; confirm both refer to the same meter.
        _meterProvider = Sdk.CreateMeterProviderBuilder()
            .AddMeter("Apache.RocketMQ.Client")
            .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
            {
                options.Protocol = OtlpExportProtocol.Grpc;
                options.Endpoint = new Uri(_accessPoint.TargetUrl());
                options.TimeoutMilliseconds = (int) _clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
                readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 60 * 1000;
            })
            .AddView((instrument) =>
            {
                // Only the send-latency histogram gets custom bucket boundaries.
                if (instrument.Meter.Name == MeterName && instrument.Name == SendLatencyName)
                {
                    return new ExplicitBucketHistogramConfiguration()
                    {
                        Boundaries = new double[] {1, 5, 10, 20, 50, 200, 500},
                    };
                }
                return null;
            })
            .Build();
    }

    /// <summary>
    /// Disposes the meter provider created by <see cref="Start"/> (if any) and then
    /// shuts down the base client.
    /// </summary>
    public override async Task Shutdown()
    {
        // Release local resources. Detach the field first so a repeated Shutdown
        // does not dispose the same provider twice.
        var provider = _meterProvider;
        _meterProvider = null;
        try
        {
            provider?.Dispose();
        }
        finally
        {
            // The base client must be shut down even if disposing the provider fails.
            await base.Shutdown();
        }
    }

    /// <summary>
    /// Marks the heartbeat request as originating from a producer.
    /// </summary>
    /// <param name="request">Heartbeat request to populate.</param>
    protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
    {
        request.ClientType = rmq::ClientType.Producer;
        // Concept of ProducerGroup has been removed.
    }

    /// <summary>
    /// Sends a message, trying each candidate message queue selected by the topic's
    /// load balancer in turn until one broker answers with <c>Code.Ok</c>.
    /// </summary>
    /// <param name="message">Message to publish.</param>
    /// <returns>Receipt carrying the message id of the first response entry.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="TopicRouteException">No route with message queues is available for the topic.</exception>
    /// <exception cref="MessageException">The message is both FIFO and delayed.</exception>
    /// <exception cref="Exception">
    /// Every attempt failed; the last captured exception is rethrown, or a generic one
    /// when no attempt raised an exception.
    /// </exception>
    public async Task<SendReceipt> Send(Message message)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        // Route resolution comes first so that a missing route is reported before
        // any message validation error, as callers have observed so far.
        var publishLb = await ResolvePublishLoadBalancer(message.Topic);
        var request = WrapSendMessageRequest(message);

        List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
        var targets = new List<string>(candidates.Count);
        foreach (var messageQueue in candidates)
        {
            targets.Add(Utilities.TargetUrl(messageQueue));
        }

        // The same signed metadata is reused for every attempt.
        var metadata = new Metadata();
        Signature.sign(this, metadata);

        Exception ex = null;
        foreach (var target in targets)
        {
            try
            {
                var stopWatch = Stopwatch.StartNew();
                rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
                if (null != response && rmq::Code.Ok == response.Status.Code)
                {
                    var messageId = response.Entries[0].MessageId;
                    // Account latency histogram
                    stopWatch.Stop();
                    var latency = stopWatch.ElapsedMilliseconds;
                    _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
                    return new SendReceipt(messageId);
                }

                // The broker answered but did not accept the message: account it as a
                // failed attempt and move on to the next candidate.
                _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));
                Logger.Warn($"Failed to send message to {target}, response code: {response?.Status?.Code}");
            }
            catch (Exception e)
            {
                // Account failure count
                _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));
                Logger.Info(e, $"Failed to send message to {target}");
                ex = e;
            }
        }

        if (null != ex)
        {
            Logger.Error(ex, $"Failed to send message after {message.MaxAttemptTimes} attempts");
            // Rethrow the last failure without resetting its original stack trace.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw();
        }

        Logger.Error($"Failed to send message after {message.MaxAttemptTimes} attempts with unspecified reasons");
        throw new Exception("Send message failed");
    }

    /// <summary>
    /// Returns the cached load balancer of a topic, creating it from freshly
    /// resolved route data on first use.
    /// </summary>
    private async Task<PublishLoadBalancer> ResolvePublishLoadBalancer(string topic)
    {
        if (_loadBalancer.TryGetValue(topic, out var cached))
        {
            return cached;
        }

        var topicRouteData = await GetRouteFor(topic, false);
        if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
        {
            Logger.Error($"Failed to resolve route info for {topic}");
            throw new TopicRouteException($"No topic route for {topic}");
        }

        // When several senders race on the same topic the first registered balancer wins.
        return _loadBalancer.GetOrAdd(topic, new PublishLoadBalancer(topicRouteData));
    }

    /// <summary>
    /// Converts a client message into a single-entry send request, deriving the
    /// message type (normal, delay or FIFO) from its delivery timestamp and message group.
    /// </summary>
    private rmq::SendMessageRequest WrapSendMessageRequest(Message message)
    {
        var request = new rmq::SendMessageRequest();
        var entry = new rmq::Message();
        entry.Body = ByteString.CopyFrom(message.Body);
        entry.Topic = new rmq::Resource();
        entry.Topic.ResourceNamespace = resourceNamespace();
        entry.Topic.Name = message.Topic;
        request.Messages.Add(entry);

        // User properties
        foreach (var item in message.UserProperties)
        {
            entry.UserProperties.Add(item.Key, item.Value);
        }

        entry.SystemProperties = new rmq::SystemProperties();
        entry.SystemProperties.MessageId = message.MessageId;
        entry.SystemProperties.MessageType = rmq::MessageType.Normal;

        if (DateTime.MinValue != message.DeliveryTimestamp)
        {
            entry.SystemProperties.MessageType = rmq::MessageType.Delay;
            // NOTE(review): Timestamp.FromDateTime is documented to require a UTC DateTime;
            // confirm that callers always supply one.
            entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
            if (message.Fifo())
            {
                Logger.Warn("A message may not be FIFO and delayed at the same time");
                throw new MessageException("A message may not be both FIFO and Timed");
            }
        }
        else if (!String.IsNullOrEmpty(message.MessageGroup))
        {
            entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
            entry.SystemProperties.MessageGroup = message.MessageGroup;
        }

        if (!string.IsNullOrEmpty(message.Tag))
        {
            entry.SystemProperties.Tag = message.Tag;
        }

        foreach (var key in message.Keys)
        {
            entry.SystemProperties.Keys.Add(key);
        }

        return request;
    }

    // Publish load balancers keyed by topic name.
    private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer;

    // Counts failed send attempts, tagged with topic and client id.
    private readonly Counter<long> _sendFailureTotal;

    // Latency of successful sends in milliseconds, tagged with topic and client id.
    private readonly Histogram<double> _sendLatency;

    private static readonly string SendLatencyName = "rocketmq_send_success_cost_time";

    // Created in Start and disposed in Shutdown; null outside that window.
    private MeterProvider _meterProvider;
}
}