blob: 2e6a6ec6486b0d2cdd142fe4c47fe987d6e90d3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
using System;
using rmq = Apache.Rocketmq.V2;
using grpc = Grpc.Core;
using NLog;
using System.Diagnostics.Metrics;
namespace Org.Apache.Rocketmq
{
public abstract class Client : ClientConfig, IClient
{
protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
protected Client(string accessUrl)
{
AccessPoint = new AccessPoint(accessUrl);
AccessPointScheme = AccessPoint.HostScheme();
var serviceEndpoint = new rmq::Address
{
Host = AccessPoint.Host,
Port = AccessPoint.Port
};
AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
_resourceNamespace = "";
ClientSettings = new rmq::Settings
{
AccessPoint = new rmq::Endpoints
{
Scheme = AccessPoint.HostScheme()
}
};
ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
ClientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
ClientSettings.UserAgent = new rmq.UA
{
Language = rmq::Language.DotNet,
Version = "5.0.0",
Platform = Environment.OSVersion.ToString(),
Hostname = System.Net.Dns.GetHostName()
};
Manager = new ClientManager();
_topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
_telemetryCts = new CancellationTokenSource();
}
public virtual async Task Start()
{
Schedule(async () =>
{
Logger.Debug("Update topic route by schedule");
await UpdateTopicRoute();
}, 30, _updateTopicRouteCts.Token);
// Get routes for topics of interest.
Logger.Debug("Step of #Start: get route for topics of interest");
await UpdateTopicRoute();
string accessPointUrl = AccessPoint.TargetUrl();
CreateSession(accessPointUrl);
await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
Logger.Debug($"Session has been created for {accessPointUrl}");
await Heartbeat();
}
public virtual async Task Shutdown()
{
Logger.Info($"Shutdown client");
_updateTopicRouteCts.Cancel();
_telemetryCts.Cancel();
await Manager.Shutdown();
}
private string FilterBroker(Func<string, bool> acceptor)
{
foreach (var item in _topicRouteTable)
{
foreach (var partition in item.Value.MessageQueues)
{
var target = Utilities.TargetUrl(partition);
if (acceptor(target))
{
return target;
}
}
}
return null;
}
/**
* Return all endpoints of brokers in route table.
*/
private List<string> AvailableBrokerEndpoints()
{
var endpoints = new List<string>();
foreach (var item in _topicRouteTable)
{
foreach (var partition in item.Value.MessageQueues)
{
string endpoint = Utilities.TargetUrl(partition);
if (!endpoints.Contains(endpoint))
{
endpoints.Add(endpoint);
}
}
}
return endpoints;
}
private async Task UpdateTopicRoute()
{
HashSet<string> topics = new HashSet<string>();
foreach (var topic in _topicsOfInterest)
{
topics.Add(topic);
}
foreach (var item in _topicRouteTable)
{
topics.Add(item.Key);
}
Logger.Debug($"Fetch topic route for {topics.Count} topics");
// Wrap topics into list such that we can map async result to topic
List<string> topicList = new List<string>();
topicList.AddRange(topics);
var tasks = new List<Task<TopicRouteData>>();
foreach (var item in topicList)
{
tasks.Add(GetRouteFor(item, true));
}
// Update topic route data
TopicRouteData[] result = await Task.WhenAll(tasks);
var i = 0;
foreach (var item in result)
{
if (null == item)
{
Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
++i;
continue;
}
if (0 == item.MessageQueues.Count)
{
Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
++i;
continue;
}
var topicName = item.MessageQueues[0].Topic.Name;
// Make assertion
Debug.Assert(topicName.Equals(topicList[i]));
var existing = _topicRouteTable[topicName];
if (!existing.Equals(item))
{
_topicRouteTable[topicName] = item;
}
++i;
}
}
protected void Schedule(Action action, int seconds, CancellationToken token)
{
if (null == action)
{
// TODO: log warning
return;
}
Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
action();
await Task.Delay(TimeSpan.FromSeconds(seconds), token);
}
});
}
/**
* Parameters:
* topic
* Topic to query
* direct
* Indicate if we should by-pass cache and fetch route entries from name server.
*/
protected async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
{
Logger.Debug($"Get route for topic={topic}, direct={direct}");
if (!direct && _topicRouteTable.ContainsKey(topic))
{
Logger.Debug($"Return cached route for {topic}");
return _topicRouteTable[topic];
}
// We got one or more name servers available.
var request = new rmq::QueryRouteRequest
{
Topic = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = topic
},
Endpoints = new rmq::Endpoints
{
Scheme = AccessPointScheme
}
};
foreach (var address in AccessPointEndpoints)
{
request.Endpoints.Addresses.Add(address);
}
var metadata = new grpc.Metadata();
Signature.Sign(this, metadata);
int index = _random.Next(0, AccessPointEndpoints.Count);
var serviceEndpoint = AccessPointEndpoints[index];
// AccessPointAddresses.Count
string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
try
{
Logger.Debug($"Resolving route for topic={topic}");
var topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
if (null != topicRouteData)
{
Logger.Debug($"Got route entries for {topic} from name server");
_topicRouteTable.TryAdd(topic, topicRouteData);
Logger.Debug($"Got route for {topic} from {target}");
return topicRouteData;
}
Logger.Warn($"Failed to query route of {topic} from {target}");
}
catch (Exception e)
{
Logger.Warn(e, "Failed when querying route");
}
return null;
}
protected abstract void PrepareHeartbeatData(rmq::HeartbeatRequest request);
public async Task Heartbeat()
{
List<string> endpoints = AvailableBrokerEndpoints();
if (0 == endpoints.Count)
{
Logger.Debug("No broker endpoints available in topic route");
return;
}
var request = new rmq::HeartbeatRequest
{
Group = null,
ClientType = rmq.ClientType.Unspecified
};
PrepareHeartbeatData(request);
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
List<Task> tasks = new List<Task>();
foreach (var endpoint in endpoints)
{
tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
}
await Task.WhenAll(tasks);
}
private List<string> BlockedBrokerEndpoints()
{
List<string> endpoints = new List<string>();
return endpoints;
}
private void RemoveFromBlockList(string endpoint)
{
}
protected async Task<List<rmq::Assignment>> ScanLoadAssignment(string topic, string group)
{
// Pick a broker randomly
string target = FilterBroker((s) => true);
var request = new rmq::QueryAssignmentRequest
{
Topic = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = topic
},
Group = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = group
},
Endpoints = new rmq::Endpoints
{
Scheme = AccessPointScheme
}
};
foreach (var endpoint in AccessPointEndpoints)
{
request.Endpoints.Addresses.Add(endpoint);
}
try
{
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
}
catch (System.Exception e)
{
Logger.Warn(e, $"Failed to acquire load assignments from {target}");
}
// Just return an empty list.
return new List<rmq.Assignment>();
}
private string TargetUrl(rmq::Assignment assignment)
{
var broker = assignment.MessageQueue.Broker;
var addresses = broker.Endpoints.Addresses;
// TODO: use the first address for now.
var address = addresses[0];
return $"https://{address.Host}:{address.Port}";
}
public virtual void BuildClientSetting(rmq::Settings settings)
{
settings.MergeFrom(ClientSettings);
}
private async Task CreateSession(string url)
{
Logger.Debug($"Create session for url={url}");
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
var stream = Manager.Telemetry(url, metadata);
var session = new Session(url, stream, this);
_sessions.TryAdd(url, session);
await session.Loop();
}
internal async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
{
var targetUrl = TargetUrl(assignment);
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
var request = new rmq::ReceiveMessageRequest
{
Group = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = group
},
MessageQueue = assignment.MessageQueue
};
var messages = await Manager.ReceiveMessage(targetUrl, metadata, request,
ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
return messages;
}
public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
{
var request = new rmq::AckMessageRequest
{
Group = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = group
},
Topic = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = topic
}
};
var entry = new rmq::AckMessageEntry
{
ReceiptHandle = receiptHandle,
MessageId = messageId
};
request.Entries.Add(entry);
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
return await Manager.Ack(target, metadata, request, RequestTimeout);
}
public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
{
var request = new rmq::ChangeInvisibleDurationRequest
{
ReceiptHandle = receiptHandle,
Group = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = group
},
Topic = new rmq::Resource
{
ResourceNamespace = _resourceNamespace,
Name = topic
},
MessageId = messageId
};
var metadata = new grpc::Metadata();
Signature.Sign(this, metadata);
return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
}
public async Task<bool> NotifyClientTermination(rmq.Resource group)
{
List<string> endpoints = AvailableBrokerEndpoints();
var request = new rmq::NotifyClientTerminationRequest
{
Group = group
};
var metadata = new grpc.Metadata();
Signature.Sign(this, metadata);
List<Task<Boolean>> tasks = new List<Task<Boolean>>();
foreach (var endpoint in endpoints)
{
tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
}
bool[] results = await Task.WhenAll(tasks);
foreach (bool b in results)
{
if (!b)
{
return false;
}
}
return true;
}
internal virtual void OnSettingsReceived(rmq::Settings settings)
{
if (null != settings.Metric)
{
ClientSettings.Metric = new rmq::Metric();
ClientSettings.Metric.MergeFrom(settings.Metric);
}
if (null != settings.BackoffPolicy)
{
ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
}
switch (settings.PubSubCase)
{
case rmq.Settings.PubSubOneofCase.Publishing:
{
ClientSettings.Publishing = settings.Publishing;
break;
}
case rmq.Settings.PubSubOneofCase.Subscription:
{
ClientSettings.Subscription = settings.Subscription;
break;
}
}
}
protected readonly IClientManager Manager;
private readonly HashSet<string> _topicsOfInterest = new HashSet<string>();
public void AddTopicOfInterest(string topic)
{
_topicsOfInterest.Add(topic);
}
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
private readonly CancellationTokenSource _updateTopicRouteCts;
private readonly CancellationTokenSource _telemetryCts;
public CancellationTokenSource TelemetryCts()
{
return _telemetryCts;
}
protected readonly AccessPoint AccessPoint;
// This field is subject changes from servers.
protected readonly rmq::Settings ClientSettings;
private readonly Random _random = new Random();
private readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
protected const string MeterName = "Apache.RocketMQ.Client";
protected static readonly Meter MetricMeter = new(MeterName, "1.0");
}
}