blob: 2e6a6ec6486b0d2cdd142fe4c47fe987d6e90d3e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
using System;
using rmq = Apache.Rocketmq.V2;
using grpc = Grpc.Core;
using NLog;
using System.Diagnostics.Metrics;
namespace Org.Apache.Rocketmq
public abstract class Client : ClientConfig, IClient
protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
protected Client(string accessUrl)
AccessPoint = new AccessPoint(accessUrl);
AccessPointScheme = AccessPoint.HostScheme();
var serviceEndpoint = new rmq::Address
Host = AccessPoint.Host,
Port = AccessPoint.Port
AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
_resourceNamespace = "";
ClientSettings = new rmq::Settings
AccessPoint = new rmq::Endpoints
Scheme = AccessPoint.HostScheme()
ClientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
ClientSettings.UserAgent = new rmq.UA
Language = rmq::Language.DotNet,
Version = "5.0.0",
Platform = Environment.OSVersion.ToString(),
Hostname = System.Net.Dns.GetHostName()
Manager = new ClientManager();
_topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
_telemetryCts = new CancellationTokenSource();
public virtual async Task Start()
Schedule(async () =>
Logger.Debug("Update topic route by schedule");
await UpdateTopicRoute();
}, 30, _updateTopicRouteCts.Token);
// Get routes for topics of interest.
Logger.Debug("Step of #Start: get route for topics of interest");
await UpdateTopicRoute();
string accessPointUrl = AccessPoint.TargetUrl();
await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
Logger.Debug($"Session has been created for {accessPointUrl}");
await Heartbeat();
public virtual async Task Shutdown()
Logger.Info($"Shutdown client");
await Manager.Shutdown();
private string FilterBroker(Func<string, bool> acceptor)
foreach (var item in _topicRouteTable)
foreach (var partition in item.Value.MessageQueues)
var target = Utilities.TargetUrl(partition);
if (acceptor(target))
return target;
return null;
* Return all endpoints of brokers in route table.
private List<string> AvailableBrokerEndpoints()
var endpoints = new List<string>();
foreach (var item in _topicRouteTable)
foreach (var partition in item.Value.MessageQueues)
string endpoint = Utilities.TargetUrl(partition);
if (!endpoints.Contains(endpoint))
return endpoints;
private async Task UpdateTopicRoute()
HashSet<string> topics = new HashSet<string>();
foreach (var topic in _topicsOfInterest)
foreach (var item in _topicRouteTable)
Logger.Debug($"Fetch topic route for {topics.Count} topics");
// Wrap topics into list such that we can map async result to topic
List<string> topicList = new List<string>();
var tasks = new List<Task<TopicRouteData>>();
foreach (var item in topicList)
tasks.Add(GetRouteFor(item, true));
// Update topic route data
TopicRouteData[] result = await Task.WhenAll(tasks);
var i = 0;
foreach (var item in result)
if (null == item)
Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
if (0 == item.MessageQueues.Count)
Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
var topicName = item.MessageQueues[0].Topic.Name;
// Make assertion
var existing = _topicRouteTable[topicName];
if (!existing.Equals(item))
_topicRouteTable[topicName] = item;
protected void Schedule(Action action, int seconds, CancellationToken token)
if (null == action)
// TODO: log warning
Task.Run(async () =>
while (!token.IsCancellationRequested)
await Task.Delay(TimeSpan.FromSeconds(seconds), token);
* Parameters:
* topic
* Topic to query
* direct
* Indicate if we should by-pass cache and fetch route entries from name server.
protected async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
Logger.Debug($"Get route for topic={topic}, direct={direct}");
if (!direct && _topicRouteTable.ContainsKey(topic))
Logger.Debug($"Return cached route for {topic}");
return _topicRouteTable[topic];
// We got one or more name servers available.
var request = new rmq::QueryRouteRequest
Topic = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = topic
Endpoints = new rmq::Endpoints
Scheme = AccessPointScheme
foreach (var address in AccessPointEndpoints)
var metadata = new grpc.Metadata();
Signature.Sign(this, metadata);
int index = _random.Next(0, AccessPointEndpoints.Count);
var serviceEndpoint = AccessPointEndpoints[index];
// AccessPointAddresses.Count
string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
Logger.Debug($"Resolving route for topic={topic}");
var topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
if (null != topicRouteData)
Logger.Debug($"Got route entries for {topic} from name server");
_topicRouteTable.TryAdd(topic, topicRouteData);
Logger.Debug($"Got route for {topic} from {target}");
return topicRouteData;
Logger.Warn($"Failed to query route of {topic} from {target}");
catch (Exception e)
Logger.Warn(e, "Failed when querying route");
return null;
protected abstract void PrepareHeartbeatData(rmq::HeartbeatRequest request);
public async Task Heartbeat()
List<string> endpoints = AvailableBrokerEndpoints();
if (0 == endpoints.Count)
Logger.Debug("No broker endpoints available in topic route");
var request = new rmq::HeartbeatRequest
Group = null,
ClientType = rmq.ClientType.Unspecified
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
List<Task> tasks = new List<Task>();
foreach (var endpoint in endpoints)
tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
await Task.WhenAll(tasks);
private List<string> BlockedBrokerEndpoints()
List<string> endpoints = new List<string>();
return endpoints;
private void RemoveFromBlockList(string endpoint)
protected async Task<List<rmq::Assignment>> ScanLoadAssignment(string topic, string group)
// Pick a broker randomly
string target = FilterBroker((s) => true);
var request = new rmq::QueryAssignmentRequest
Topic = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = topic
Group = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = group
Endpoints = new rmq::Endpoints
Scheme = AccessPointScheme
foreach (var endpoint in AccessPointEndpoints)
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
catch (System.Exception e)
Logger.Warn(e, $"Failed to acquire load assignments from {target}");
// Just return an empty list.
return new List<rmq.Assignment>();
private string TargetUrl(rmq::Assignment assignment)
var broker = assignment.MessageQueue.Broker;
var addresses = broker.Endpoints.Addresses;
// TODO: use the first address for now.
var address = addresses[0];
return $"https://{address.Host}:{address.Port}";
public virtual void BuildClientSetting(rmq::Settings settings)
private async Task CreateSession(string url)
Logger.Debug($"Create session for url={url}");
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
var stream = Manager.Telemetry(url, metadata);
var session = new Session(url, stream, this);
_sessions.TryAdd(url, session);
await session.Loop();
internal async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
var targetUrl = TargetUrl(assignment);
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
var request = new rmq::ReceiveMessageRequest
Group = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = group
MessageQueue = assignment.MessageQueue
var messages = await Manager.ReceiveMessage(targetUrl, metadata, request,
return messages;
public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
var request = new rmq::AckMessageRequest
Group = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = group
Topic = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = topic
var entry = new rmq::AckMessageEntry
ReceiptHandle = receiptHandle,
MessageId = messageId
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
return await Manager.Ack(target, metadata, request, RequestTimeout);
public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
var request = new rmq::ChangeInvisibleDurationRequest
ReceiptHandle = receiptHandle,
Group = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = group
Topic = new rmq::Resource
ResourceNamespace = _resourceNamespace,
Name = topic
MessageId = messageId
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
public async Task<bool> NotifyClientTermination(rmq.Resource group)
List<string> endpoints = AvailableBrokerEndpoints();
var request = new rmq::NotifyClientTerminationRequest
Group = group
var metadata = new grpc.Metadata();
Signature.Sign(this, metadata);
List<Task<Boolean>> tasks = new List<Task<Boolean>>();
foreach (var endpoint in endpoints)
tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
bool[] results = await Task.WhenAll(tasks);
foreach (bool b in results)
if (!b)
return false;
return true;
internal virtual void OnSettingsReceived(rmq::Settings settings)
if (null != settings.Metric)
ClientSettings.Metric = new rmq::Metric();
if (null != settings.BackoffPolicy)
ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
switch (settings.PubSubCase)
case rmq.Settings.PubSubOneofCase.Publishing:
ClientSettings.Publishing = settings.Publishing;
case rmq.Settings.PubSubOneofCase.Subscription:
ClientSettings.Subscription = settings.Subscription;
protected readonly IClientManager Manager;
private readonly HashSet<string> _topicsOfInterest = new HashSet<string>();
public void AddTopicOfInterest(string topic)
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
private readonly CancellationTokenSource _updateTopicRouteCts;
private readonly CancellationTokenSource _telemetryCts;
public CancellationTokenSource TelemetryCts()
return _telemetryCts;
protected readonly AccessPoint AccessPoint;
// This field is subject changes from servers.
protected readonly rmq::Settings ClientSettings;
private readonly Random _random = new Random();
private readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
protected const string MeterName = "Apache.RocketMQ.Client";
protected static readonly Meter MetricMeter = new(MeterName, "1.0");