Merge back to master (#21)
* Refactor ClientManager and RpcClient (#9)
* Ensure rpc stub dispose correctly when shutdown (#10)
* Transparent retry when query topic route entries
* Make sure stub shutdown gracefully
* Log when something is wrong (#11)
* Supply the residual RPC request (#12)
* Fix a series of naming issues (#13)
* Implement the first alpha version of PushConsumer, with many TODOs (#14)
Implement an initial version of PushConsumer
* Polish code (#15)
* WIP: Start to adapt to protocol v2
* WIP: debug telemetry bi-direction streaming
* Fix telemetry by adding x-mq-client-id header
* WIP:
* Make producer work
* Polish code (#20)
* Add package OpenTelemetry and Opentelemetry.API
* WIP: write unit tests
* WIP
* Make Shutdown async
* WIP: refactor start procedure
* WIP
* WIP
* WIP: receive messages
* WIP: add change invisible duration
* Minor fix
* Adjust message
* Add unit tests for Producer.Send
* WIP: prepare to add unit test for SimpleConsumer.Receive
* WIP: Unit tests for SimpleConsumer.Receive
* Add unit tests for SimpleConsumer
* Collect metrics for Producer
* Fix minor issue
* Add custom otlp exporter
* add MeterProvider
Co-authored-by: aaron ai <yangkun.ayk@alibaba-inc.com>
diff --git a/.gitignore b/.gitignore
index 678b6cc..c6ddbae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@
.vscode/
.idea
*.user
-*DS_Store
\ No newline at end of file
+*DS_Store
+.fake
\ No newline at end of file
diff --git a/examples/Program.cs b/examples/Program.cs
index 9bf745c..09a1674 100644
--- a/examples/Program.cs
+++ b/examples/Program.cs
@@ -5,20 +5,24 @@
namespace examples
{
- class Foo {
+ class Foo
+ {
public int bar = 1;
}
class Program
{
- static void RT(Action action, int seconds, CancellationToken token) {
- if (null == action) {
+ static void RT(Action action, int seconds, CancellationToken token)
+ {
+ if (null == action)
+ {
return;
}
Task.Run(async () =>
{
- while(!token.IsCancellationRequested) {
+ while (!token.IsCancellationRequested)
+ {
action();
await Task.Delay(TimeSpan.FromSeconds(seconds), token);
}
@@ -31,7 +35,7 @@
string accessKey = "key";
string accessSecret = "secret";
- var credentials = new org.apache.rocketmq.StaticCredentialsProvider(accessKey, accessSecret).getCredentials();
+ var credentials = new Org.Apache.Rocketmq.StaticCredentialsProvider(accessKey, accessSecret).getCredentials();
bool expired = credentials.expired();
int workerThreads;
@@ -44,7 +48,8 @@
ThreadPool.QueueUserWorkItem((Object stateInfo) =>
{
Console.WriteLine("From ThreadPool");
- if (stateInfo is Foo) {
+ if (stateInfo is Foo)
+ {
Console.WriteLine("Foo: bar=" + (stateInfo as Foo).bar);
}
}, new Foo());
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/AccessPoint.cs
similarity index 66%
rename from rocketmq-client-csharp/StaticNameServerResolver.cs
rename to rocketmq-client-csharp/AccessPoint.cs
index 9f97599..cf4e1f4 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/AccessPoint.cs
@@ -14,25 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
- public class StaticNameServerResolver : INameServerResolver
+ public class AccessPoint
{
+ private string _host;
- public StaticNameServerResolver(List<string> nameServerList)
+ public string Host
{
- this.nameServerList = nameServerList;
+ get { return _host; }
+ set { _host = value; }
}
- public async Task<List<string>> resolveAsync()
+ private int _port;
+
+ public int Port
{
- return nameServerList;
+ get { return _port; }
+ set { _port = value; }
}
- private List<string> nameServerList;
+ public string TargetUrl()
+ {
+ return $"https://{_host}:{_port}";
+ }
}
-}
\ No newline at end of file
+}
diff --git a/rocketmq-client-csharp/Address.cs b/rocketmq-client-csharp/Address.cs
deleted file mode 100644
index dadf346..0000000
--- a/rocketmq-client-csharp/Address.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-namespace org.apache.rocketmq {
- public class Address {
- public Address(string host, int port) {
- this.host = host;
- this.port = port;
- }
-
- private string host;
- public string Host {
- get { return host; }
- }
-
- private int port;
- public int Port {
- get { return port; }
- }
-
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Broker.cs b/rocketmq-client-csharp/Broker.cs
deleted file mode 100644
index 2f5f675..0000000
--- a/rocketmq-client-csharp/Broker.cs
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-namespace org.apache.rocketmq {
- public class Broker : IComparable<Broker>, IEquatable<Broker> {
-
- public Broker(string name, int id, ServiceAddress address) {
- this.name = name;
- this.id = id;
- this.address = address;
- }
-
- private string name;
- public string Name {
- get { return name; }
- }
-
- private int id;
- public int Id {
- get { return id; }
- }
-
- private ServiceAddress address;
- public ServiceAddress Address {
- get { return address; }
- }
-
- /**
- * Context aware primary target URL.
- */
- public string targetUrl()
- {
- var addr = address.Addresses[0];
- return string.Format("https://{0}:{1}", addr.Host, addr.Port);
- }
-
- public int CompareTo(Broker other) {
- if (0 != name.CompareTo(other.name)) {
- return name.CompareTo(other.name);
- }
-
- return id.CompareTo(other.id);
- }
-
- public bool Equals(Broker other) {
- return name.Equals(other.name) && id.Equals(other.id);
- }
-
- public override bool Equals(Object other) {
- if (!(other is Broker)) {
- return false;
- }
- return Equals(other as Broker);
- }
-
- public override int GetHashCode()
- {
- return HashCode.Combine(name, id);
- }
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index dac6d89..32dffae 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -19,113 +19,176 @@
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
+using System.Diagnostics;
using System;
-using rmq = apache.rocketmq.v1;
+using rmq = Apache.Rocketmq.V2;
using grpc = global::Grpc.Core;
+using NLog;
+using System.Diagnostics.Metrics;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public abstract class Client : ClientConfig, IClient
{
+ protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- public Client(INameServerResolver resolver, string resourceNamespace)
+ protected Client(AccessPoint accessPoint, string resourceNamespace)
{
- this.nameServerResolver = resolver;
- this.resourceNamespace_ = resourceNamespace;
- this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace);
- this.nameServerResolverCTS = new CancellationTokenSource();
+ _accessPoint = accessPoint;
- this.topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
- this.updateTopicRouteCTS = new CancellationTokenSource();
+ // Support IPv4 for now
+ AccessPointScheme = rmq::AddressScheme.Ipv4;
+ var serviceEndpoint = new rmq::Address();
+ serviceEndpoint.Host = accessPoint.Host;
+ serviceEndpoint.Port = accessPoint.Port;
+ AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
+
+ _resourceNamespace = resourceNamespace;
+
+ _clientSettings = new rmq::Settings();
+
+ _clientSettings.AccessPoint = new rmq::Endpoints();
+ _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+ _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+
+ _clientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+
+ _clientSettings.UserAgent = new rmq.UA();
+ _clientSettings.UserAgent.Language = rmq::Language.DotNet;
+ _clientSettings.UserAgent.Version = "5.0.0";
+ _clientSettings.UserAgent.Platform = Environment.OSVersion.ToString();
+ _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+
+ Manager = ClientManagerFactory.getClientManager(resourceNamespace);
+
+ _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
+ _updateTopicRouteCts = new CancellationTokenSource();
+
+ _healthCheckCts = new CancellationTokenSource();
+
+ telemetryCts_ = new CancellationTokenSource();
}
- public virtual void start()
+ public virtual async Task Start()
{
schedule(async () =>
{
- await updateNameServerList();
- }, 30, nameServerResolverCTS.Token);
+ await UpdateTopicRoute();
- schedule(async () =>
- {
- await updateTopicRoute();
+ }, 30, _updateTopicRouteCts.Token);
- }, 30, updateTopicRouteCTS.Token);
+ // Get routes for topics of interest.
+ await UpdateTopicRoute();
+ string accessPointUrl = _accessPoint.TargetUrl();
+ createSession(accessPointUrl);
+
+ await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
+
+ await Heartbeat();
}
- public virtual void shutdown()
+ public virtual async Task Shutdown()
{
- updateTopicRouteCTS.Cancel();
- nameServerResolverCTS.Cancel();
+ Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
+ _updateTopicRouteCts.Cancel();
+ telemetryCts_.Cancel();
+ await Manager.Shutdown();
}
- private async Task updateNameServerList()
+ protected string FilterBroker(Func<string, bool> acceptor)
{
- List<string> nameServers = await nameServerResolver.resolveAsync();
- if (0 == nameServers.Count)
+ foreach (var item in _topicRouteTable)
{
- // Whoops, something should be wrong. We got an empty name server list.
- return;
- }
-
- if (nameServers.Equals(this.nameServers))
- {
- return;
- }
-
- // Name server list is updated.
- // TODO: Locking is required
- this.nameServers = nameServers;
- this.currentNameServerIndex = 0;
- }
-
- private async Task updateTopicRoute()
- {
- if (null == nameServers || 0 == nameServers.Count)
- {
- List<string> list = await nameServerResolver.resolveAsync();
- if (null != list && 0 != list.Count)
+ foreach (var partition in item.Value.MessageQueues)
{
- this.nameServers = list;
- }
- else
- {
- // TODO: log warning here.
- return;
+ string target = Utilities.TargetUrl(partition);
+ if (acceptor(target))
+ {
+ return target;
+ }
}
}
+ return null;
+ }
- // We got one or more name servers available.
- string nameServer = nameServers[currentNameServerIndex];
+ /**
+ * Return all endpoints of brokers in route table.
+ */
+ private List<string> AvailableBrokerEndpoints()
+ {
+ List<string> endpoints = new List<string>();
+ foreach (var item in _topicRouteTable)
+ {
+ foreach (var partition in item.Value.MessageQueues)
+ {
+ string endpoint = Utilities.TargetUrl(partition);
+ if (!endpoints.Contains(endpoint))
+ {
+ endpoints.Add(endpoint);
+ }
+ }
+ }
+ return endpoints;
+ }
+
+ private async Task UpdateTopicRoute()
+ {
+ HashSet<string> topics = new HashSet<string>();
+ foreach (var topic in topicsOfInterest_)
+ {
+ topics.Add(topic);
+ }
+
+ foreach (var item in _topicRouteTable)
+ {
+ topics.Add(item.Key);
+ }
+ Logger.Debug($"Fetch topic route for {topics.Count} topics");
+
+ // Wrap topics into list such that we can map async result to topic
+ List<string> topicList = new List<string>();
+ topicList.AddRange(topics);
List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
- foreach (var item in topicRouteTable)
+ foreach (var item in topicList)
{
- tasks.Add(getRouteFor(item.Key, true));
+ tasks.Add(GetRouteFor(item, true));
}
// Update topic route data
TopicRouteData[] result = await Task.WhenAll(tasks);
+ var i = 0;
foreach (var item in result)
{
if (null == item)
{
+ Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
+ ++i;
continue;
}
- if (0 == item.Partitions.Count)
+ if (0 == item.MessageQueues.Count)
{
+ Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
+ ++i;
continue;
}
- var topicName = item.Partitions[0].Topic.Name;
- var existing = topicRouteTable[topicName];
+ var topicName = item.MessageQueues[0].Topic.Name;
+
+ // Make assertion
+ Debug.Assert(topicName.Equals(topicList[i]));
+
+ var existing = _topicRouteTable[topicName];
if (!existing.Equals(item))
{
- topicRouteTable[topicName] = item;
+ _topicRouteTable[topicName] = item;
}
+ ++i;
}
}
@@ -154,74 +217,210 @@
* direct
* Indicate if we should by-pass cache and fetch route entries from name server.
*/
- public async Task<TopicRouteData> getRouteFor(string topic, bool direct)
+ public async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
{
- if (!direct && topicRouteTable.ContainsKey(topic))
+ if (!direct && _topicRouteTable.ContainsKey(topic))
{
- return topicRouteTable[topic];
- }
-
- if (null == nameServers || 0 == nameServers.Count)
- {
- List<string> list = await nameServerResolver.resolveAsync();
- if (null != list && 0 != list.Count)
- {
- this.nameServers = list;
- }
- else
- {
- // TODO: log warning here.
- return null;
- }
+ return _topicRouteTable[topic];
}
// We got one or more name servers available.
- string nameServer = nameServers[currentNameServerIndex];
var request = new rmq::QueryRouteRequest();
request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = resourceNamespace_;
+ request.Topic.ResourceNamespace = _resourceNamespace;
request.Topic.Name = topic;
request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
- var address = new rmq::Address();
- int pos = nameServer.LastIndexOf(':');
- address.Host = nameServer.Substring(0, pos);
- address.Port = Int32.Parse(nameServer.Substring(pos + 1));
- request.Endpoints.Addresses.Add(address);
- var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+ request.Endpoints.Scheme = AccessPointScheme;
+ foreach (var address in AccessPointEndpoints)
+ {
+ request.Endpoints.Addresses.Add(address);
+ }
+
var metadata = new grpc.Metadata();
Signature.sign(this, metadata);
- var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
- return topicRouteData;
+ int index = _random.Next(0, AccessPointEndpoints.Count);
+ var serviceEndpoint = AccessPointEndpoints[index];
+ // AccessPointAddresses.Count
+ string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
+ TopicRouteData topicRouteData;
+ try
+ {
+ Logger.Debug($"Resolving route for topic={topic}");
+ topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
+ if (null != topicRouteData)
+ {
+ Logger.Debug($"Got route entries for {topic} from name server");
+ _topicRouteTable.TryAdd(topic, topicRouteData);
+ return topicRouteData;
+ }
+ else
+ {
+ Logger.Warn($"Failed to query route of {topic} from {target}");
+ }
+ }
+ catch (Exception e)
+ {
+ Logger.Warn(e, "Failed when querying route");
+ }
+
+ return null;
}
- public abstract void prepareHeartbeatData(rmq::HeartbeatRequest request);
+ protected abstract void PrepareHeartbeatData(rmq::HeartbeatRequest request);
- public void heartbeat()
+ public async Task Heartbeat()
{
- List<string> endpoints = endpointsInUse();
+ List<string> endpoints = AvailableBrokerEndpoints();
if (0 == endpoints.Count)
{
+ Logger.Debug("No broker endpoints available in topic route");
return;
}
- var heartbeatRequest = new rmq::HeartbeatRequest();
- prepareHeartbeatData(heartbeatRequest);
+ var request = new rmq::HeartbeatRequest();
+ PrepareHeartbeatData(request);
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
+
+ List<Task> tasks = new List<Task>();
+ foreach (var endpoint in endpoints)
+ {
+ tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
+ }
+
+ await Task.WhenAll(tasks);
}
- public void healthCheck()
+ private List<string> BlockedBrokerEndpoints()
+ {
+ List<string> endpoints = new List<string>();
+ return endpoints;
+ }
+
+ private void RemoveFromBlockList(string endpoint)
{
}
- public async Task<bool> notifyClientTermination()
+ protected async Task<List<rmq::Assignment>> scanLoadAssignment(string topic, string group)
{
- List<string> endpoints = endpointsInUse();
+ // Pick a broker randomly
+ string target = FilterBroker((s) => true);
+ var request = new rmq::QueryAssignmentRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = _resourceNamespace;
+ request.Topic.Name = topic;
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = AccessPointScheme;
+ foreach (var endpoint in AccessPointEndpoints)
+ {
+ request.Endpoints.Addresses.Add(endpoint);
+ }
+ try
+ {
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
+ }
+ catch (System.Exception e)
+ {
+ Logger.Warn(e, $"Failed to acquire load assignments from {target}");
+ }
+ // Just return an empty list.
+ return new List<rmq.Assignment>();
+ }
+
+ private string TargetUrl(rmq::Assignment assignment)
+ {
+ var broker = assignment.MessageQueue.Broker;
+ var addresses = broker.Endpoints.Addresses;
+ // TODO: use the first address for now.
+ var address = addresses[0];
+ return $"https://{address.Host}:{address.Port}";
+ }
+
+ public virtual void BuildClientSetting(rmq::Settings settings)
+ {
+ settings.MergeFrom(_clientSettings);
+ }
+
+ public void createSession(string url)
+ {
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ var stream = Manager.Telemetry(url, metadata);
+ var session = new Session(url, stream, this);
+ _sessions.TryAdd(url, session);
+ Task.Run(async () =>
+ {
+ await session.Loop();
+ });
+ }
+
+
+ public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
+ {
+ var targetUrl = TargetUrl(assignment);
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ var request = new rmq::ReceiveMessageRequest();
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+ request.MessageQueue = assignment.MessageQueue;
+ var messages = await Manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
+ return messages;
+ }
+
+ public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
+ {
+ var request = new rmq::AckMessageRequest();
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = _resourceNamespace;
+ request.Topic.Name = topic;
+
+ var entry = new rmq::AckMessageEntry();
+ entry.ReceiptHandle = receiptHandle;
+ entry.MessageId = messageId;
+ request.Entries.Add(entry);
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ return await Manager.Ack(target, metadata, request, RequestTimeout);
+ }
+
+ public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
+ {
+ var request = new rmq::ChangeInvisibleDurationRequest();
+ request.ReceiptHandle = receiptHandle;
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = _resourceNamespace;
+ request.Topic.Name = topic;
+
+ request.MessageId = messageId;
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
+ }
+
+ public async Task<bool> NotifyClientTermination()
+ {
+ List<string> endpoints = AvailableBrokerEndpoints();
var request = new rmq::NotifyClientTerminationRequest();
- request.ClientId = clientId();
+
var metadata = new grpc.Metadata();
Signature.sign(this, metadata);
@@ -230,7 +429,7 @@
foreach (var endpoint in endpoints)
{
- tasks.Add(clientManager.notifyClientTermination(endpoint, metadata, request, getIoTimeout()));
+ tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
}
bool[] results = await Task.WhenAll(tasks);
@@ -244,19 +443,53 @@
return true;
}
- private List<string> endpointsInUse()
+ public virtual void OnSettingsReceived(rmq::Settings settings)
{
- //TODO: gather endpoints from route entries.
- return new List<string>();
+ if (null != settings.Metric)
+ {
+ _clientSettings.Metric = new rmq::Metric();
+ _clientSettings.Metric.MergeFrom(settings.Metric);
+ }
+
+ if (null != settings.BackoffPolicy)
+ {
+ _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
+ _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+ }
}
- protected IClientManager clientManager;
- private INameServerResolver nameServerResolver;
- private CancellationTokenSource nameServerResolverCTS;
- private List<string> nameServers;
- private int currentNameServerIndex;
+ protected readonly IClientManager Manager;
- private ConcurrentDictionary<string, TopicRouteData> topicRouteTable;
- private CancellationTokenSource updateTopicRouteCTS;
+ private readonly HashSet<string> topicsOfInterest_ = new HashSet<string>();
+
+ public void AddTopicOfInterest(string topic)
+ {
+ topicsOfInterest_.Add(topic);
+ }
+
+ private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
+ private readonly CancellationTokenSource _updateTopicRouteCts;
+
+ private readonly CancellationTokenSource _healthCheckCts;
+
+ private readonly CancellationTokenSource telemetryCts_ = new CancellationTokenSource();
+
+ public CancellationTokenSource TelemetryCts()
+ {
+ return telemetryCts_;
+ }
+
+ protected readonly AccessPoint _accessPoint;
+
+ // This field is subject changes from servers.
+ protected readonly rmq::Settings _clientSettings;
+
+ private readonly Random _random = new Random();
+
+ protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
+
+ public static readonly string MeterName = "Apache.RocketMQ.Client";
+
+ protected static readonly Meter MetricMeter = new(MeterName, "1.0");
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 949f8b4..0d99cb1 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -15,101 +15,134 @@
* limitations under the License.
*/
using System;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
- public class ClientConfig : IClientConfig {
+ public class ClientConfig : IClientConfig
+ {
- public ClientConfig() {
+ public ClientConfig()
+ {
var hostName = System.Net.Dns.GetHostName();
var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
- this.ioTimeout_ = TimeSpan.FromSeconds(3);
- this.longPollingIoTimeout_ = TimeSpan.FromSeconds(15);
+ this._requestTimeout = TimeSpan.FromSeconds(3);
+ this.longPollingIoTimeout_ = TimeSpan.FromSeconds(30);
+ this.client_type_ = rmq::ClientType.Unspecified;
+ this.access_point_ = new rmq::Endpoints();
+ this.back_off_policy_ = new rmq::RetryPolicy();
+ this._publishing = new Publishing();
}
- public string region() {
- return region_;
+ public string region()
+ {
+ return _region;
}
- public string Region {
- set { region_ = value; }
+ public string Region
+ {
+ set { _region = value; }
}
- public string serviceName() {
- return serviceName_;
+ public string serviceName()
+ {
+ return _serviceName;
}
- public string ServiceName {
- set { serviceName_ = value; }
+ public string ServiceName
+ {
+ set { _serviceName = value; }
}
- public string resourceNamespace() {
- return resourceNamespace_;
+ public string resourceNamespace()
+ {
+ return _resourceNamespace;
}
- public string ResourceNamespace {
- set { resourceNamespace_ = value; }
+ public string ResourceNamespace
+ {
+ get { return _resourceNamespace; }
+ set { _resourceNamespace = value; }
}
- public ICredentialsProvider credentialsProvider() {
+ public ICredentialsProvider credentialsProvider()
+ {
return credentialsProvider_;
}
-
- public ICredentialsProvider CredentialsProvider {
+
+ public ICredentialsProvider CredentialsProvider
+ {
set { credentialsProvider_ = value; }
}
- public string tenantId() {
- return tenantId_;
+ public string tenantId()
+ {
+ return _tenantId;
}
- public string TenantId {
- set { tenantId_ = value; }
+ public string TenantId
+ {
+ set { _tenantId = value; }
}
- public TimeSpan getIoTimeout() {
- return ioTimeout_;
- }
- public TimeSpan IoTimeout {
- set { ioTimeout_ = value; }
+ public TimeSpan RequestTimeout
+ {
+ get
+ {
+ return _requestTimeout;
+ }
+ set
+ {
+ _requestTimeout = value;
+ }
}
- public TimeSpan getLongPollingTimeout() {
+ public TimeSpan getLongPollingTimeout()
+ {
return longPollingIoTimeout_;
}
- public TimeSpan LongPollingTimeout {
+ public TimeSpan LongPollingTimeout
+ {
set { longPollingIoTimeout_ = value; }
}
- public string getGroupName() {
+ public string getGroupName()
+ {
return groupName_;
}
- public string GroupName {
+ public string GroupName
+ {
set { groupName_ = value; }
}
- public string clientId() {
+ public string clientId()
+ {
return clientId_;
}
- public bool isTracingEnabled() {
+ public bool isTracingEnabled()
+ {
return tracingEnabled_;
}
- public bool TracingEnabled {
+ public bool TracingEnabled
+ {
set { tracingEnabled_ = value; }
}
- public void setInstanceName(string instanceName) {
+ public void setInstanceName(string instanceName)
+ {
this.instanceName_ = instanceName;
}
- private string region_ = "cn-hangzhou";
- private string serviceName_ = "ONS";
+ private string _region = "cn-hangzhou";
+ private string _serviceName = "ONS";
- protected string resourceNamespace_;
+ protected string _resourceNamespace;
private ICredentialsProvider credentialsProvider_;
- private string tenantId_;
+ private string _tenantId;
- private TimeSpan ioTimeout_;
+ private TimeSpan _requestTimeout;
private TimeSpan longPollingIoTimeout_;
@@ -120,6 +153,53 @@
private bool tracingEnabled_ = false;
private string instanceName_ = "default";
+
+ private rmq::ClientType client_type_;
+ public rmq::ClientType ClientType
+ {
+ get { return client_type_; }
+ set { client_type_ = value; }
+ }
+
+
+ private rmq::Endpoints access_point_;
+
+ public rmq::AddressScheme AccessPointScheme
+ {
+ get { return access_point_.Scheme; }
+ set { access_point_.Scheme = value; }
+ }
+
+ public List<rmq::Address> AccessPointEndpoints
+ {
+ get
+ {
+ List<rmq::Address> addresses = new List<rmq::Address>();
+ foreach (var item in access_point_.Addresses)
+ {
+ addresses.Add(item);
+ }
+ return addresses;
+ }
+
+ set
+ {
+ access_point_.Addresses.Clear();
+ foreach (var item in value)
+ {
+ access_point_.Addresses.Add(item);
+ }
+ }
+ }
+
+ private rmq::RetryPolicy back_off_policy_;
+
+ private Publishing _publishing;
+ public Publishing Publishing
+ {
+ get { return _publishing; }
+ }
+
}
}
diff --git a/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
index 59ec0f2..01adddc 100644
--- a/rocketmq-client-csharp/ClientLoggerInterceptor.cs
+++ b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -18,11 +18,15 @@
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Interceptors;
+using NLog;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public class ClientLoggerInterceptor : Interceptor
{
+
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
@@ -52,19 +56,12 @@
try
{
var response = await t;
- Console.WriteLine($"Response received: {response}");
+ Logger.Debug($"Response received: {response}");
return response;
}
catch (Exception ex)
{
- // Log error to the console.
- // Note: Configuring .NET Core logging is the recommended way to log errors
- // https://docs.microsoft.com/aspnet/core/grpc/diagnostics#grpc-client-logging
- var initialColor = Console.ForegroundColor;
- Console.ForegroundColor = ConsoleColor.Red;
- Console.WriteLine($"Call error: {ex.Message}");
- Console.ForegroundColor = initialColor;
-
+ Logger.Error($"Call error: {ex.Message}");
throw;
}
}
@@ -104,10 +101,7 @@
where TRequest : class
where TResponse : class
{
- var initialColor = Console.ForegroundColor;
- Console.ForegroundColor = ConsoleColor.Green;
- Console.WriteLine($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
- Console.ForegroundColor = initialColor;
+ Logger.Debug($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
}
private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 59fec83..a39a0e0 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -15,168 +15,310 @@
* limitations under the License.
*/
-using System.Collections.Concurrent;
-
-using rmq = global::apache.rocketmq.v1;
-using Grpc.Net.Client;
+using rmq = Apache.Rocketmq.V2;
using System;
+using System.IO;
+using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using System.Collections.Generic;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
+using System.Security.Cryptography;
+using NLog;
-namespace org.apache.rocketmq {
- public class ClientManager : IClientManager {
+namespace Org.Apache.Rocketmq
+{
+ public class ClientManager : IClientManager
+ {
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- public ClientManager() {
- rpcClients = new ConcurrentDictionary<string, RpcClient>();
+ public ClientManager()
+ {
+ _rpcClients = new Dictionary<string, RpcClient>();
+ _clientLock = new ReaderWriterLockSlim();
}
- public IRpcClient getRpcClient(string target) {
- if (!rpcClients.ContainsKey(target)) {
- var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions {
- HttpHandler = createHttpHandler()
- });
- var invoker = channel.Intercept(new ClientLoggerInterceptor());
- var client = new rmq::MessagingService.MessagingServiceClient(invoker);
- var rpcClient = new RpcClient(client);
- if(rpcClients.TryAdd(target, rpcClient)) {
- return rpcClient;
+ public IRpcClient GetRpcClient(string target)
+ {
+ _clientLock.EnterReadLock();
+ try
+ {
+ // client exists, return in advance.
+ if (_rpcClients.ContainsKey(target))
+ {
+ return _rpcClients[target];
}
}
- return rpcClients[target];
- }
-
- /**
- * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
- * why parameters are configured this way.
- */
- public static HttpMessageHandler createHttpHandler()
- {
- var sslOptions = new System.Net.Security.SslClientAuthenticationOptions();
- // Disable server certificate validation during development phase.
- // Comment out the following line if server certificate validation is required.
- sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
- var handler = new SocketsHttpHandler
+ finally
{
- PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
- KeepAlivePingDelay = TimeSpan.FromSeconds(60),
- KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
- EnableMultipleHttp2Connections = true,
- SslOptions = sslOptions,
- };
- return handler;
+ _clientLock.ExitReadLock();
+ }
+
+ _clientLock.EnterWriteLock();
+ try
+ {
+ // client exists, return in advance.
+ if (_rpcClients.ContainsKey(target))
+ {
+ return _rpcClients[target];
+ }
+
+ // client does not exist, generate a new one
+ var client = new RpcClient(target);
+ _rpcClients.Add(target, client);
+ return client;
+ }
+ finally
+ {
+ _clientLock.ExitWriteLock();
+ }
}
- public async Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
+ public grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata)
{
- var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var queryRouteResponse = await rpcClient.queryRoute(request, callOptions);
+ var rpcClient = GetRpcClient(target);
+ return rpcClient.Telemetry(metadata);
+ }
- if (queryRouteResponse.Common.Status.Code != ((int)Google.Rpc.Code.Ok)) {
+ public async Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata,
+ rmq::QueryRouteRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ Logger.Debug($"QueryRouteRequest: {request}");
+ var queryRouteResponse = await rpcClient.QueryRoute(metadata, request, timeout);
+
+ if (queryRouteResponse.Status.Code != rmq::Code.Ok)
+ {
+ Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status}");
// Raise an application layer exception
-
}
+ Logger.Debug($"QueryRouteResponse: {queryRouteResponse}");
- var partitions = new List<Partition>();
- // Translate protobuf object to domain specific one
- foreach (var partition in queryRouteResponse.Partitions)
+ var messageQueues = new List<rmq::MessageQueue>();
+ foreach (var messageQueue in queryRouteResponse.MessageQueues)
{
- var topic = new Topic(partition.Topic.ResourceNamespace, partition.Topic.Name);
- var id = partition.Id;
- Permission permission = Permission.READ_WRITE;
- switch (partition.Permission) {
- case rmq::Permission.None:
- {
- permission = Permission.NONE;
- break;
- }
- case rmq::Permission.Read:
- {
- permission = Permission.READ;
- break;
- }
- case rmq::Permission.Write:
- {
- permission = Permission.WRITE;
- break;
- }
- case rmq::Permission.ReadWrite:
- {
- permission = Permission.READ_WRITE;
- break;
- }
- }
-
- AddressScheme scheme = AddressScheme.IPv4;
- switch(partition.Broker.Endpoints.Scheme) {
- case rmq::AddressScheme.Ipv4:
- {
- scheme = AddressScheme.IPv4;
- break;
- }
- case rmq::AddressScheme.Ipv6:
- {
- scheme = AddressScheme.IPv6;
- break;
- }
- case rmq::AddressScheme.DomainName:
- {
- scheme = AddressScheme.DOMAIN_NAME;
- break;
- }
- }
-
- List<Address> addresses = new List<Address>();
- foreach(var item in partition.Broker.Endpoints.Addresses) {
- addresses.Add(new Address(item.Host, item.Port));
- }
- ServiceAddress serviceAddress = new ServiceAddress(scheme, addresses);
- Broker broker = new Broker(partition.Broker.Name, id, serviceAddress);
- partitions.Add(new Partition(topic, broker, id, permission));
+ messageQueues.Add(messageQueue);
}
-
- var topicRouteData = new TopicRouteData(partitions);
+ var topicRouteData = new TopicRouteData(messageQueues);
return topicRouteData;
}
- public async Task<Boolean> heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+ public async Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request,
+ TimeSpan timeout)
{
- var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc.CallOptions(metadata, deadline);
- var response = await rpcClient.heartbeat(request, callOptions);
- if (null == response)
- {
- return false;
- }
-
- return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+ var rpcClient = GetRpcClient(target);
+ Logger.Debug($"Heartbeat to {target}, Request: {request}");
+ var response = await rpcClient.Heartbeat(metadata, request, timeout);
+ Logger.Debug($"Heartbeat to {target} response status: {response.Status}");
+ return response.Status.Code == rmq::Code.Ok;
}
- public async Task<rmq::SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout)
+ public async Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata,
+ rmq::SendMessageRequest request, TimeSpan timeout)
{
- var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = await rpcClient.sendMessage(request, callOptions);
+ var rpcClient = GetRpcClient(target);
+ var response = await rpcClient.SendMessage(metadata, request, timeout);
return response;
}
- public async Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+ public async Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata,
+ rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
{
- var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- rmq::NotifyClientTerminationResponse response = await rpcClient.notifyClientTermination(request, callOptions);
- return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
+ var rpcClient = GetRpcClient(target);
+ rmq::NotifyClientTerminationResponse response =
+ await rpcClient.NotifyClientTermination(metadata, request, timeout);
+ return response.Status.Code == rmq::Code.Ok;
}
- private ConcurrentDictionary<string, RpcClient> rpcClients;
+ public async Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ rmq::QueryAssignmentResponse response = await rpcClient.QueryAssignment(metadata, request, timeout);
+ if (response.Status.Code != rmq::Code.Ok)
+ {
+ // TODO: Build exception hierarchy
+ throw new Exception($"Failed to query load assignment from server. Cause: {response.Status.Message}");
+ }
+ List<rmq::Assignment> assignments = new List<rmq.Assignment>();
+ foreach (var item in response.Assignments)
+ {
+ assignments.Add(item);
+ }
+ return assignments;
+ }
+
+ public async Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata,
+ rmq::ReceiveMessageRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ List<rmq::ReceiveMessageResponse> response = await rpcClient.ReceiveMessage(metadata, request, timeout);
+
+ if (null == response || 0 == response.Count)
+ {
+ // TODO: throw an exception to propagate this error?
+ return new List<Message>();
+ }
+
+ List<Message> messages = new List<Message>();
+
+ foreach (var entry in response)
+ {
+ switch (entry.ContentCase)
+ {
+ case rmq.ReceiveMessageResponse.ContentOneofCase.None:
+ {
+ Logger.Warn("Unexpected ReceiveMessageResponse content type");
+ break;
+ }
+
+ case rmq.ReceiveMessageResponse.ContentOneofCase.Status:
+ {
+ switch (entry.Status.Code)
+ {
+ case rmq.Code.Ok:
+ {
+ break;
+ }
+
+ case rmq.Code.Forbidden:
+ {
+ Logger.Warn("Receive message denied");
+ break;
+ }
+ case rmq.Code.TooManyRequests:
+ {
+ Logger.Warn("TooManyRequest: servers throttled");
+ break;
+ }
+ default:
+ {
+ Logger.Warn("Unknown error status");
+ break;
+ }
+ }
+ break;
+ }
+
+ case rmq.ReceiveMessageResponse.ContentOneofCase.Message:
+ {
+ var message = Convert(target, entry.Message);
+ messages.Add(message);
+ break;
+ }
+
+ case rmq.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp:
+ {
+ var begin = entry.DeliveryTimestamp;
+ var costs = DateTime.UtcNow - begin.ToDateTime();
+ // TODO: Collect metrics
+ break;
+ }
+ }
+ }
+ return messages;
+ }
+
+ private Message Convert(string sourceHost, rmq::Message message)
+ {
+ var msg = new Message();
+ msg.Topic = message.Topic.Name;
+ msg.MessageId = message.SystemProperties.MessageId;
+ msg.Tag = message.SystemProperties.Tag;
+
+ // Validate message body checksum
+ byte[] raw = message.Body.ToByteArray();
+ if (rmq::DigestType.Crc32 == message.SystemProperties.BodyDigest.Type)
+ {
+ uint checksum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length);
+ if (!message.SystemProperties.BodyDigest.Checksum.Equals(checksum.ToString("X")))
+ {
+ msg._checksumVerifiedOk = false;
+ }
+ }
+ else if (rmq::DigestType.Md5 == message.SystemProperties.BodyDigest.Type)
+ {
+ var checksum = MD5.HashData(raw);
+ if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
+ {
+ msg._checksumVerifiedOk = false;
+ }
+ }
+ else if (rmq::DigestType.Sha1 == message.SystemProperties.BodyDigest.Type)
+ {
+ var checksum = SHA1.HashData(raw);
+ if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
+ {
+ msg._checksumVerifiedOk = false;
+ }
+ }
+
+ foreach (var entry in message.UserProperties)
+ {
+ msg.UserProperties.Add(entry.Key, entry.Value);
+ }
+
+ msg._receiptHandle = message.SystemProperties.ReceiptHandle;
+ msg._sourceHost = sourceHost;
+
+ foreach (var key in message.SystemProperties.Keys)
+ {
+ msg.Keys.Add(key);
+ }
+
+ msg.DeliveryAttempt = message.SystemProperties.DeliveryAttempt;
+
+ if (message.SystemProperties.BodyEncoding == rmq::Encoding.Gzip)
+ {
+ // Decompress/Inflate message body
+ var inputStream = new MemoryStream(message.Body.ToByteArray());
+ var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress);
+ var outputStream = new MemoryStream();
+ gzipStream.CopyTo(outputStream);
+ msg.Body = outputStream.ToArray();
+ }
+ else
+ {
+ msg.Body = message.Body.ToByteArray();
+ }
+
+ return msg;
+ }
+
+ public async Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ var response = await rpcClient.AckMessage(metadata, request, timeout);
+ return response.Status.Code == rmq::Code.Ok;
+ }
+
+ public async Task<Boolean> ChangeInvisibleDuration(string target, grpc::Metadata metadata, rmq::ChangeInvisibleDurationRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ var response = await rpcClient.ChangeInvisibleDuration(metadata, request, timeout);
+ return response.Status.Code == rmq::Code.Ok;
+ }
+
+ public async Task Shutdown()
+ {
+ _clientLock.EnterReadLock();
+ try
+ {
+ List<Task> tasks = new List<Task>();
+ foreach (var item in _rpcClients)
+ {
+ tasks.Add(item.Value.Shutdown());
+ }
+
+ await Task.WhenAll(tasks);
+ }
+ finally
+ {
+ _clientLock.ExitReadLock();
+ }
+ }
+
+ private readonly Dictionary<string, RpcClient> _rpcClients;
+ private readonly ReaderWriterLockSlim _clientLock;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientManagerFactory.cs b/rocketmq-client-csharp/ClientManagerFactory.cs
index 3ca211d..9d03994 100644
--- a/rocketmq-client-csharp/ClientManagerFactory.cs
+++ b/rocketmq-client-csharp/ClientManagerFactory.cs
@@ -18,7 +18,7 @@
using System.Collections.Generic;
using System.Collections.Concurrent;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public sealed class ClientManagerFactory
{
diff --git a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
index 1381b3f..39dfd7e 100644
--- a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
+++ b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -19,36 +19,46 @@
using System.Text.Json;
using System.Collections.Generic;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
/**
* File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config
* A sample config content is as follows:
* {"AccessKey": "key", "AccessSecret": "secret"}
*/
- public class ConfigFileCredentialsProvider : ICredentialsProvider {
+ public class ConfigFileCredentialsProvider : ICredentialsProvider
+ {
- public ConfigFileCredentialsProvider() {
+ public ConfigFileCredentialsProvider()
+ {
var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
string configFileRelativePath = "/.rocketmq/config";
- if (!File.Exists(home + configFileRelativePath)) {
+ if (!File.Exists(home + configFileRelativePath))
+ {
return;
}
- try {
- using (var reader = new StreamReader(home + configFileRelativePath)) {
+ try
+ {
+ using (var reader = new StreamReader(home + configFileRelativePath))
+ {
string json = reader.ReadToEnd();
var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json);
accessKey = kv["AccessKey"];
accessSecret = kv["AccessSecret"];
valid = true;
- }
- } catch (IOException e) {
+ }
+ }
+ catch (IOException)
+ {
}
}
- public Credentials getCredentials() {
- if (!valid) {
+ public Credentials getCredentials()
+ {
+ if (!valid)
+ {
return null;
}
diff --git a/rocketmq-client-csharp/Credentials.cs b/rocketmq-client-csharp/Credentials.cs
index 2da9581..a73b000 100644
--- a/rocketmq-client-csharp/Credentials.cs
+++ b/rocketmq-client-csharp/Credentials.cs
@@ -17,27 +17,34 @@
using System;
-namespace org.apache.rocketmq {
- public class Credentials {
+namespace Org.Apache.Rocketmq
+{
+ public class Credentials
+ {
- public Credentials(string accessKey, string accessSecret) {
+ public Credentials(string accessKey, string accessSecret)
+ {
this.accessKey = accessKey;
this.accessSecret = accessSecret;
}
- public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant) {
+ public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant)
+ {
this.accessKey = accessKey;
this.accessSecret = accessSecret;
this.sessionToken = sessionToken;
this.expirationInstant = expirationInstant;
}
- public bool empty() {
+ public bool empty()
+ {
return String.IsNullOrEmpty(accessKey) || String.IsNullOrEmpty(accessSecret);
}
- public bool expired() {
- if (DateTime.MinValue == expirationInstant) {
+ public bool expired()
+ {
+ if (DateTime.MinValue == expirationInstant)
+ {
return false;
}
@@ -45,17 +52,20 @@
}
private string accessKey;
- public string AccessKey {
+ public string AccessKey
+ {
get { return accessKey; }
}
-
+
private string accessSecret;
- public string AccessSecret {
+ public string AccessSecret
+ {
get { return accessSecret; }
}
private string sessionToken;
- public string SessionToken {
+ public string SessionToken
+ {
get { return sessionToken; }
}
diff --git a/rocketmq-client-csharp/Permission.cs b/rocketmq-client-csharp/ExpressionType.cs
similarity index 88%
rename from rocketmq-client-csharp/Permission.cs
rename to rocketmq-client-csharp/ExpressionType.cs
index 659c15b..0caaf8e 100644
--- a/rocketmq-client-csharp/Permission.cs
+++ b/rocketmq-client-csharp/ExpressionType.cs
@@ -14,10 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+namespace Org.Apache.Rocketmq
+{
-public enum Permission {
- NONE,
- READ,
- WRITE,
- READ_WRITE,
+ public enum ExpressionType
+ {
+ TAG,
+ SQL92,
+ }
+
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/FilterExpression.cs
similarity index 72%
copy from rocketmq-client-csharp/INameServerResolver.cs
copy to rocketmq-client-csharp/FilterExpression.cs
index 568098f..3bd432d 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/FilterExpression.cs
@@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
- public interface INameServerResolver
+ public class FilterExpression
{
- Task<List<string>> resolveAsync();
+ public FilterExpression(string expression, ExpressionType type)
+ {
+ Expression = expression;
+ Type = type;
+ }
+
+ public ExpressionType Type { get; }
+ public string Expression { get; }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 7f3ed64..3352028 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -16,17 +16,24 @@
*/
using System.Threading.Tasks;
+using System.Threading;
+using System;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public interface IClient : IClientConfig
{
- void heartbeat();
+ Task Heartbeat();
- void healthCheck();
+ Task<bool> NotifyClientTermination();
- Task<bool> notifyClientTermination();
+ void BuildClientSetting(rmq::Settings settings);
+
+ void OnSettingsReceived(rmq::Settings settings);
+
+ CancellationTokenSource TelemetryCts();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientConfig.cs b/rocketmq-client-csharp/IClientConfig.cs
index b83006c..438d7a8 100644
--- a/rocketmq-client-csharp/IClientConfig.cs
+++ b/rocketmq-client-csharp/IClientConfig.cs
@@ -16,8 +16,10 @@
*/
using System;
-namespace org.apache.rocketmq {
- public interface IClientConfig {
+namespace Org.Apache.Rocketmq
+{
+ public interface IClientConfig
+ {
string region();
string serviceName();
@@ -28,8 +30,6 @@
string tenantId();
- TimeSpan getIoTimeout();
-
TimeSpan getLongPollingTimeout();
string getGroupName();
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index 08ed86a..afccfde 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -15,22 +15,37 @@
* limitations under the License.
*/
-using apache.rocketmq.v1;
using System.Threading.Tasks;
using System;
+using System.Collections.Generic;
using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq {
- public interface IClientManager {
- IRpcClient getRpcClient(string target);
- Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
+namespace Org.Apache.Rocketmq
+{
+ public interface IClientManager
+ {
+ IRpcClient GetRpcClient(string target);
- Task<Boolean> heartbeat(string target, grpc::Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+ grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata);
- Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout);
+ Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout);
- Task<SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+ Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout);
+ Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout);
+
+ Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout);
+
+ Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout);
+
+ Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, rmq::ReceiveMessageRequest request, TimeSpan timeout);
+
+ Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout);
+
+ Task<Boolean> ChangeInvisibleDuration(string target, grpc::Metadata metadata, rmq::ChangeInvisibleDurationRequest request, TimeSpan timeout);
+
+ Task Shutdown();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/AddressScheme.cs b/rocketmq-client-csharp/IConsumer.cs
similarity index 84%
rename from rocketmq-client-csharp/AddressScheme.cs
rename to rocketmq-client-csharp/IConsumer.cs
index 3e95b09..2ad0dab 100644
--- a/rocketmq-client-csharp/AddressScheme.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -14,10 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-namespace org.apache.rocketmq {
- public enum AddressScheme {
- IPv4,
- IPv6,
- DOMAIN_NAME,
+
+using System.Threading.Tasks;
+namespace Org.Apache.Rocketmq
+{
+ public interface IConsumer
+ {
+ Task Start();
+
+ Task Shutdown();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ICredentialsProvider.cs b/rocketmq-client-csharp/ICredentialsProvider.cs
index 6e7112e..1fb892b 100644
--- a/rocketmq-client-csharp/ICredentialsProvider.cs
+++ b/rocketmq-client-csharp/ICredentialsProvider.cs
@@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-namespace org.apache.rocketmq {
- public interface ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+ public interface ICredentialsProvider
+ {
Credentials getCredentials();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/IMessageListener.cs
similarity index 86%
rename from rocketmq-client-csharp/INameServerResolver.cs
rename to rocketmq-client-csharp/IMessageListener.cs
index 568098f..f46efd5 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/IMessageListener.cs
@@ -14,14 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
+
using System.Collections.Generic;
using System.Threading.Tasks;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
- public interface INameServerResolver
+
+ public interface IMessageListener
{
- Task<List<string>> resolveAsync();
+ Task Consume(List<Message> messages, List<Message> failed);
+
}
+
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 89f8955..420af20 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
+
using System.Threading.Tasks;
-namespace org.apache.rocketmq {
- public interface IProducer {
- void start();
+namespace Org.Apache.Rocketmq
+{
+ public interface IProducer
+ {
+ Task Start();
- void shutdown();
+ Task Shutdown();
- Task<SendResult> send(Message message);
-
+ Task<SendReceipt> Send(Message message);
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index 0590bb0..146d4f7 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -14,21 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-namespace org.apache.rocketmq
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+
+namespace Org.Apache.Rocketmq
{
public interface IRpcClient
{
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
+ AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Metadata metadata);
- Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions);
+ Task<QueryRouteResponse> QueryRoute(Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
- Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions);
+ Task<HeartbeatResponse> Heartbeat(Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
- Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions);
+ Task<SendMessageResponse> SendMessage(Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+
+ Task<QueryAssignmentResponse> QueryAssignment(Metadata metadata, QueryAssignmentRequest request,
+ TimeSpan timeout);
+
+ Task<List<ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, ReceiveMessageRequest request, TimeSpan timeout);
+
+ Task<AckMessageResponse> AckMessage(Metadata metadata, AckMessageRequest request, TimeSpan timeout);
+
+ Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, ChangeInvisibleDurationRequest request, TimeSpan timeout);
+
+ Task<ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Metadata metadata,
+ ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout);
+
+ Task<EndTransactionResponse> EndTransaction(Metadata metadata, EndTransactionRequest request, TimeSpan timeout);
+
+
+ Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
+ NotifyClientTerminationRequest request, TimeSpan timeout);
+
+ Task Shutdown();
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 1e6ee0e..b527311 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -14,79 +14,111 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+using System;
using System.Collections.Generic;
-namespace org.apache.rocketmq
+
+namespace Org.Apache.Rocketmq
{
- public class Message {
- public Message() : this(null, null) {
+ public class Message
+ {
+ public Message() : this(null, null)
+ {
}
- public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {}
+ public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { }
- public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) {
+ public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body)
+ {
}
- public Message(string topic, string tag, List<string> keys, byte[] body) {
- this.messageId = SequenceGenerator.Instance.Next();
- this.maxAttemptTimes = 3;
- this.topic = topic;
- this.tag = tag;
- this.keys = keys;
- this.body = body;
- this.userProperties = new Dictionary<string, string>();
- this.systemProperties = new Dictionary<string, string>();
+ public Message(string topic, string tag, List<string> keys, byte[] body)
+ {
+ MessageId = SequenceGenerator.Instance.Next();
+ MaxAttemptTimes = 3;
+ Topic = topic;
+ Tag = tag;
+ Keys = keys;
+ Body = body;
+ UserProperties = new Dictionary<string, string>();
+ DeliveryTimestamp = DateTime.MinValue;
}
- private string messageId;
public string MessageId
{
- get { return messageId; }
+ get;
+ internal set;
+ }
+
+ public string Topic
+ {
+ get;
+ set;
}
- private string topic;
-
- public string Topic {
- get { return topic; }
- set { this.topic = value; }
+ public byte[] Body
+ {
+ get;
+ set;
}
- private byte[] body;
- public byte[] Body {
- get { return body; }
- set { this.body = value; }
+ public string Tag
+ {
+ get;
+ set;
}
- private string tag;
- public string Tag {
- get { return tag; }
- set { this.tag = value; }
+ public List<string> Keys
+ {
+ get;
+ set;
}
- private List<string> keys;
- public List<string> Keys{
- get { return keys; }
- set { this.keys = value; }
+ public Dictionary<string, string> UserProperties
+ {
+ get;
+ set;
}
- private Dictionary<string, string> userProperties;
- public Dictionary<string, string> UserProperties {
- get { return userProperties; }
- set { this.userProperties = value; }
- }
-
- private Dictionary<string, string> systemProperties;
- internal Dictionary<string, string> SystemProperties {
- get { return systemProperties; }
- set { this.systemProperties = value; }
- }
-
- private int maxAttemptTimes;
public int MaxAttemptTimes
{
- get { return maxAttemptTimes; }
- set { maxAttemptTimes = value; }
+ get;
+ set;
}
+
+
+ public DateTime DeliveryTimestamp
+ {
+ get;
+ set;
+ }
+
+ public int DeliveryAttempt
+ {
+ get;
+ internal set;
+ }
+
+ public string MessageGroup
+ {
+ get;
+ set;
+ }
+
+ public bool Fifo()
+ {
+ return !String.IsNullOrEmpty(MessageGroup);
+ }
+
+ public bool Scheduled()
+ {
+ return DeliveryTimestamp > DateTime.UtcNow;
+ }
+
+ internal bool _checksumVerifiedOk = true;
+ internal string _receiptHandle;
+ internal string _sourceHost;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/MessageException.cs
similarity index 81%
copy from rocketmq-client-csharp/INameServerResolver.cs
copy to rocketmq-client-csharp/MessageException.cs
index 568098f..7ef10df 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/MessageException.cs
@@ -14,14 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-namespace org.apache.rocketmq
+using System;
+
+namespace Org.Apache.Rocketmq
{
- public interface INameServerResolver
+ [Serializable]
+ public class MessageException : Exception
{
- Task<List<string>> resolveAsync();
+ public MessageException(string message) : base(message)
+ {
+ }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/MessageIdGenerator.cs b/rocketmq-client-csharp/MessageIdGenerator.cs
index 8af1fda..8dc370d 100644
--- a/rocketmq-client-csharp/MessageIdGenerator.cs
+++ b/rocketmq-client-csharp/MessageIdGenerator.cs
@@ -20,7 +20,7 @@
using System.IO;
using System.Threading;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
/**
* MessageId generate rules refer: https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o
diff --git a/rocketmq-client-csharp/MessageType.cs b/rocketmq-client-csharp/MessageType.cs
index 376b658..a459e93 100644
--- a/rocketmq-client-csharp/MessageType.cs
+++ b/rocketmq-client-csharp/MessageType.cs
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
- public enum MessageType {
+ public enum MessageType
+ {
Normal,
Fifo,
Delay,
Transaction,
}
-
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/MetadataConstants.cs b/rocketmq-client-csharp/MetadataConstants.cs
index 184bec8..5381595 100644
--- a/rocketmq-client-csharp/MetadataConstants.cs
+++ b/rocketmq-client-csharp/MetadataConstants.cs
@@ -17,8 +17,10 @@
using System;
-namespace org.apache.rocketmq {
- public class MetadataConstants {
+namespace Org.Apache.Rocketmq
+{
+ public class MetadataConstants
+ {
public const string TENANT_ID_KEY = "x-mq-tenant-id";
public const string NAMESPACE_KEY = "x-mq-namespace";
public const string AUTHORIZATION = "authorization";
@@ -33,5 +35,7 @@
public const string CLIENT_VERSION_KEY = "x-mq-client-version";
public const string PROTOCOL_VERSION_KEY = "x-mq-protocol-version";
public const string REQUEST_ID_KEY = "x-mq-request-id";
+
+ public const string CLIENT_ID_KEY = "x-mq-client-id";
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/MqLogManager.cs b/rocketmq-client-csharp/MqLogManager.cs
index 0608c8a..3d294bd 100644
--- a/rocketmq-client-csharp/MqLogManager.cs
+++ b/rocketmq-client-csharp/MqLogManager.cs
@@ -4,7 +4,7 @@
using NLog;
using NLog.Config;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
/**
* RocketMQ Log Manager.
diff --git a/rocketmq-client-csharp/Partition.cs b/rocketmq-client-csharp/Partition.cs
deleted file mode 100644
index 410601e..0000000
--- a/rocketmq-client-csharp/Partition.cs
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-
-namespace org.apache.rocketmq {
-
- public class Partition : IEquatable<Partition>, IComparable<Partition> {
-
- public Partition(Topic topic, Broker broker, int id, Permission permission) {
- this.topic = topic;
- this.broker = broker;
- this.id = id;
- this.permission = permission;
- }
-
- private Topic topic;
- public Topic Topic{
- get { return topic; }
- }
-
- private Broker broker;
- public Broker Broker {
- get { return broker; }
- }
-
- private int id;
- public int Id {
- get { return id; }
- }
-
- Permission permission;
- public Permission Permission {
- get { return permission; }
- }
-
- public bool Equals(Partition other) {
- return topic.Equals(other.topic)
- && broker.Equals(other.broker)
- && id.Equals(other.id)
- && permission == other.permission;
- }
-
- public override bool Equals(Object other) {
- if (!(other is Partition)) {
- return false;
- }
- return Equals(other);
- }
-
- public override int GetHashCode()
- {
- return HashCode.Combine(topic, broker, id, permission);
- }
-
- public int CompareTo(Partition other) {
- if (0 != topic.CompareTo(other.topic)) {
- return topic.CompareTo(other.topic);
- }
-
- if (0 != broker.CompareTo(other.broker)) {
- return broker.CompareTo(other.broker);
- }
-
- if (0 != id.CompareTo(other.id)) {
- return id.CompareTo(other.id);
- }
-
- return permission.CompareTo(other.permission);
- }
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/ProcessQueue.cs
similarity index 63%
copy from rocketmq-client-csharp/StaticNameServerResolver.cs
copy to rocketmq-client-csharp/ProcessQueue.cs
index 9f97599..1022978 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/ProcessQueue.cs
@@ -15,24 +15,29 @@
* limitations under the License.
*/
using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
- public class StaticNameServerResolver : INameServerResolver
+ public class ProcessQueue
{
- public StaticNameServerResolver(List<string> nameServerList)
+ public ProcessQueue()
{
- this.nameServerList = nameServerList;
+ _lastReceivedTime = DateTime.UtcNow;
+ }
+ public bool Dropped { get; set; }
+
+ private DateTime _lastReceivedTime;
+
+ public DateTime LastReceiveTime
+ {
+ get { return _lastReceivedTime; }
+ set { _lastReceivedTime = value; }
}
- public async Task<List<string>> resolveAsync()
+ internal bool Expired()
{
- return nameServerList;
+ return DateTime.UtcNow.Subtract(_lastReceivedTime).TotalMilliseconds > 30 * 1000;
}
- private List<string> nameServerList;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 0b3f8a0..5c51cdc 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -14,94 +14,149 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using System.Threading.Tasks;
-using rmq = apache.rocketmq.v1;
-using pb = global::Google.Protobuf;
-using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
using System.Collections.Generic;
using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using NLog;
+using OpenTelemetry;
+using OpenTelemetry.Exporter;
+using OpenTelemetry.Metrics;
-
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public class Producer : Client, IProducer
{
- public Producer(INameServerResolver resolver, string resourceNamespace) : base(resolver, resourceNamespace)
+ public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
{
- this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+ _loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+ _sendFailureTotal = MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
+ _sendLatency = MetricMeter.CreateHistogram<double>(SendLatencyName,
+ description: "Measure the duration of publishing messages to brokers",
+ unit: "milliseconds");
}
- public override void start()
+ public override async Task Start()
{
- base.start();
- // More initalization
+ await base.Start();
+ // More initialization
+ // TODO: Add authentication header
+
+ _meterProvider = Sdk.CreateMeterProviderBuilder()
+ .AddMeter("Apache.RocketMQ.Client")
+ .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
+ {
+ options.Protocol = OtlpExportProtocol.Grpc;
+ options.Endpoint = new Uri(_accessPoint.TargetUrl());
+ options.TimeoutMilliseconds = (int) _clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
+
+ readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 60 * 1000;
+ })
+ .AddView((instrument) =>
+ {
+ if (instrument.Meter.Name == MeterName && instrument.Name == SendLatencyName)
+ {
+ return new ExplicitBucketHistogramConfiguration()
+ {
+ Boundaries = new double[] {1, 5, 10, 20, 50, 200, 500},
+ };
+ }
+ return null;
+ })
+ .Build();
}
- public override void shutdown()
+ public override async Task Shutdown()
{
// Release local resources
- base.shutdown();
+ await base.Shutdown();
}
- public override void prepareHeartbeatData(rmq::HeartbeatRequest request)
+ protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
{
+ request.ClientType = rmq::ClientType.Producer;
+ // Concept of ProducerGroup has been removed.
}
- public async Task<SendResult> send(Message message)
+ public async Task<SendReceipt> Send(Message message)
{
- if (!loadBalancer.ContainsKey(message.Topic))
+ if (!_loadBalancer.ContainsKey(message.Topic))
{
- var topicRouteData = await getRouteFor(message.Topic, false);
- if (null == topicRouteData || null == topicRouteData.Partitions || 0 == topicRouteData.Partitions.Count)
+ var topicRouteData = await GetRouteFor(message.Topic, false);
+ if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
{
+ Logger.Error($"Failed to resolve route info for {message.Topic}");
throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
}
var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
- loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+ _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
}
- var publishLB = loadBalancer[message.Topic];
+ var publishLb = _loadBalancer[message.Topic];
var request = new rmq::SendMessageRequest();
- request.Message = new rmq::Message();
- request.Message.Body = pb::ByteString.CopyFrom(message.Body);
- request.Message.Topic = new rmq::Resource();
- request.Message.Topic.ResourceNamespace = resourceNamespace();
- request.Message.Topic.Name = message.Topic;
+ var entry = new rmq::Message();
+ entry.Body = ByteString.CopyFrom(message.Body);
+ entry.Topic = new rmq::Resource();
+ entry.Topic.ResourceNamespace = resourceNamespace();
+ entry.Topic.Name = message.Topic;
+ request.Messages.Add(entry);
// User properties
foreach (var item in message.UserProperties)
{
- request.Message.UserAttribute.Add(item.Key, item.Value);
+ entry.UserProperties.Add(item.Key, item.Value);
}
- request.Message.SystemAttribute = new rmq::SystemAttribute();
- request.Message.SystemAttribute.MessageId = message.MessageId;
+ entry.SystemProperties = new rmq::SystemProperties();
+ entry.SystemProperties.MessageId = message.MessageId;
+ entry.SystemProperties.MessageType = rmq::MessageType.Normal;
+ if (DateTime.MinValue != message.DeliveryTimestamp)
+ {
+ entry.SystemProperties.MessageType = rmq::MessageType.Delay;
+ entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
+
+ if (message.Fifo())
+ {
+ Logger.Warn("A message may not be FIFO and delayed at the same time");
+ throw new MessageException("A message may not be both FIFO and Timed");
+ }
+ } else if (!String.IsNullOrEmpty(message.MessageGroup))
+ {
+ entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
+ entry.SystemProperties.MessageGroup = message.MessageGroup;
+ }
+
if (!string.IsNullOrEmpty(message.Tag))
{
- request.Message.SystemAttribute.Tag = message.Tag;
+ entry.SystemProperties.Tag = message.Tag;
}
if (0 != message.Keys.Count)
{
foreach (var key in message.Keys)
{
- request.Message.SystemAttribute.Keys.Add(key);
+ entry.SystemProperties.Keys.Add(key);
}
}
- // string target = "https://";
List<string> targets = new List<string>();
- List<Partition> candidates = publishLB.select(message.MaxAttemptTimes);
- foreach (var partition in candidates)
+ List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
+ foreach (var messageQueue in candidates)
{
- targets.Add(partition.Broker.targetUrl());
+ targets.Add(Utilities.TargetUrl(messageQueue));
}
- var metadata = new grpc::Metadata();
+ var metadata = new Metadata();
Signature.sign(this, metadata);
Exception ex = null;
@@ -110,27 +165,46 @@
{
try
{
- rmq::SendMessageResponse response = await clientManager.sendMessage(target, metadata, request, getIoTimeout());
- if (null != response && (int)global::Google.Rpc.Code.Ok == response.Common.Status.Code)
+ var stopWatch = new Stopwatch();
+ stopWatch.Start();
+ rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
+ if (null != response && rmq::Code.Ok == response.Status.Code)
{
- var messageId = response.MessageId;
- return new SendResult(messageId);
+ var messageId = response.Entries[0].MessageId;
+
+ // Account latency histogram
+ stopWatch.Stop();
+ var latency = stopWatch.ElapsedMilliseconds;
+ _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
+
+ return new SendReceipt(messageId);
}
}
catch (Exception e)
{
+ // Account failure count
+ _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));
+ Logger.Info(e, $"Failed to send message to {target}");
ex = e;
}
}
if (null != ex)
{
+ Logger.Error(ex, $"Failed to send message after {message.MaxAttemptTimes} attempts");
throw ex;
}
+ Logger.Error($"Failed to send message after {message.MaxAttemptTimes} attempts with unspecified reasons");
throw new Exception("Send message failed");
}
- private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
+ private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer;
+
+ private readonly Counter<long> _sendFailureTotal;
+ private readonly Histogram<double> _sendLatency;
+
+ private static readonly string SendLatencyName = "rocketmq_send_success_cost_time";
+ private MeterProvider _meterProvider;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/definition.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v1/definition.proto
deleted file mode 100644
index 33f4644..0000000
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/definition.proto
+++ /dev/null
@@ -1,351 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/timestamp.proto";
-import "google/protobuf/duration.proto";
-
-package apache.rocketmq.v1;
-
-option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
-option java_generate_equals_and_hash = true;
-option java_string_check_utf8 = true;
-option java_outer_classname = "MQDomain";
-
-option csharp_namespace = "apache.rocketmq.v1";
-
-enum Permission {
- NONE = 0;
- READ = 1;
- WRITE = 2;
- READ_WRITE = 3;
-
- reserved 4 to 64;
-}
-
-enum FilterType {
- TAG = 0;
- SQL = 1;
-
- reserved 2 to 64;
-}
-
-message FilterExpression {
- FilterType type = 1;
- string expression = 2;
-
- reserved 3 to 64;
-}
-
-// Dead lettering is done on a best effort basis. The same message might be
-// dead lettered multiple times.
-//
-// If validation on any of the fields fails at subscription creation/update,
-// the create/update subscription request will fail.
-message DeadLetterPolicy {
- // The maximum number of delivery attempts for any message.
- //
- // This field will be honored on a best effort basis.
- //
- // If this parameter is 0, a default value of 16 is used.
- int32 max_delivery_attempts = 1;
-
- reserved 2 to 64;
-}
-
-message Resource {
- string resource_namespace = 1;
-
- // Resource name identifier, which remains unique within the abstract resource
- // namespace.
- string name = 2;
-
- reserved 3 to 64;
-}
-
-enum ConsumeModel {
- CLUSTERING = 0;
- BROADCASTING = 1;
-
- reserved 2 to 64;
-}
-
-message ProducerData {
- Resource group = 1;
-
- reserved 2 to 64;
-}
-
-enum ConsumePolicy {
- RESUME = 0;
- PLAYBACK = 1;
- DISCARD = 2;
- TARGET_TIMESTAMP = 3;
-
- reserved 4 to 64;
-}
-
-enum ConsumeMessageType {
- ACTIVE = 0;
- PASSIVE = 1;
-
- reserved 2 to 64;
-}
-
-message ConsumerData {
- Resource group = 1;
-
- repeated SubscriptionEntry subscriptions = 2;
-
- ConsumeModel consume_model = 3;
-
- ConsumePolicy consume_policy = 4;
-
- DeadLetterPolicy dead_letter_policy = 5;
-
- ConsumeMessageType consume_type = 6;
-
- reserved 7 to 64;
-}
-
-message SubscriptionEntry {
- Resource topic = 1;
- FilterExpression expression = 2;
-
- reserved 3 to 64;
-}
-
-enum AddressScheme {
- IPv4 = 0;
- IPv6 = 1;
- DOMAIN_NAME = 2;
-
- reserved 3 to 64;
-}
-
-message Address {
- string host = 1;
- int32 port = 2;
-
- reserved 3 to 64;
-}
-
-message Endpoints {
- AddressScheme scheme = 1;
- repeated Address addresses = 2;
-
- reserved 3 to 64;
-}
-
-message Broker {
- // Name of the broker
- string name = 1;
-
- // Broker index. Canonically, index = 0 implies that the broker is playing
- // leader role while brokers with index > 0 play follower role.
- int32 id = 2;
-
- // Address of the broker, complying with the following scheme
- // 1. dns:[//authority/]host[:port]
- // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
- // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
- Endpoints endpoints = 3;
-
- reserved 4 to 64;
-}
-
-message Partition {
- Resource topic = 1;
- int32 id = 2;
- Permission permission = 3;
- Broker broker = 4;
-
- reserved 5 to 64;
-}
-
-enum MessageType {
- NORMAL = 0;
-
- // Sequenced message
- FIFO = 1;
-
- // Messages that are delivered after the specified duration.
- DELAY = 2;
-
- // Messages that are transactional. Only committed messages are delivered to
- // subscribers.
- TRANSACTION = 3;
-
- reserved 4 to 64;
-}
-
-enum DigestType {
- // CRC algorithm achieves goal of detecting random data error with lowest
- // computation overhead.
- CRC32 = 0;
-
- // MD5 algorithm achieves good balance between collision rate and computation
- // overhead.
- MD5 = 1;
-
- // SHA-family has substantially fewer collision with fair amount of
- // computation.
- SHA1 = 2;
-
- reserved 3 to 64;
-}
-
-// When publishing messages to or subscribing messages from brokers, clients
-// shall include or validate digests of message body to ensure data integrity.
-//
-// For message publishment, when an invalid digest were detected, brokers need
-// respond client with BAD_REQUEST.
-//
-// For messags subscription, when an invalid digest were detected, consumers
-// need to handle this case according to message type:
-// 1) Standard messages should be negatively acknowledged instantly, causing
-// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
-// previously acquired messages batch;
-//
-// Message consumption model also affects how invalid digest are handled. When
-// messages are consumed in broadcasting way,
-// TODO: define semantics of invalid-digest-when-broadcasting.
-message Digest {
- DigestType type = 1;
- string checksum = 2;
-
- reserved 3 to 64;
-}
-
-enum Encoding {
- IDENTITY = 0;
- GZIP = 1;
-
- reserved 2 to 64;
-}
-
-message SystemAttribute {
- // Tag
- string tag = 1;
-
- // Message keys
- repeated string keys = 2;
-
- // Message identifier, client-side generated, remains unique.
- // if message_id is empty, the send message request will be aborted with
- // status `INVALID_ARGUMENT`
- string message_id = 3;
-
- // Message body digest
- Digest body_digest = 4;
-
- // Message body encoding. Candidate options are identity, gzip, snappy etc.
- Encoding body_encoding = 5;
-
- // Message type, normal, FIFO or transactional.
- MessageType message_type = 6;
-
- // Message born time-point.
- google.protobuf.Timestamp born_timestamp = 7;
-
- // Message born host. Valid options are IPv4, IPv6 or client host domain name.
- string born_host = 8;
-
- // Time-point at which the message is stored in the broker.
- google.protobuf.Timestamp store_timestamp = 9;
-
- // The broker that stores this message. It may be name, IP or arbitrary
- // identifier that uniquely identify the broker.
- string store_host = 10;
-
- oneof timed_delivery {
- // Time-point at which broker delivers to clients.
- google.protobuf.Timestamp delivery_timestamp = 11;
-
- // Level-based delay strategy.
- int32 delay_level = 12;
- }
-
- // If a message is acquired by way of POP, this field holds the receipt.
- // Clients use the receipt to acknowledge or negatively acknowledge the
- // message.
- string receipt_handle = 13;
-
- // Partition identifier in which a message is physically stored.
- int32 partition_id = 14;
-
- // Partition offset at which a message is stored.
- int64 partition_offset = 15;
-
- // Period of time servers would remain invisible once a message is acquired.
- google.protobuf.Duration invisible_period = 16;
-
- // Business code may failed to process messages for the moment. Hence, clients
- // may request servers to deliver them again using certain back-off strategy,
- // the attempt is 1 not 0 if message is delivered first time.
- int32 delivery_attempt = 17;
-
- // Message producer load-balance group if applicable.
- Resource producer_group = 18;
-
- string message_group = 19;
-
- // Trace context.
- string trace_context = 20;
-
- // Delay time of first recover orphaned transaction request from server.
- google.protobuf.Duration orphaned_transaction_recovery_period = 21;
-
- reserved 22 to 64;
-}
-
-message Message {
-
- Resource topic = 1;
-
- // User defined key-value pairs.
- // If user_attribute contains the reserved keys by RocketMQ,
- // the send message request will be aborted with status `INVALID_ARGUMENT`.
- // See below links for the reserved keys
- // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
- map<string, string> user_attribute = 2;
-
- SystemAttribute system_attribute = 3;
-
- bytes body = 4;
-
- reserved 5 to 64;
-}
-
-message Assignment {
- Partition Partition = 1;
-
- reserved 2 to 64;
-}
-
-enum QueryOffsetPolicy {
- // Use this option if client wishes to playback all existing messages.
- BEGINNING = 0;
-
- // Use this option if client wishes to skip all existing messages.
- END = 1;
-
- // Use this option if time-based seek is targeted.
- TIME_POINT = 2;
-
- reserved 3 to 64;
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/service.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v1/service.proto
deleted file mode 100644
index 6f1b4c1..0000000
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/service.proto
+++ /dev/null
@@ -1,522 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/duration.proto";
-import "google/protobuf/timestamp.proto";
-import "google/rpc/error_details.proto";
-import "google/rpc/status.proto";
-
-import "apache/rocketmq/v1/definition.proto";
-
-package apache.rocketmq.v1;
-
-option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
-option java_generate_equals_and_hash = true;
-option java_string_check_utf8 = true;
-option java_outer_classname = "MQService";
-
-option csharp_namespace = "apache.rocketmq.v1";
-
-message ResponseCommon {
- google.rpc.Status status = 1;
- google.rpc.RequestInfo request_info = 2;
- google.rpc.Help help = 3;
- google.rpc.RetryInfo retry_info = 4;
- google.rpc.DebugInfo debug_info = 5;
- google.rpc.ErrorInfo error_info = 6;
-
- reserved 7 to 64;
-}
-
-// Topics are destination of messages to publish to or subscribe from. Similar
-// to domain names, they will be addressable after resolution through the
-// provided access point.
-//
-// Access points are usually the addresses of name servers, which fulfill
-// service discovery, load-balancing and other auxillary services. Name servers
-// receive periodic heartbeats from affiliate brokers and erase those which
-// failed to maintain alive status.
-//
-// Name servers answer queries of QueryRouteRequest, responding clients with
-// addressable partitions, which they may directly publish messages to or
-// subscribe messages from.
-//
-// QueryRouteRequest shall include source endpoints, aka, configured
-// access-point, which annotates tenant-id, instance-id or other
-// vendor-specific settings. Purpose-built name servers may respond customized
-// results based on these particular requirements.
-message QueryRouteRequest {
- Resource topic = 1;
-
- Endpoints endpoints = 2;
-
- reserved 3 to 64;
-}
-
-message QueryRouteResponse {
- ResponseCommon common = 1;
-
- repeated Partition partitions = 2;
-
- reserved 3 to 64;
-}
-
-message SendMessageRequest {
- Message message = 1;
- Partition partition = 2;
-
- reserved 3 to 64;
-}
-
-message SendMessageResponse {
- ResponseCommon common = 1;
- string message_id = 2;
- string transaction_id = 3;
-
- reserved 4 to 64;
-}
-
-message QueryAssignmentRequest {
- Resource topic = 1;
- Resource group = 2;
- string client_id = 3;
-
- // Service access point
- Endpoints endpoints = 4;
-
- reserved 5 to 64;
-}
-
-message QueryAssignmentResponse {
- ResponseCommon common = 1;
- repeated Assignment assignments = 2;
-
- reserved 3 to 64;
-}
-
-message ReceiveMessageRequest {
- Resource group = 1;
- string client_id = 2;
- Partition partition = 3;
- FilterExpression filter_expression = 4;
- ConsumePolicy consume_policy = 5;
- google.protobuf.Timestamp initialization_timestamp = 6;
- int32 batch_size = 7;
- google.protobuf.Duration invisible_duration = 8;
- google.protobuf.Duration await_time = 9;
- bool fifo_flag = 10;
-
- reserved 11 to 64;
-}
-
-message ReceiveMessageResponse {
- ResponseCommon common = 1;
- repeated Message messages = 2;
- google.protobuf.Timestamp delivery_timestamp = 3;
- google.protobuf.Duration invisible_duration = 4;
-
- reserved 5 to 64;
-}
-
-message AckMessageRequest {
- Resource group = 1;
- Resource topic = 2;
- string client_id = 3;
- oneof handle {
- string receipt_handle = 4;
- int64 offset = 5;
- }
- string message_id = 6;
-
- reserved 7 to 64;
-}
-
-message AckMessageResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message NackMessageRequest {
- Resource group = 1;
- Resource topic = 2;
- string client_id = 3;
- string receipt_handle = 4;
- string message_id = 5;
- int32 delivery_attempt = 6;
- int32 max_delivery_attempts = 7;
-
- reserved 8 to 64;
-}
-
-message NackMessageResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message ForwardMessageToDeadLetterQueueRequest {
- Resource group = 1;
- Resource topic = 2;
- string client_id = 3;
- string receipt_handle = 4;
- string message_id = 5;
- int32 delivery_attempt = 6;
- int32 max_delivery_attempts = 7;
-
- reserved 8 to 64;
-}
-
-message ForwardMessageToDeadLetterQueueResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message HeartbeatRequest {
- string client_id = 1;
- oneof client_data {
- ProducerData producer_data = 2;
- ConsumerData consumer_data = 3;
- }
- bool fifo_flag = 4;
-
- reserved 5 to 64;
-}
-
-message HeartbeatResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message HealthCheckRequest {
- Resource group = 1;
- string client_host = 2;
-
- reserved 3 to 64;
-}
-
-message HealthCheckResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message EndTransactionRequest {
- Resource group = 1;
- string message_id = 2;
- string transaction_id = 3;
- enum TransactionResolution {
- COMMIT = 0;
- ROLLBACK = 1;
- }
- TransactionResolution resolution = 4;
- enum Source {
- CLIENT = 0;
- SERVER_CHECK = 1;
- }
- Source source = 5;
- string trace_context = 6;
-
- reserved 7 to 64;
-}
-
-message EndTransactionResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message QueryOffsetRequest {
- Partition partition = 1;
- QueryOffsetPolicy policy = 2;
- google.protobuf.Timestamp time_point = 3;
-
- reserved 4 to 64;
-}
-
-message QueryOffsetResponse {
- ResponseCommon common = 1;
- int64 offset = 2;
-
- reserved 3 to 64;
-}
-
-message PullMessageRequest {
- Resource group = 1;
- Partition partition = 2;
- int64 offset = 3;
- int32 batch_size = 4;
- google.protobuf.Duration await_time = 5;
- FilterExpression filter_expression = 6;
- string client_id = 7;
-
- reserved 8 to 64;
-}
-
-message PullMessageResponse {
- ResponseCommon common = 1;
- int64 min_offset = 2;
- int64 next_offset = 3;
- int64 max_offset = 4;
- repeated Message messages = 5;
-
- reserved 6 to 64;
-}
-
-message NoopCommand { reserved 1 to 64; }
-
-message PrintThreadStackTraceCommand {
- string command_id = 1;
-
- reserved 2 to 64;
-}
-
-message ReportThreadStackTraceRequest {
- string command_id = 1;
- string thread_stack_trace = 2;
-
- reserved 3 to 64;
-}
-
-message ReportThreadStackTraceResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message VerifyMessageConsumptionCommand {
- string command_id = 1;
- Message message = 2;
-
- reserved 3 to 64;
-}
-
-message ReportMessageConsumptionResultRequest {
- string command_id = 1;
-
- // 1. Return `INVALID_ARGUMENT` if message is corrupted.
- // 2. Return `INTERNAL` if failed to consume message.
- // 3. Return `OK` if success.
- google.rpc.Status status = 2;
-
- reserved 3 to 64;
-}
-
-message ReportMessageConsumptionResultResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message RecoverOrphanedTransactionCommand {
- Message orphaned_transactional_message = 1;
- string transaction_id = 2;
-
- reserved 3 to 64;
-}
-
-message PollCommandRequest {
- string client_id = 1;
- repeated Resource topics = 2;
- oneof group {
- Resource producer_group = 3;
- Resource consumer_group = 4;
- }
-
- reserved 5 to 64;
-}
-
-message PollCommandResponse {
- oneof type {
- // Default command when no new command need to be delivered.
- NoopCommand noop_command = 1;
- // Request client to print thread stack trace.
- PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
- // Request client to verify the consumption of the appointed message.
- VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
- // Request client to recover the orphaned transaction message.
- RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
- }
-
- reserved 5 to 64;
-}
-
-message NotifyClientTerminationRequest {
- oneof group {
- Resource producer_group = 1;
- Resource consumer_group = 2;
- }
- string client_id = 3;
-
- reserved 4 to 64;
-}
-
-message NotifyClientTerminationResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-// For all the RPCs in MessagingService, the following error handling policies
-// apply:
-//
-// If the request doesn't bear a valid authentication credential, return a
-// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
-// user is not granted with sufficient permission to execute the requested
-// operation, return a response with common.status.code == `PERMISSION_DENIED`.
-// If the per-user-resource-based quota is exhausted, return a response with
-// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
-// errors raise, return a response with common.status.code == `INTERNAL`.
-service MessagingService {
-
- // Queries the route entries of the requested topic in the perspective of the
- // given endpoints. On success, servers should return a collection of
- // addressable partitions. Note servers may return customized route entries
- // based on endpoints provided.
- //
- // If the requested topic doesn't exist, returns `NOT_FOUND`.
- // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
- rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
-
- // Producer or consumer sends HeartbeatRequest to servers periodically to
- // keep-alive. Additionally, it also reports client-side configuration,
- // including topic subscription, load-balancing group name, etc.
- //
- // Returns `OK` if success.
- //
- // If a client specifies a language that is not yet supported by servers,
- // returns `INVALID_ARGUMENT`
- rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
-
- // Checks the health status of message server, returns `OK` if services are
- // online and serving. Clients may use this RPC to detect availability of
- // messaging service, and take isolation actions when necessary.
- rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
-
- // Delivers messages to brokers.
- // Clients may further:
- // 1. Refine a message destination to topic partition which fulfills parts of
- // FIFO semantic;
- // 2. Flag a message as transactional, which keeps it invisible to consumers
- // until it commits;
- // 3. Time a message, making it invisible to consumers till specified
- // time-point;
- // 4. And more...
- //
- // Returns message-id or transaction-id with status `OK` on success.
- //
- // If the destination topic doesn't exist, returns `NOT_FOUND`.
- rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
-
- // Queries the assigned partition route info of a topic for current consumer,
- // the returned assignment result is decided by server-side load balancer.
- //
- // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
- // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
- rpc QueryAssignment(QueryAssignmentRequest)
- returns (QueryAssignmentResponse) {}
-
- // Receives messages from the server in batch manner, returns a batch of
- // messages if success. The received messages should be ACKed or NACKed after
- // processing.
- //
- // If the pending concurrent receive requests exceed the quota of the given
- // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
- // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
- // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
- // message in the specific topic, returns `OK` with an empty message set.
- // Please note that client may suffer from false empty responses.
- rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
-
- // Acknowledges the message associated with the `receipt_handle` or `offset`
- // in the `AckMessageRequest`, it means the message has been successfully
- // processed. Returns `OK` if the message server remove the relevant message
- // successfully.
- //
- // If the given receipt_handle is illegal or out of date, returns
- // `INVALID_ARGUMENT`.
- rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
-
- // Signals that the message has not been successfully processed. The message
- // server should resend the message follow the retry policy defined at
- // server-side.
- //
- // If the corresponding topic or consumer group doesn't exist, returns
- // `NOT_FOUND`.
- rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
-
- // Forwards one message to dead letter queue if the DeadLetterPolicy is
- // triggered by this message at client-side, return `OK` if success.
- rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
- returns (ForwardMessageToDeadLetterQueueResponse) {}
-
- // Commits or rollback one transactional message.
- rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
-
- // Queries the offset of the specific partition, returns the offset with `OK`
- // if success. The message server should maintain a numerical offset for each
- // message in a partition.
- rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
-
- // Pulls messages from the specific partition, returns a set of messages with
- // next pull offset. The pulled messages can't be ACKed or NACKed, while the
- // client is responsible for manage offsets for consumer, typically update
- // consume offset to local memory or a third-party storage service.
- //
- // If the pending concurrent receive requests exceed the quota of the given
- // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
- // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
- // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
- // message in the specific topic, returns `OK` with an empty message set.
- // Please note that client may suffer from false empty responses.
- rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
-
- // Multiplexing RPC(s) for various polling requests, which issue different
- // commands to client.
- //
- // Sometimes client may need to receive and process the command from server.
- // To prevent the complexity of streaming RPC(s), a unary RPC using
- // long-polling is another solution.
- //
- // To mark the request-response of corresponding command, `command_id` in
- // message is recorded in the subsequent RPC(s). For example, after receiving
- // command of printing thread stack trace, client would send
- // `ReportMessageConsumptionResultRequest` to server, which contain both of
- // the stack trace and `command_id`.
- //
- // At same time, `NoopCommand` is delivered from server when no new command is
- // needed, it is essential for client to maintain the ping-pong.
- //
- rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
-
- // After receiving the corresponding polling command, the thread stack trace
- // is reported to the server.
- rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
- returns (ReportThreadStackTraceResponse) {}
-
- // After receiving the corresponding polling command, the consumption result
- // of appointed message is reported to the server.
- rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
- returns (ReportMessageConsumptionResultResponse) {}
-
- // Notify the server that the client is terminated.
- rpc NotifyClientTermination(NotifyClientTerminationRequest)
- returns (NotifyClientTerminationResponse) {}
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/admin.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
similarity index 86%
rename from rocketmq-client-csharp/Protos/apache/rocketmq/v1/admin.proto
rename to rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
index 554207b..7dbb702 100644
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/admin.proto
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
@@ -15,11 +15,12 @@
syntax = "proto3";
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
+option java_package = "apache.rocketmq.v2";
option java_generate_equals_and_hash = true;
option java_string_check_utf8 = true;
option java_outer_classname = "MQAdmin";
@@ -35,11 +36,8 @@
Level level = 1;
}
-message ChangeLogLevelResponse {
- string remark = 1;
-}
+message ChangeLogLevelResponse { string remark = 1; }
service Admin {
- rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {
- }
+ rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto
new file mode 100644
index 0000000..21a6321
--- /dev/null
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+ TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+ COMMIT = 1;
+ ROLLBACK = 2;
+}
+
+enum TransactionSource {
+ SOURCE_UNSPECIFIED = 0;
+ SOURCE_CLIENT = 1;
+ SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+ PERMISSION_UNSPECIFIED = 0;
+ NONE = 1;
+ READ = 2;
+ WRITE = 3;
+ READ_WRITE = 4;
+}
+
+enum FilterType {
+ FILTER_TYPE_UNSPECIFIED = 0;
+ TAG = 1;
+ SQL = 2;
+}
+
+message FilterExpression {
+ FilterType type = 1;
+ string expression = 2;
+}
+
+message RetryPolicy {
+ int32 max_attempts = 1;
+ oneof strategy {
+ ExponentialBackoff exponential_backoff = 2;
+ CustomizedBackoff customized_backoff = 3;
+ }
+}
+
+// https://en.wikipedia.org/wiki/Exponential_backoff
+message ExponentialBackoff {
+ google.protobuf.Duration initial = 1;
+ google.protobuf.Duration max = 2;
+ float multiplier = 3;
+}
+
+message CustomizedBackoff {
+ // To support classic backoff strategy which is arbitary defined by end users.
+ // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`
+ repeated google.protobuf.Duration next = 1;
+}
+
+message Resource {
+ string resource_namespace = 1;
+
+ // Resource name identifier, which remains unique within the abstract resource
+ // namespace.
+ string name = 2;
+}
+
+message SubscriptionEntry {
+ Resource topic = 1;
+ FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+ ADDRESS_SCHEME_UNSPECIFIED = 0;
+ IPv4 = 1;
+ IPv6 = 2;
+ DOMAIN_NAME = 3;
+}
+
+message Address {
+ string host = 1;
+ int32 port = 2;
+}
+
+message Endpoints {
+ AddressScheme scheme = 1;
+ repeated Address addresses = 2;
+}
+
+message Broker {
+ // Name of the broker
+ string name = 1;
+
+ // Broker index. Canonically, index = 0 implies that the broker is playing
+ // leader role while brokers with index > 0 play follower role.
+ int32 id = 2;
+
+ // Address of the broker, complying with the following scheme
+ // 1. dns:[//authority/]host[:port]
+ // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+ // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+ Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+ Resource topic = 1;
+ int32 id = 2;
+ Permission permission = 3;
+ Broker broker = 4;
+ repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+ MESSAGE_TYPE_UNSPECIFIED = 0;
+
+ NORMAL = 1;
+
+ // Sequenced message
+ FIFO = 2;
+
+ // Messages that are delivered after the specified duration.
+ DELAY = 3;
+
+ // Messages that are transactional. Only committed messages are delivered to
+ // subscribers.
+ TRANSACTION = 4;
+}
+
+enum DigestType {
+ DIGEST_TYPE_UNSPECIFIED = 0;
+
+ // CRC algorithm achieves goal of detecting random data error with lowest
+ // computation overhead.
+ CRC32 = 1;
+
+ // MD5 algorithm achieves good balance between collision rate and computation
+ // overhead.
+ MD5 = 2;
+
+ // SHA-family has substantially fewer collision with fair amount of
+ // computation.
+ SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest were detected, brokers need
+// respond client with BAD_REQUEST.
+//
+// For messages subscription, when an invalid digest were detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digest are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
+message Digest {
+ DigestType type = 1;
+ string checksum = 2;
+}
+
+enum ClientType {
+ CLIENT_TYPE_UNSPECIFIED = 0;
+ PRODUCER = 1;
+ PUSH_CONSUMER = 2;
+ SIMPLE_CONSUMER = 3;
+}
+
+enum Encoding {
+ ENCODING_UNSPECIFIED = 0;
+
+ IDENTITY = 1;
+
+ GZIP = 2;
+}
+
+message SystemProperties {
+ // Tag, which is optional.
+ optional string tag = 1;
+
+ // Message keys
+ repeated string keys = 2;
+
+ // Message identifier, client-side generated, remains unique.
+ // if message_id is empty, the send message request will be aborted with
+ // status `INVALID_ARGUMENT`
+ string message_id = 3;
+
+ // Message body digest
+ Digest body_digest = 4;
+
+ // Message body encoding. Candidate options are identity, gzip, snappy etc.
+ Encoding body_encoding = 5;
+
+ // Message type, normal, FIFO or transactional.
+ MessageType message_type = 6;
+
+ // Message born time-point.
+ google.protobuf.Timestamp born_timestamp = 7;
+
+ // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+ string born_host = 8;
+
+ // Time-point at which the message is stored in the broker, which is absent
+ // for message publishing.
+ optional google.protobuf.Timestamp store_timestamp = 9;
+
+ // The broker that stores this message. It may be broker name, IP or arbitrary
+ // identifier that uniquely identify the server.
+ string store_host = 10;
+
+ // Time-point at which broker delivers to clients, which is optional.
+ optional google.protobuf.Timestamp delivery_timestamp = 11;
+
+ // If a message is acquired by way of POP, this field holds the receipt,
+ // which is absent for message publishing.
+ // Clients use the receipt to acknowledge or negatively acknowledge the
+ // message.
+ optional string receipt_handle = 12;
+
+ // Message queue identifier in which a message is physically stored.
+ int32 queue_id = 13;
+
+ // Message-queue offset at which a message is stored, which is absent for
+ // message publishing.
+ optional int64 queue_offset = 14;
+
+ // Period of time servers would remain invisible once a message is acquired.
+ optional google.protobuf.Duration invisible_duration = 15;
+
+ // Business code may failed to process messages for the moment. Hence, clients
+ // may request servers to deliver them again using certain back-off strategy,
+ // the attempt is 1 not 0 if message is delivered first time, and it is absent
+ // for message publishing.
+ optional int32 delivery_attempt = 16;
+
+ // Define the group name of message in the same topic, which is optional.
+ optional string message_group = 17;
+
+ // Trace context for each message, which is optional.
+ optional string trace_context = 18;
+
+ // If a transactional message stay unresolved for more than
+ // `transaction_orphan_threshold`, it would be regarded as an
+ // orphan. Servers that manages orphan messages would pick up
+ // a capable publisher to resolve
+ optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+}
+
+message Message {
+
+ Resource topic = 1;
+
+ // User defined key-value pairs.
+ // If user_properties contain the reserved keys by RocketMQ,
+ // the send message request will be aborted with status `INVALID_ARGUMENT`.
+ // See below links for the reserved keys
+ // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+ map<string, string> user_properties = 2;
+
+ SystemProperties system_properties = 3;
+
+ bytes body = 4;
+}
+
+message Assignment { MessageQueue message_queue = 1; }
+
+enum Code {
+ // Success.
+ OK = 0;
+ // Format of access point is illegal.
+ ILLEGAL_ACCESS_POINT = 1;
+ // Format of topic is illegal.
+ ILLEGAL_TOPIC = 2;
+ // Format of consumer group is illegal.
+ ILLEGAL_CONSUMER_GROUP = 3;
+ // Format of message tag is illegal.
+ ILLEGAL_MESSAGE_TAG = 4;
+ // Format of message key is illegal.
+ ILLEGAL_MESSAGE_KEY = 5;
+ // Size of message keys exceeds the threshold.
+ MESSAGE_KEYS_TOO_LARGE = 6;
+ // Format of message group is illegal.
+ ILLEGAL_MESSAGE_GROUP = 7;
+ // Format of message property key is illegal.
+ ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
+ // Message properties total size exceeds the threshold.
+ MESSAGE_PROPERTIES_TOO_LARGE = 9;
+ // Message body size exceeds the threshold.
+ MESSAGE_BODY_TOO_LARGE = 10;
+
+ // User does not have the permission to operate.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
+ FORBIDDEN = 403;
+
+ // Code indicates that the client request has not been completed
+ // because it lacks valid authentication credentials for the
+ // requested resource.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
+ UNAUTHORIZED = 401;
+
+ // Topic resource does not exist.
+ TOPIC_NOT_FOUND = 13;
+
+ // Consumer group resource does not exist.
+ CONSUMER_GROUP_NOT_FOUND = 14;
+
+ // Not allowed to verify message. Chances are that you are verifying
+ // a FIFO message, as is violating FIFO semantics.
+ VERIFY_MESSAGE_FORBIDDEN = 15;
+
+ // Failed to consume message.
+ FAILED_TO_CONSUME_MESSAGE = 16;
+
+ // Message is corrupted.
+ MESSAGE_CORRUPTED = 17;
+
+ // Too many requests are made in short period of duration.
+ // Requests are throttled.
+ TOO_MANY_REQUESTS = 18;
+
+ // Expired receipt-handle is used when trying to acknowledge or change
+ // invisible duration of a message
+ RECEIPT_HANDLE_EXPIRED = 19;
+
+ // Message property is not match the message type.
+ MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
+
+ // Format of message id is illegal.
+ ILLEGAL_MESSAGE_ID = 21;
+
+ // Transaction id is invalid.
+ INVALID_TRANSACTION_ID = 22;
+
+ // Format of filter expression is illegal.
+ ILLEGAL_FILTER_EXPRESSION = 23;
+
+ // Receipt handle of message is invalid.
+ INVALID_RECEIPT_HANDLE = 24;
+
+ // Message persistence timeout.
+ MASTER_PERSISTENCE_TIMEOUT = 25;
+
+ // Slave persistence timeout.
+ SLAVE_PERSISTENCE_TIMEOUT = 26;
+
+ // The HA-mechanism is not working now.
+ HA_NOT_AVAILABLE = 27;
+
+ // Operation is not allowed in current version.
+ VERSION_UNSUPPORTED = 28;
+
+ // Message not found from server.
+ MESSAGE_NOT_FOUND = 29;
+
+ // Message offset is illegal.
+ ILLEGAL_MESSAGE_OFFSET = 30;
+
+ // Illegal message is for the sake of backward compatibility. In most case,
+ // more definitive code is better, e.g. `ILLEGAL_MESSAGE_TAG`.
+ ILLEGAL_MESSAGE = 31;
+
+ // Client type could not be recognized.
+ UNRECOGNIZED_CLIENT_TYPE = 32;
+
+ // Return different results for entries in composite request.
+ MULTIPLE_RESULTS = 33;
+
+ // Code indicates that the server encountered an unexpected condition
+ // that prevented it from fulfilling the request.
+ // This error response is a generic "catch-all" response.
+ // Usually, this indicates the server cannot find a better alternative
+ // error code to response. Sometimes, server administrators log error
+ // responses like the 500 status code with more details about the request
+ // to prevent the error from happening again in the future.
+ //
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+ INTERNAL_SERVER_ERROR = 500;
+
+ // Code means that the server or client does not support the functionality
+ // required to fulfill the request.
+ NOT_IMPLEMENTED = 501;
+
+ // Code indicates that the server, while acting as a gateway or proxy,
+ // did not get a response in time from the upstream server that
+ // it needed in order to complete the request.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+ GATEWAY_TIMEOUT = 504;
+}
+
+message Status {
+ Code code = 1;
+ string message = 2;
+}
+
+enum Language {
+ LANGUAGE_UNSPECIFIED = 0;
+ JAVA = 1;
+ CPP = 2;
+ DOT_NET = 3;
+ GOLANG = 4;
+ RUST = 5;
+}
+
+// User Agent
+message UA {
+ // SDK language
+ Language language = 1;
+
+ // SDK version
+ string version = 2;
+
+ // Platform details, including OS name, version, arch etc.
+ string platform = 3;
+
+ // Hostname of the node
+ string hostname = 4;
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
new file mode 100644
index 0000000..c7ce2e9
--- /dev/null
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
@@ -0,0 +1,445 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+import "apache/rocketmq/v2/definition.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxiliary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable message-queues, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
+message QueryRouteRequest {
+ Resource topic = 1;
+ Endpoints endpoints = 2;
+}
+
+message QueryRouteResponse {
+ Status status = 1;
+
+ repeated MessageQueue message_queues = 2;
+}
+
+message SendMessageRequest {
+ repeated Message messages = 1;
+}
+
+message SendResultEntry {
+ Status status = 1;
+ string message_id = 2;
+ string transaction_id = 3;
+ int64 offset = 4;
+}
+
+message SendMessageResponse {
+ Status status = 1;
+
+ // Some implementation may have partial failure issues. Client SDK developers are expected to inspect
+ // each entry for best certainty.
+ repeated SendResultEntry entries = 2;
+}
+
+message QueryAssignmentRequest {
+ Resource topic = 1;
+ Resource group = 2;
+ Endpoints endpoints = 3;
+}
+
+message QueryAssignmentResponse {
+ Status status = 1;
+ repeated Assignment assignments = 2;
+}
+
+message ReceiveMessageRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+ FilterExpression filter_expression = 3;
+ int32 batch_size = 4;
+ // Required if client type is simple consumer.
+ optional google.protobuf.Duration invisible_duration = 5;
+ // For message auto renew and clean
+ bool auto_renew = 6;
+}
+
+message ReceiveMessageResponse {
+ oneof content {
+ Status status = 1;
+ Message message = 2;
+ // The timestamp that brokers start to deliver status line or message.
+ google.protobuf.Timestamp delivery_timestamp = 3;
+ }
+}
+
+message AckMessageEntry {
+ string message_id = 1;
+ string receipt_handle = 2;
+}
+
+message AckMessageRequest {
+ Resource group = 1;
+ Resource topic = 2;
+ repeated AckMessageEntry entries = 3;
+}
+
+message AckMessageResultEntry {
+ string message_id = 1;
+ string receipt_handle = 2;
+
+ // Acknowledge result may be acquired through inspecting
+ // `status.code`; In case acknowledgement failed, `status.message`
+ // is the explanation of the failure.
+ Status status = 3;
+}
+
+message AckMessageResponse {
+
+ // RPC tier status, which is used to represent RPC-level errors including
+ // authentication, authorization, throttling and other general failures.
+ Status status = 1;
+
+ repeated AckMessageResultEntry entries = 2;
+}
+
+message ForwardMessageToDeadLetterQueueRequest {
+ Resource group = 1;
+ Resource topic = 2;
+ string receipt_handle = 3;
+ string message_id = 4;
+ int32 delivery_attempt = 5;
+ int32 max_delivery_attempts = 6;
+}
+
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
+
+message HeartbeatRequest {
+ optional Resource group = 1;
+ ClientType client_type = 2;
+}
+
+message HeartbeatResponse { Status status = 1; }
+
+message EndTransactionRequest {
+ Resource topic = 1;
+ string message_id = 2;
+ string transaction_id = 3;
+ TransactionResolution resolution = 4;
+ TransactionSource source = 5;
+ string trace_context = 6;
+}
+
+message EndTransactionResponse { Status status = 1; }
+
+message PrintThreadStackTraceCommand { string nonce = 1; }
+
+message ThreadStackTrace {
+ string nonce = 1;
+ optional string thread_stack_trace = 2;
+}
+
+message VerifyMessageCommand {
+ string nonce = 1;
+ MessageQueue message_queue = 2;
+ Message message = 3;
+}
+
+message VerifyMessageResult {
+ string nonce = 1;
+}
+
+message RecoverOrphanedTransactionCommand {
+ MessageQueue message_queue = 1;
+ Message orphaned_transactional_message = 2;
+ string transaction_id = 3;
+}
+
+message Publishing {
+ // Publishing settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // List of topics to which messages will publish to.
+ repeated Resource topics = 1;
+
+ // Publishing settings below here are from server, it is essential for
+ // server to push.
+ //
+ // Body of message will be deflated if its size in bytes exceeds the
+ // threshold.
+ int32 compress_body_threshold = 2;
+
+ // If the message body size exceeds `max_body_size`, broker servers would
+ // reject the request. As a result, it is advisable that Producer performs
+ // client-side check validation.
+ int32 max_body_size = 3;
+}
+
+message Subscription {
+ // Subscription settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // Consumer group.
+ optional Resource group = 1;
+
+ // Subscription for consumer.
+ repeated SubscriptionEntry subscriptions = 2;
+
+ // Subscription settings below here are from server, it is essential for
+ // server to push.
+ //
+ // When FIFO flag is `true`, messages of the same message group are processed
+ // in first-in-first-out manner.
+ //
+ // Brokers will not deliver further messages of the same group utill prior
+ // ones are completely acknowledged.
+ optional bool fifo = 3;
+
+ // Message receive batch size here is essential for push consumer.
+ optional int32 receive_batch_size = 4;
+
+ // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+ // push consumer.
+ optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+ // Indicates that if client should export local metrics to server.
+ bool on = 1;
+
+ // The endpoint that client metrics should be exported to, which is required if the switch is on.
+ optional Endpoints endpoints = 2;
+}
+
+message Settings {
+ // Configurations for all clients.
+ optional ClientType client_type = 1;
+
+ optional Endpoints access_point = 2;
+
+ // If publishing of messages encounters throttling or server internal errors,
+ // publishers should implement automatic retries after progressive longer
+ // back-offs for consecutive errors.
+ //
+ // When processing message fails, `backoff_policy` describes an interval
+ // after which the message should be available to consume again.
+ //
+ // For FIFO messages, the interval should be relatively small because
+ // messages of the same message group would not be readily available utill
+ // the prior one depletes its lifecycle.
+ optional RetryPolicy backoff_policy = 3;
+
+ // Request timeout for RPCs excluding long-polling.
+ optional google.protobuf.Duration request_timeout = 4;
+
+ oneof pub_sub {
+ Publishing publishing = 5;
+
+ Subscription subscription = 6;
+ }
+
+ // User agent details
+ UA user_agent = 7;
+
+ Metric metric = 8;
+}
+
+message TelemetryCommand {
+ optional Status status = 1;
+
+ oneof command {
+ // Client settings
+ Settings settings = 2;
+
+ // These messages are from client.
+ //
+ // Report thread stack trace to server.
+ ThreadStackTrace thread_stack_trace = 3;
+
+ // Report message verify result to server.
+ VerifyMessageResult verify_message_result = 4;
+
+ // There messages are from server.
+ //
+ // Request client to recover the orphaned transaction message.
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
+
+ // Request client to print thread stack trace.
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
+
+ // Request client to verify the consumption of the appointed message.
+ VerifyMessageCommand verify_message_command = 7;
+ }
+}
+
+message NotifyClientTerminationRequest {
+ // Consumer group, which is absent for producer.
+ optional Resource group = 1;
+}
+
+message NotifyClientTerminationResponse { Status status = 1; }
+
+message ChangeInvisibleDurationRequest {
+ Resource group = 1;
+ Resource topic = 2;
+
+ // Unique receipt handle to identify message to change
+ string receipt_handle = 3;
+
+ // New invisible duration
+ google.protobuf.Duration invisible_duration = 4;
+
+ // For message tracing
+ string message_id = 5;
+}
+
+message ChangeInvisibleDurationResponse {
+ Status status = 1;
+
+ // Server may generate a new receipt handle for the message.
+ string receipt_handle = 2;
+}
+
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
+//
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
+service MessagingService {
+
+ // Queries the route entries of the requested topic in the perspective of the
+ // given endpoints. On success, servers should return a collection of
+ // addressable message-queues. Note servers may return customized route
+ // entries based on endpoints provided.
+ //
+ // If the requested topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+ rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
+
+ // Producer or consumer sends HeartbeatRequest to servers periodically to
+ // keep-alive. Additionally, it also reports client-side configuration,
+ // including topic subscription, load-balancing group name, etc.
+ //
+ // Returns `OK` if success.
+ //
+ // If a client specifies a language that is not yet supported by servers,
+ // returns `INVALID_ARGUMENT`
+ rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
+
+ // Delivers messages to brokers.
+ // Clients may further:
+ // 1. Refine a message destination to message-queues which fulfills parts of
+ // FIFO semantic;
+ // 2. Flag a message as transactional, which keeps it invisible to consumers
+ // until it commits;
+ // 3. Time a message, making it invisible to consumers till specified
+ // time-point;
+ // 4. And more...
+ //
+ // Returns message-id or transaction-id with status `OK` on success.
+ //
+ // If the destination topic doesn't exist, returns `NOT_FOUND`.
+ rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
+
+ // Queries the assigned route info of a topic for current consumer,
+ // the returned assignment result is decided by server-side load balancer.
+ //
+ // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+ rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
+ }
+
+ // Receives messages from the server in batch manner, returns a set of
+ // messages if success. The received messages should be acked or redelivered
+ // after processed.
+ //
+ // If the pending concurrent receive requests exceed the quota of the given
+ // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+ // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+ // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+ // message in the specific topic, returns `OK` with an empty message set.
+ // Please note that client may suffer from false empty responses.
+ //
+ // If failed to receive message from remote, server must return only one
+ // `ReceiveMessageResponse` as the reply to the request, whose `Status` indicates
+ // the specific reason of failure, otherwise, the reply is considered successful.
+ rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {
+ }
+
+ // Acknowledges the message associated with the `receipt_handle` or `offset`
+ // in the `AckMessageRequest`, it means the message has been successfully
+ // processed. Returns `OK` if the message server remove the relevant message
+ // successfully.
+ //
+ // If the given receipt_handle is illegal or out of date, returns
+ // `INVALID_ARGUMENT`.
+ rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+
+ // Forwards one message to dead letter queue if the max delivery attempts is
+ // exceeded by this message at client-side, return `OK` if success.
+ rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
+ returns (ForwardMessageToDeadLetterQueueResponse) {}
+
+ // Commits or rollback one transactional message.
+ rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
+
+ // Once a client starts, it would immediately establishes bi-lateral stream
+ // RPCs with brokers, reporting its settings as the initiative command.
+ //
+ // When servers have need of inspecting client status, they would issue
+ // telemetry commands to clients. After executing received instructions,
+ // clients shall report command execution results through client-side streams.
+ rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
+
+ // Notify the server that the client is terminated.
+ rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
+ }
+
+ // Once a message is retrieved from consume queue on behalf of the group, it
+ // will be kept invisible to other clients of the same group for a period of
+ // time. The message is supposed to be processed within the invisible
+ // duration. If the client, which is in charge of the invisible message, is
+ // not capable of processing the message timely, it may use
+ // ChangeInvisibleDuration to lengthen invisible duration.
+ rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) {
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PublishLoadBalancer.cs b/rocketmq-client-csharp/PublishLoadBalancer.cs
index 9a1b66d..7d258b4 100644
--- a/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -16,58 +16,59 @@
*/
using System;
using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public class PublishLoadBalancer
{
public PublishLoadBalancer(TopicRouteData route)
{
- this.partitions = new List<Partition>();
- foreach (var partition in route.Partitions)
+ this._messageQueues = new List<rmq::MessageQueue>();
+ foreach (var messageQueue in route.MessageQueues)
{
- if (Permission.NONE == partition.Permission)
+ if (rmq::Permission.Unspecified == messageQueue.Permission)
{
continue;
}
- if (Permission.READ == partition.Permission)
+ if (rmq::Permission.Read == messageQueue.Permission)
{
continue;
}
- this.partitions.Add(partition);
+ this._messageQueues.Add(messageQueue);
}
- this.partitions.Sort();
+ this._messageQueues.Sort(Utilities.CompareMessageQueue);
Random random = new Random();
- this.roundRobinIndex = random.Next(0, this.partitions.Count);
+ this._roundRobinIndex = random.Next(0, this._messageQueues.Count);
}
- public void update(TopicRouteData route)
+ public void Update(TopicRouteData route)
{
- List<Partition> partitions = new List<Partition>();
- foreach (var partition in route.Partitions)
+ List<rmq::MessageQueue> partitions = new List<rmq::MessageQueue>();
+ foreach (var partition in route.MessageQueues)
{
- if (Permission.NONE == partition.Permission)
+ if (rmq::Permission.Unspecified == partition.Permission)
{
continue;
}
- if (Permission.READ == partition.Permission)
+ if (rmq::Permission.Read == partition.Permission)
{
continue;
}
partitions.Add(partition);
}
partitions.Sort();
- this.partitions = partitions;
+ this._messageQueues = partitions;
}
/**
* Accept a partition iff its broker is different.
*/
- private bool accept(List<Partition> existing, Partition partition)
+ private bool Accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
{
if (0 == existing.Count)
{
@@ -76,7 +77,7 @@
foreach (var item in existing)
{
- if (item.Broker.Equals(partition.Broker))
+ if (item.Broker.Equals(messageQueue.Broker))
{
return false;
}
@@ -84,22 +85,22 @@
return true;
}
- public List<Partition> select(int maxAttemptTimes)
+ public List<rmq::MessageQueue> Select(int maxAttemptTimes)
{
- List<Partition> result = new List<Partition>();
+ List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
- List<Partition> all = this.partitions;
+ List<rmq::MessageQueue> all = this._messageQueues;
if (0 == all.Count)
{
return result;
}
- int start = ++roundRobinIndex;
+ int start = ++_roundRobinIndex;
int found = 0;
for (int i = 0; i < all.Count; i++)
{
int idx = ((start + i) & int.MaxValue) % all.Count;
- if (accept(result, all[idx]))
+ if (Accept(result, all[idx]))
{
result.Add(all[idx]);
if (++found >= maxAttemptTimes)
@@ -112,8 +113,8 @@
return result;
}
- private List<Partition> partitions;
+ private List<rmq::MessageQueue> _messageQueues;
- private int roundRobinIndex;
+ private int _roundRobinIndex;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/Publishing.cs
similarity index 75%
copy from rocketmq-client-csharp/INameServerResolver.cs
copy to rocketmq-client-csharp/Publishing.cs
index 568098f..ffedd17 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/Publishing.cs
@@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-namespace org.apache.rocketmq
+using rmq = Apache.Rocketmq.V2;
+using System.Collections.Generic;
+
+namespace Org.Apache.Rocketmq
{
- public interface INameServerResolver
+ // Settings for publishing
+ public class Publishing
{
- Task<List<string>> resolveAsync();
+ public List<rmq::Resource> Topics { get; set; }
+ public int CompressBodyThreshold { get; set; }
+
+ public int MaxBodySize { get; set; }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
new file mode 100644
index 0000000..cc30943
--- /dev/null
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+ public class PushConsumer : Client, IConsumer
+ {
+ public PushConsumer(AccessPoint accessPoint, string resourceNamespace, string group) : base(accessPoint, resourceNamespace)
+ {
+ _group = group;
+ _topicFilterExpressionMap = new ConcurrentDictionary<string, FilterExpression>();
+ _topicAssignmentsMap = new ConcurrentDictionary<string, List<rmq::Assignment>>();
+ _processQueueMap = new ConcurrentDictionary<rmq::Assignment, ProcessQueue>();
+ _scanAssignmentCTS = new CancellationTokenSource();
+ _scanExpiredProcessQueueCTS = new CancellationTokenSource();
+ }
+
+ public override async Task Start()
+ {
+ if (null == _messageListener)
+ {
+ throw new System.Exception("Bad configuration: message listener is required");
+ }
+
+ await base.Start();
+
+ // Step-1: Resolve topic routes
+ List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
+ foreach (var item in _topicFilterExpressionMap)
+ {
+ queryRouteTasks.Add(GetRouteFor(item.Key, true));
+ }
+ Task.WhenAll(queryRouteTasks).GetAwaiter().GetResult();
+
+ // Step-2: Send heartbeats to all involving brokers so that we may get immediate, valid assignments.
+ await Heartbeat();
+
+ // Step-3: Scan load assignments that are assigned to current client
+ schedule(async () =>
+ {
+ await scanLoadAssignments();
+ }, 10, _scanAssignmentCTS.Token);
+
+ schedule(() =>
+ {
+ ScanExpiredProcessQueue();
+ }, 10, _scanExpiredProcessQueueCTS.Token);
+ }
+
+ public override async Task Shutdown()
+ {
+ _scanAssignmentCTS.Cancel();
+ _scanExpiredProcessQueueCTS.Cancel();
+
+ // Shutdown resources of derived class
+ await base.Shutdown();
+ }
+
+ private async Task scanLoadAssignments()
+ {
+ Logger.Debug("Start to scan load assignments from server");
+ List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq::Assignment>>>();
+ foreach (var item in _topicFilterExpressionMap)
+ {
+ tasks.Add(scanLoadAssignment(item.Key, _group));
+ }
+ var result = await Task.WhenAll(tasks);
+
+ foreach (var assignments in result)
+ {
+ if (assignments.Count == 0)
+ {
+ continue;
+ }
+
+ checkAndUpdateAssignments(assignments);
+ }
+ Logger.Debug("Completed scanning load assignments");
+ }
+
+ private void ScanExpiredProcessQueue()
+ {
+ foreach (var item in _processQueueMap)
+ {
+ if (item.Value.Expired())
+ {
+ Task.Run(async () =>
+ {
+ await ExecutePop0(item.Key);
+ });
+ }
+ }
+ }
+
+ private void checkAndUpdateAssignments(List<rmq::Assignment> assignments)
+ {
+ if (assignments.Count == 0)
+ {
+ return;
+ }
+
+ string topic = assignments[0].MessageQueue.Topic.Name;
+
+ // Compare to generate or cancel pop-cycles
+ List<rmq::Assignment> existing;
+ _topicAssignmentsMap.TryGetValue(topic, out existing);
+
+ foreach (var assignment in assignments)
+ {
+ if (null == existing || !existing.Contains(assignment))
+ {
+ ExecutePop(assignment);
+ }
+ }
+
+ if (null != existing)
+ {
+ foreach (var assignment in existing)
+ {
+ if (!assignments.Contains(assignment))
+ {
+ Logger.Info($"Stop receiving messages from {assignment.MessageQueue.ToString()}");
+ CancelPop(assignment);
+ }
+ }
+ }
+
+ }
+
+ private void ExecutePop(rmq::Assignment assignment)
+ {
+ var processQueue = new ProcessQueue();
+ if (_processQueueMap.TryAdd(assignment, processQueue))
+ {
+ Task.Run(async () =>
+ {
+ await ExecutePop0(assignment);
+ });
+ }
+ }
+
+ private async Task ExecutePop0(rmq::Assignment assignment)
+ {
+ Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
+ while (true)
+ {
+ try
+ {
+ ProcessQueue processQueue;
+ if (!_processQueueMap.TryGetValue(assignment, out processQueue))
+ {
+ break;
+ }
+
+ if (processQueue.Dropped)
+ {
+ break;
+ }
+
+ List<Message> messages = await base.ReceiveMessage(assignment, _group);
+ processQueue.LastReceiveTime = System.DateTime.UtcNow;
+
+ // TODO: cache message and dispatch them
+
+ List<Message> failed = new List<Message>();
+ await _messageListener.Consume(messages, failed);
+
+ foreach (var message in failed)
+ {
+ await base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+ }
+
+ foreach (var message in messages)
+ {
+ if (!failed.Contains(message))
+ {
+ bool success = await base.Ack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+ if (!success)
+ {
+ //TODO: log error.
+ }
+ }
+ }
+ }
+ catch (System.Exception)
+ {
+ // TODO: log exception raised.
+ }
+
+
+ }
+ }
+
+ private void CancelPop(rmq::Assignment assignment)
+ {
+ if (!_processQueueMap.ContainsKey(assignment))
+ {
+ return;
+ }
+
+ ProcessQueue processQueue;
+ if (_processQueueMap.Remove(assignment, out processQueue))
+ {
+ processQueue.Dropped = true;
+ }
+ }
+
+ protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+ {
+ }
+
+ public void Subscribe(string topic, string expression, ExpressionType type)
+ {
+ var filterExpression = new FilterExpression(expression, type);
+ _topicFilterExpressionMap[topic] = filterExpression;
+
+ }
+
+ public void RegisterListener(IMessageListener listener)
+ {
+ if (null != listener)
+ {
+ _messageListener = listener;
+ }
+ }
+
+ private string _group;
+
+ private ConcurrentDictionary<string, FilterExpression> _topicFilterExpressionMap;
+ private IMessageListener _messageListener;
+
+ private CancellationTokenSource _scanAssignmentCTS;
+
+ private ConcurrentDictionary<string, List<rmq::Assignment>> _topicAssignmentsMap;
+
+ private ConcurrentDictionary<rmq::Assignment, ProcessQueue> _processQueueMap;
+
+ private CancellationTokenSource _scanExpiredProcessQueueCTS;
+
+ }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 0191e91..c1f1cd6 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -15,47 +15,176 @@
* limitations under the License.
*/
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Net.Security;
+using System.Threading;
using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+using Grpc.Net.Client;
+using NLog;
-namespace org.apache.rocketmq {
- public class RpcClient : IRpcClient {
- public RpcClient(MessagingService.MessagingServiceClient client) {
- stub = client;
+namespace Org.Apache.Rocketmq
+{
+ public class RpcClient : IRpcClient
+ {
+ protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+ private readonly rmq::MessagingService.MessagingServiceClient _stub;
+ private readonly GrpcChannel _channel;
+
+ public RpcClient(string target)
+ {
+ _channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+ {
+ HttpHandler = CreateHttpHandler()
+ });
+ var invoker = _channel.Intercept(new ClientLoggerInterceptor());
+ _stub = new rmq::MessagingService.MessagingServiceClient(invoker);
}
- public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions)
+ public async Task Shutdown()
{
- var call = stub.QueryRouteAsync(request, callOptions);
- var response = await call.ResponseAsync;
- var status = call.GetStatus();
- if (status.StatusCode != grpc.StatusCode.OK) {
- //TODO: Something is wrong, raise an exception here.
+ if (null != _channel)
+ {
+ await _channel.ShutdownAsync();
}
- return response;
}
- public async Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions)
+ /**
+ * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
+ * why parameters are configured this way.
+ */
+ private HttpMessageHandler CreateHttpHandler()
{
- var call = stub.HeartbeatAsync(request, callOptions);
- var response = await call.ResponseAsync;
- return response;
+ var sslOptions = new SslClientAuthenticationOptions();
+ // Disable server certificate validation during development phase.
+ // Comment out the following line if server certificate validation is required.
+ sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
+ var handler = new SocketsHttpHandler
+ {
+ PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
+ KeepAlivePingDelay = TimeSpan.FromSeconds(60),
+ KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
+ EnableMultipleHttp2Connections = true,
+ SslOptions = sslOptions,
+ };
+ return handler;
}
- public async Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions)
+ public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
{
- var call = stub.NotifyClientTerminationAsync(request, callOptions);
- var response = await call.ResponseAsync;
- return response;
+ var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+ var callOptions = new CallOptions(metadata, deadline);
+ return _stub.Telemetry(callOptions);
}
- public async Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions)
+ public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
{
- var call = stub.SendMessageAsync(request, callOptions);
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.QueryRouteAsync(request, callOptions);
return await call.ResponseAsync;
}
- private MessagingService.MessagingServiceClient stub;
+
+ public async Task<rmq::HeartbeatResponse> Heartbeat(Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.HeartbeatAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
+ public async Task<rmq::SendMessageResponse> SendMessage(Metadata metadata, rmq::SendMessageRequest request,
+ TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.SendMessageAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
+ public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Metadata metadata, rmq::QueryAssignmentRequest request,
+ TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.QueryAssignmentAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
+ public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata,
+ rmq::ReceiveMessageRequest request, TimeSpan timeout) {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+ var call = _stub.ReceiveMessage(request, callOptions);
+ var result = new List<rmq::ReceiveMessageResponse>();
+ var stream = call.ResponseStream;
+ while (await stream.MoveNext())
+ {
+ var entry = stream.Current;
+ Logger.Debug($"Got ReceiveMessageResponse {entry}");
+ result.Add(entry);
+ }
+ Logger.Debug($"Receiving of messages completed");
+ return result;
+ }
+
+ public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request,
+ TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.AckMessageAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
+ public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, rmq::ChangeInvisibleDurationRequest request,
+ TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.ChangeInvisibleDurationAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
+ public async Task<rmq::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Metadata metadata,
+ rmq::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.ForwardMessageToDeadLetterQueueAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
+ public async Task<rmq::EndTransactionResponse> EndTransaction(Metadata metadata, rmq::EndTransactionRequest request,
+ TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.EndTransactionAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
+ public async Task<rmq::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
+ rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.NotifyClientTerminationAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/SendResult.cs b/rocketmq-client-csharp/SendReceipt.cs
similarity index 79%
rename from rocketmq-client-csharp/SendResult.cs
rename to rocketmq-client-csharp/SendReceipt.cs
index 5967cca..0f29991 100644
--- a/rocketmq-client-csharp/SendResult.cs
+++ b/rocketmq-client-csharp/SendReceipt.cs
@@ -14,27 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-namespace org.apache.rocketmq {
- public sealed class SendResult {
- public SendResult(string messageId) {
+
+namespace Org.Apache.Rocketmq
+{
+ public sealed class SendReceipt
+ {
+ public SendReceipt(string messageId)
+ {
status_ = SendStatus.SEND_OK;
messageId_ = messageId;
}
- public SendResult(string messageId, SendStatus status) {
+ public SendReceipt(string messageId, SendStatus status)
+ {
status_ = status;
messageId_ = messageId;
}
private string messageId_;
- public string MessageId {
+ public string MessageId
+ {
get { return messageId_; }
}
private SendStatus status_;
- public SendStatus Status {
+
+ public SendStatus Status
+ {
get { return status_; }
}
}
diff --git a/rocketmq-client-csharp/SendStatus.cs b/rocketmq-client-csharp/SendStatus.cs
index 8964211..7586d22 100644
--- a/rocketmq-client-csharp/SendStatus.cs
+++ b/rocketmq-client-csharp/SendStatus.cs
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-namespace org.apache.rocketmq {
- public enum SendStatus {
+namespace Org.Apache.Rocketmq
+{
+ public enum SendStatus
+ {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
diff --git a/rocketmq-client-csharp/SequenceGenerator.cs b/rocketmq-client-csharp/SequenceGenerator.cs
index aa92c80..97a1eb9 100644
--- a/rocketmq-client-csharp/SequenceGenerator.cs
+++ b/rocketmq-client-csharp/SequenceGenerator.cs
@@ -17,8 +17,9 @@
using System;
using System.Threading;
using System.Net.NetworkInformation;
+using NLog;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
/**
* See https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o for Sequence ID spec.
@@ -27,6 +28,7 @@
*/
public sealed class SequenceGenerator
{
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
public static SequenceGenerator Instance
{
@@ -94,10 +96,11 @@
{
if (nic.OperationalStatus == OperationalStatus.Up)
{
- if (nic.Name.Equals("lo"))
+ if (nic.Name.StartsWith("lo"))
{
continue;
}
+ Logger.Debug($"NIC={nic.Name}");
return nic.GetPhysicalAddress().GetAddressBytes();
}
}
diff --git a/rocketmq-client-csharp/ServiceAddress.cs b/rocketmq-client-csharp/ServiceAddress.cs
deleted file mode 100644
index 4aab213..0000000
--- a/rocketmq-client-csharp/ServiceAddress.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System.Collections.Generic;
-
-namespace org.apache.rocketmq {
- public sealed class ServiceAddress {
-
- public ServiceAddress(AddressScheme scheme, List<Address> addresses) {
- this.scheme = scheme;
- this.addresses = addresses;
- }
-
- private AddressScheme scheme;
- public AddressScheme Scheme {
- get { return scheme; }
- }
-
- private List<Address> addresses;
- public List<Address> Addresses{
- get { return addresses; }
- }
-
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
new file mode 100644
index 0000000..a6be057
--- /dev/null
+++ b/rocketmq-client-csharp/Session.cs
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using grpc = global::Grpc.Core;
+using NLog;
+using rmq = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ public class Session
+ {
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+ public Session(string target,
+ grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+ IClient client)
+ {
+ this._target = target;
+ this._stream = stream;
+ this._client = client;
+ this._channel = Channel.CreateUnbounded<bool>();
+ }
+
+ public async Task Loop()
+ {
+ var reader = this._stream.ResponseStream;
+ var writer = this._stream.RequestStream;
+ var request = new rmq::TelemetryCommand();
+ request.Settings = new rmq::Settings();
+ _client.BuildClientSetting(request.Settings);
+ await writer.WriteAsync(request);
+ Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
+ while (!_client.TelemetryCts().IsCancellationRequested)
+ {
+ if (await reader.MoveNext(_client.TelemetryCts().Token))
+ {
+ var cmd = reader.Current;
+ Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
+ switch (cmd.CommandCase)
+ {
+ case rmq::TelemetryCommand.CommandOneofCase.None:
+ {
+ Logger.Warn($"Telemetry failed: {cmd.Status}");
+ if (0 == Interlocked.CompareExchange(ref _established, 0, 2))
+ {
+ await _channel.Writer.WriteAsync(false);
+ }
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.Settings:
+ {
+ if (0 == Interlocked.CompareExchange(ref _established, 0, 1))
+ {
+ await _channel.Writer.WriteAsync(true);
+ }
+
+ Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+ _client.OnSettingsReceived(cmd.Settings);
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
+ {
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
+ {
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand:
+ {
+ break;
+ }
+ }
+ }
+ }
+ Logger.Info("Telemetry stream cancelled");
+ await writer.CompleteAsync();
+ }
+
+ private string _target;
+
+ public string Target
+ {
+ get { return _target; }
+ }
+
+ public async Task AwaitSettingNegotiationCompletion()
+ {
+ if (0 != Interlocked.Read(ref _established))
+ {
+ return;
+ }
+
+ Logger.Debug("Await setting negotiation");
+ await _channel.Reader.ReadAsync();
+ }
+
+ private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
+ private IClient _client;
+
+ private long _established = 0;
+
+ private Channel<bool> _channel;
+ };
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Signature.cs b/rocketmq-client-csharp/Signature.cs
index 70e038a..2331b53 100644
--- a/rocketmq-client-csharp/Signature.cs
+++ b/rocketmq-client-csharp/Signature.cs
@@ -19,29 +19,38 @@
using grpc = global::Grpc.Core;
using System.Security.Cryptography;
-namespace org.apache.rocketmq {
- public class Signature {
- public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) {
+namespace Org.Apache.Rocketmq
+{
+ public class Signature
+ {
+ public static void sign(IClientConfig clientConfig, grpc::Metadata metadata)
+ {
metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
- if (!String.IsNullOrEmpty(clientConfig.tenantId())) {
+ metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId());
+ if (!String.IsNullOrEmpty(clientConfig.tenantId()))
+ {
metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
}
- if (!String.IsNullOrEmpty(clientConfig.resourceNamespace())) {
+ if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
+ {
metadata.Add(MetadataConstants.NAMESPACE_KEY, clientConfig.resourceNamespace());
}
string time = DateTime.Now.ToString(MetadataConstants.DATE_TIME_FORMAT);
metadata.Add(MetadataConstants.DATE_TIME_KEY, time);
- if (null != clientConfig.credentialsProvider()) {
+ if (null != clientConfig.credentialsProvider())
+ {
var credentials = clientConfig.credentialsProvider().getCredentials();
- if (null == credentials || credentials.expired()) {
+ if (null == credentials || credentials.expired())
+ {
return;
}
- if (!String.IsNullOrEmpty(credentials.SessionToken)) {
+ if (!String.IsNullOrEmpty(credentials.SessionToken))
+ {
metadata.Add(MetadataConstants.STS_SESSION_TOKEN, credentials.SessionToken);
}
@@ -50,7 +59,7 @@
HMACSHA1 signer = new HMACSHA1(secretData);
byte[] digest = signer.ComputeHash(data);
string hmac = BitConverter.ToString(digest).Replace("-", "");
- string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}",
+ string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}",
MetadataConstants.ALGORITHM_KEY,
MetadataConstants.CREDENTIAL_KEY,
credentials.AccessKey,
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
new file mode 100644
index 0000000..154efa0
--- /dev/null
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+using System.Threading;
+using Grpc.Core;
+using System.Collections.Generic;
+using System.Linq;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Org.Apache.Rocketmq
+{
+ public class SimpleConsumer : Client
+ {
+
+ public SimpleConsumer(AccessPoint accessPoint,
+ string resourceNamespace, string group)
+ : base(accessPoint, resourceNamespace)
+ {
+ _fifo = false;
+ _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
+ _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
+ _group = group;
+ }
+
+ public override void BuildClientSetting(rmq::Settings settings)
+ {
+ base.BuildClientSetting(settings);
+
+ settings.ClientType = rmq::ClientType.SimpleConsumer;
+ settings.Subscription = new rmq::Subscription();
+ settings.Subscription.Group = new rmq::Resource();
+ settings.Subscription.Group.Name = _group;
+ settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+
+ foreach (var kv in _subscriptions)
+ {
+ settings.Subscription.Subscriptions.Add(kv.Value);
+ }
+ }
+
+ public override async Task Start()
+ {
+ await base.Start();
+
+ // Scan load assignment periodically
+ schedule(async () =>
+ {
+ while (!_scanAssignmentCts.IsCancellationRequested)
+ {
+ await ScanLoadAssignments();
+ }
+ }, 30, _scanAssignmentCts.Token);
+
+ await ScanLoadAssignments();
+ }
+
+ public override async Task Shutdown()
+ {
+ await base.Shutdown();
+ if (!await NotifyClientTermination())
+ {
+ Logger.Warn("Failed to NotifyClientTermination");
+ }
+ }
+
+ private async Task ScanLoadAssignments()
+ {
+
+ List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq.Assignment>>>();
+ List<string> topics = new List<string>();
+ foreach (var sub in _subscriptions)
+ {
+ var request = new rmq::QueryAssignmentRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = ResourceNamespace;
+ request.Topic.Name = sub.Key;
+ topics.Add(sub.Key);
+ request.Group = new rmq::Resource();
+ request.Group.Name = _group;
+ request.Group.ResourceNamespace = ResourceNamespace;
+
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Host = _accessPoint.Host;
+ address.Port = _accessPoint.Port;
+ request.Endpoints.Addresses.Add(address);
+
+ var metadata = new Metadata();
+ Signature.sign(this, metadata);
+ tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
+ }
+
+ List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
+
+ var i = 0;
+ foreach (var assignments in list)
+ {
+ string topic = topics[i];
+ if (null == assignments || 0 == assignments.Count)
+ {
+ Logger.Warn($"Faild to acquire assignments. Topic={topic}, Group={_group}");
+ ++i;
+ continue;
+ }
+ Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
+ _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
+ ++i;
+ }
+ }
+
+ protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+ {
+ request.ClientType = rmq::ClientType.SimpleConsumer;
+ request.Group = new rmq::Resource();
+ request.Group.Name = _group;
+ request.Group.ResourceNamespace = ResourceNamespace;
+ }
+
+ public void Subscribe(string topic, rmq::FilterType filterType, string expression)
+ {
+ var entry = new rmq::SubscriptionEntry();
+ entry.Topic = new rmq::Resource();
+ entry.Topic.Name = topic;
+ entry.Topic.ResourceNamespace = ResourceNamespace;
+ entry.Expression = new rmq::FilterExpression();
+ entry.Expression.Type = filterType;
+ entry.Expression.Expression = expression;
+ _subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
+ AddTopicOfInterest(topic);
+ }
+
+ public override void OnSettingsReceived(rmq.Settings settings)
+ {
+ base.OnSettingsReceived(settings);
+
+ if (settings.Subscription.Fifo)
+ {
+ _fifo = true;
+ Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
+ }
+ }
+
+ public async Task<List<Message>> Receive(int batchSize, TimeSpan timeout)
+ {
+ var messageQueue = NextQueue();
+ if (null == messageQueue)
+ {
+ Logger.Debug("NextQueue returned null");
+ return new List<Message>();
+ }
+
+ var request = new rmq.ReceiveMessageRequest();
+ request.Group = new rmq.Resource();
+ request.Group.ResourceNamespace = ResourceNamespace;
+ request.Group.Name = _group;
+
+ request.MessageQueue = new rmq.MessageQueue();
+ request.MessageQueue.MergeFrom(messageQueue);
+ request.BatchSize = batchSize;
+
+ // Client is responsible of extending message invisibility duration
+ request.AutoRenew = false;
+
+ var targetUrl = Utilities.TargetUrl(messageQueue);
+ var metadata = new Metadata();
+ Signature.sign(this, metadata);
+
+ return await Manager.ReceiveMessage(targetUrl, metadata, request, timeout);
+ }
+
+
+ public async Task Ack(Message message)
+ {
+ var request = new rmq.AckMessageRequest();
+ request.Group = new rmq.Resource();
+ request.Group.ResourceNamespace = ResourceNamespace;
+ request.Group.Name = _group;
+
+ request.Topic = new rmq.Resource();
+ request.Topic.ResourceNamespace = ResourceNamespace;
+ request.Topic.Name = message.Topic;
+
+ var entry = new rmq.AckMessageEntry();
+ request.Entries.Add(entry);
+ entry.MessageId = message.MessageId;
+ entry.ReceiptHandle = message._receiptHandle;
+
+ var targetUrl = message._sourceHost;
+ var metadata = new Metadata();
+ Signature.sign(this, metadata);
+ await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
+ }
+
+ public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
+ {
+ var request = new rmq.ChangeInvisibleDurationRequest();
+ request.Group = new rmq.Resource();
+ request.Group.ResourceNamespace = ResourceNamespace;
+ request.Group.Name = _group;
+
+ request.Topic = new rmq.Resource();
+ request.Topic.ResourceNamespace = ResourceNamespace;
+ request.Topic.Name = message.Topic;
+
+ request.ReceiptHandle = message._receiptHandle;
+ request.MessageId = message.MessageId;
+
+ request.InvisibleDuration = Duration.FromTimeSpan(invisibleDuration);
+
+ var targetUrl = message._sourceHost;
+ var metadata = new Metadata();
+ Signature.sign(this, metadata);
+ await Manager.ChangeInvisibleDuration(targetUrl, metadata, request, RequestTimeout);
+ }
+
+ private rmq.MessageQueue NextQueue()
+ {
+ if (_topicAssignments.IsEmpty)
+ {
+ return null;
+ }
+
+ UInt32 topicSeq = CurrentTopicSequence.Value;
+ CurrentTopicSequence.Value = topicSeq + 1;
+
+ var total = _topicAssignments.Count;
+ var topicIndex = topicSeq % total;
+ var topic = _topicAssignments.Keys.Skip((int)topicIndex).First();
+
+ UInt32 queueSeq = CurrentQueueSequence.Value;
+ CurrentQueueSequence.Value = queueSeq + 1;
+ List<rmq.Assignment> assignments;
+ if (_topicAssignments.TryGetValue(topic, out assignments))
+ {
+ if (null == assignments)
+ {
+ return null;
+ }
+ var idx = queueSeq % assignments.Count;
+ return assignments[(int)idx].MessageQueue;
+
+ }
+
+ return null;
+ }
+
+ private ThreadLocal<UInt32> CurrentTopicSequence = new ThreadLocal<UInt32>(true);
+ private ThreadLocal<UInt32> CurrentQueueSequence = new ThreadLocal<UInt32>(true);
+
+ private readonly string _group;
+ private bool _fifo;
+ private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;
+ private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;
+ private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticCredentialsProvider.cs b/rocketmq-client-csharp/StaticCredentialsProvider.cs
index 301613b..edd810d 100644
--- a/rocketmq-client-csharp/StaticCredentialsProvider.cs
+++ b/rocketmq-client-csharp/StaticCredentialsProvider.cs
@@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-namespace org.apache.rocketmq {
- public class StaticCredentialsProvider : ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+ public class StaticCredentialsProvider : ICredentialsProvider
+ {
- public StaticCredentialsProvider(string accessKey, string accessSecret) {
+ public StaticCredentialsProvider(string accessKey, string accessSecret)
+ {
this.accessKey = accessKey;
this.accessSecret = accessSecret;
}
- public Credentials getCredentials() {
+ public Credentials getCredentials()
+ {
return new Credentials(accessKey, accessSecret);
}
diff --git a/rocketmq-client-csharp/Topic.cs b/rocketmq-client-csharp/Topic.cs
index dcc7100..f1ae453 100644
--- a/rocketmq-client-csharp/Topic.cs
+++ b/rocketmq-client-csharp/Topic.cs
@@ -17,50 +17,68 @@
using System;
-namespace org.apache.rocketmq {
- public class Topic : IComparable<Topic>, IEquatable<Topic> {
- public Topic(string resource_namespace, string name) {
- resourceNamespace = resource_namespace;
- this.name = name;
+namespace Org.Apache.Rocketmq
+{
+ public class Topic : IComparable<Topic>, IEquatable<Topic>
+ {
+ public Topic(string resourceNamespace, string name)
+ {
+ ResourceNamespace = resourceNamespace;
+ Name = name;
}
- private string resourceNamespace;
- public string ResourceNamespace {
- get { return resourceNamespace; }
- }
+ public string ResourceNamespace { get; }
+ public string Name { get; }
- private string name;
- public string Name {
- get { return name; }
- }
-
- public int CompareTo(Topic other) {
- if (0 != resourceNamespace.CompareTo(other.resourceNamespace)) {
- return resourceNamespace.CompareTo(other.resourceNamespace);
- }
-
- if (0 != name.CompareTo(other.name)) {
- return name.CompareTo(other.name);
- }
-
- return 0;
- }
-
- public bool Equals(Topic other) {
- return resourceNamespace.Equals(other.resourceNamespace) && name.Equals(other.name);
- }
-
- public override bool Equals(Object other) {
- if (!(other is Topic)) {
+ public bool Equals(Topic other)
+ {
+ if (ReferenceEquals(null, other))
+ {
return false;
}
- return Equals(other as Topic);
+
+ if (ReferenceEquals(this, other))
+ {
+ return true;
+ }
+
+ return ResourceNamespace == other.ResourceNamespace && Name == other.Name;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj) || obj.GetType() != GetType())
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ return Equals((Topic)obj);
}
public override int GetHashCode()
{
- return HashCode.Combine(resourceNamespace, name);
+ return HashCode.Combine(ResourceNamespace, Name);
}
+ public int CompareTo(Topic other)
+ {
+ if (ReferenceEquals(null, other))
+ {
+ return -1;
+ }
+
+ var compareTo = String.CompareOrdinal(ResourceNamespace, other.ResourceNamespace);
+ if (0 == compareTo)
+ {
+ compareTo = String.CompareOrdinal(Name, other.Name);
+ }
+
+ return compareTo;
+ }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/TopicRouteData.cs b/rocketmq-client-csharp/TopicRouteData.cs
index a860669..e4aa04c 100644
--- a/rocketmq-client-csharp/TopicRouteData.cs
+++ b/rocketmq-client-csharp/TopicRouteData.cs
@@ -14,43 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq {
-
- public class TopicRouteData : IEquatable<TopicRouteData> {
-
- public TopicRouteData(List<Partition> partitions) {
- this.partitions = partitions;
- this.partitions.Sort();
- }
-
- private List<Partition> partitions;
-
- public List<Partition> Partitions {
- get { return partitions; }
- }
-
- public bool Equals(TopicRouteData other) {
- return partitions.Equals(other.partitions);
- }
-
- public override bool Equals(object other)
+namespace Org.Apache.Rocketmq
+{
+ public class TopicRouteData : IEquatable<TopicRouteData>
+ {
+ public TopicRouteData(List<rmq::MessageQueue> partitions)
{
+ _messageQueues = partitions;
- if (!(other is TopicRouteData)) {
- return false;
- }
+ _messageQueues.Sort(Utilities.CompareMessageQueue);
+ }
- return Equals(other as TopicRouteData);
+ private List<rmq::MessageQueue> _messageQueues;
+ public List<rmq::MessageQueue> MessageQueues { get { return _messageQueues; } }
+
+ public bool Equals(TopicRouteData other)
+ {
+ if (ReferenceEquals(null, other)) return false;
+ if (ReferenceEquals(this, other)) return true;
+ return Equals(_messageQueues, other._messageQueues);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((TopicRouteData)obj);
}
public override int GetHashCode()
{
- return HashCode.Combine(partitions);
+ return (_messageQueues != null ? _messageQueues.GetHashCode() : 0);
}
-
}
-
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/TopicRouteException.cs b/rocketmq-client-csharp/TopicRouteException.cs
index b520e72..75462fd 100644
--- a/rocketmq-client-csharp/TopicRouteException.cs
+++ b/rocketmq-client-csharp/TopicRouteException.cs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
using System;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public class TopicRouteException : Exception
{
diff --git a/rocketmq-client-csharp/Utilities.cs b/rocketmq-client-csharp/Utilities.cs
index 1834a77..23ed8db 100644
--- a/rocketmq-client-csharp/Utilities.cs
+++ b/rocketmq-client-csharp/Utilities.cs
@@ -19,8 +19,10 @@
using System.Linq;
using System.Net.NetworkInformation;
using System.Text;
+using System;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
public static class Utilities
{
@@ -49,5 +51,29 @@
return result.ToString();
}
+
+ public static string TargetUrl(rmq::MessageQueue messageQueue)
+ {
+ // TODO: Assert associated broker has as least one service endpoint.
+ var serviceEndpoint = messageQueue.Broker.Endpoints.Addresses[0];
+ return $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
+ }
+
+ public static int CompareMessageQueue(rmq::MessageQueue lhs, rmq::MessageQueue rhs)
+ {
+ int topic_comparison = String.Compare(lhs.Topic.ResourceNamespace + lhs.Topic.Name, rhs.Topic.ResourceNamespace + rhs.Topic.Name);
+ if (topic_comparison != 0)
+ {
+ return topic_comparison;
+ }
+
+ int broker_name_comparison = String.Compare(lhs.Broker.Name, rhs.Broker.Name);
+ if (0 != broker_name_comparison)
+ {
+ return broker_name_comparison;
+ }
+
+ return lhs.Id < rhs.Id ? -1 : (lhs.Id == rhs.Id ? 0 : 1);
+ }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 24cc710..baf103f 100644
--- a/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -6,25 +6,29 @@
<Authors>Zhanhui Li</Authors>
<Company>Apache Software Foundation</Company>
<TargetFramework>net5.0</TargetFramework>
- <RootNamespace>org.apache.rocketmq</RootNamespace>
+ <RootNamespace>Org.Apache.Rocketmq</RootNamespace>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
<ItemGroup>
+ <PackageReference Include="Crc32.NET" Version="1.2.0" />
<PackageReference Include="Google.Protobuf" Version="3.19.4" />
- <PackageReference Include="Grpc.Net.Client" Version="2.42.0" />
+ <PackageReference Include="Grpc.Net.Client" Version="2.43.0" />
<PackageReference Include="Grpc.Tools" Version="2.43.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="NLog" Version="4.7.13" />
+ <PackageReference Include="OpenTelemetry" Version="1.3.0" />
+ <PackageReference Include="OpenTelemetry.Api" Version="1.3.0" />
+ <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.3.0" />
- <Protobuf Include="Protos\apache\rocketmq\v1\definition.proto" ProtoRoot="Protos" GrpcServices="Client" />
+ <Protobuf Include="Protos\apache\rocketmq\v2\definition.proto" ProtoRoot="Protos" GrpcServices="Client" />
<Protobuf Include="Protos\google\rpc\code.proto" ProtoRoot="Protos" GrpcServices="Client" />
<Protobuf Include="Protos\google\rpc\error_details.proto" ProtoRoot="Protos" GrpcServices="Client" />
<Protobuf Include="Protos\google\rpc\status.proto" ProtoRoot="Protos" GrpcServices="Client" />
- <Protobuf Include="Protos\apache\rocketmq\v1\service.proto" ProtoRoot="Protos" GrpcServices="Client">
- <Link>Protos\apache\rocketmq\v1\definition.proto</Link>
+ <Protobuf Include="Protos\apache\rocketmq\v2\service.proto" ProtoRoot="Protos" GrpcServices="Client">
+ <Link>Protos\apache\rocketmq\v2\definition.proto</Link>
<Link>Protos\google\rpc\status.proto</Link>
<Link>Protos\google\rpc\error_details.proto</Link>
</Protobuf>
diff --git a/tests/BrokerTest.cs b/tests/BrokerTest.cs
deleted file mode 100644
index 8de89d5..0000000
--- a/tests/BrokerTest.cs
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- using Microsoft.VisualStudio.TestTools.UnitTesting;
-
-namespace org.apache.rocketmq {
- [TestClass]
- public class BrokerTest {
-
- [TestMethod]
- public void testCompareTo() {
- var b1 = new Broker("b1", 0, null);
- var b2 = new Broker("b1", 1, null);
- Assert.AreEqual(b1.CompareTo(b2), -1);
- }
-
- [TestMethod]
- public void testEquals() {
- var b1 = new Broker("b1", 0, null);
- var b2 = new Broker("b1", 0, null);
- Assert.AreEqual(b1, b2, "Equals method should be employed to test equality");
- }
-
- }
-}
\ No newline at end of file
diff --git a/tests/ClientConfigTest.cs b/tests/ClientConfigTest.cs
index c6d83cf..4d8dec1 100644
--- a/tests/ClientConfigTest.cs
+++ b/tests/ClientConfigTest.cs
@@ -17,11 +17,14 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class ClientConfigTest {
+ public class ClientConfigTest
+ {
[TestMethod]
- public void testClientId() {
+ public void testClientId()
+ {
var clientConfig = new ClientConfig();
string clientId = clientConfig.clientId();
Assert.IsTrue(clientId.Contains("@"));
diff --git a/tests/ClientManagerTest.cs b/tests/ClientManagerTest.cs
index 0f8bff7..af5983c 100644
--- a/tests/ClientManagerTest.cs
+++ b/tests/ClientManagerTest.cs
@@ -15,19 +15,20 @@
* limitations under the License.
*/
using System;
+using Grpc.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using rmq = apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-using System.Threading;
-using System.Threading.Tasks;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class ClientManagerTest {
-
+ public class ClientManagerTest
+ {
+
[TestMethod]
- public void testResolveRoute() {
+ public void TestResolveRoute()
+ {
string topic = "cpp_sdk_standard";
string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
var request = new rmq::QueryRouteRequest();
@@ -41,7 +42,7 @@
address.Port = 80;
request.Endpoints.Addresses.Add(address);
- var metadata = new grpc::Metadata();
+ var metadata = new Metadata();
var clientConfig = new ClientConfig();
var credentialsProvider = new ConfigFileCredentialsProvider();
clientConfig.CredentialsProvider = credentialsProvider;
@@ -50,7 +51,7 @@
Signature.sign(clientConfig, metadata);
var clientManager = new ClientManager();
string target = "https://116.62.231.199:80";
- var topicRouteData = clientManager.resolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+ var topicRouteData = clientManager.ResolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
Console.WriteLine(topicRouteData);
}
}
diff --git a/tests/ConfigFileCredentialsProviderTest.cs b/tests/ConfigFileCredentialsProviderTest.cs
index f94d364..7741295 100644
--- a/tests/ConfigFileCredentialsProviderTest.cs
+++ b/tests/ConfigFileCredentialsProviderTest.cs
@@ -18,11 +18,14 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class ConfigFileCredentialsProviderTest {
+ public class ConfigFileCredentialsProviderTest
+ {
[TestMethod]
- public void testGetCredentials() {
+ public void testGetCredentials()
+ {
var provider = new ConfigFileCredentialsProvider();
var credentials = provider.getCredentials();
Assert.IsNotNull(credentials);
diff --git a/tests/DateTimeTest.cs b/tests/DateTimeTest.cs
index 568d59e..fdf7d53 100644
--- a/tests/DateTimeTest.cs
+++ b/tests/DateTimeTest.cs
@@ -14,18 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
-namespace org.apache.rocketmq {
-
+namespace Org.Apache.Rocketmq
+{
+
[TestClass]
- public class DateTimeTest {
-
+ public class DateTimeTest
+ {
+
[TestMethod]
- public void testFormat() {
+ public void testFormat()
+ {
DateTime instant = new DateTime(2022, 02, 15, 08, 31, 56);
- string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
+ string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
string expected = "20220215T083156Z";
Assert.AreEqual(time, expected);
}
diff --git a/tests/MessageIdGeneratorTest.cs b/tests/MessageIdGeneratorTest.cs
index 6ed34d6..c98e113 100644
--- a/tests/MessageIdGeneratorTest.cs
+++ b/tests/MessageIdGeneratorTest.cs
@@ -16,7 +16,7 @@
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
namespace tests
{
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
index 3dd7f4b..f1c71f8 100644
--- a/tests/MessageTest.cs
+++ b/tests/MessageTest.cs
@@ -19,12 +19,15 @@
using System.Text;
using System.Collections.Generic;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class MessageTest {
+ public class MessageTest
+ {
[TestMethod]
- public void testCtor() {
+ public void testCtor()
+ {
var msg1 = new Message();
Assert.IsNotNull(msg1.MessageId);
Assert.IsTrue(msg1.MessageId.StartsWith("01"));
@@ -36,7 +39,8 @@
}
[TestMethod]
- public void testCtor2() {
+ public void testCtor2()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -49,7 +53,8 @@
}
[TestMethod]
- public void testCtor3() {
+ public void testCtor3()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -63,7 +68,8 @@
}
[TestMethod]
- public void testCtor4() {
+ public void testCtor4()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -81,7 +87,8 @@
}
[TestMethod]
- public void testCtor5() {
+ public void testCtor5()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
diff --git a/tests/MqLogManagerTest.cs b/tests/MqLogManagerTest.cs
index 71be3f5..4d163b2 100644
--- a/tests/MqLogManagerTest.cs
+++ b/tests/MqLogManagerTest.cs
@@ -1,7 +1,7 @@
using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using NLog;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
namespace tests
{
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index 961b167..663980a 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -15,60 +15,173 @@
* limitations under the License.
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Org.Apache.Rocketmq;
-namespace org.apache.rocketmq
+
+namespace tests
{
-
[TestClass]
public class ProducerTest
{
+ private static AccessPoint _accessPoint;
+
[ClassInitialize]
public static void SetUp(TestContext context)
{
- List<string> nameServerAddress = new List<string>();
- nameServerAddress.Add(string.Format("{0}:{1}", host, port));
- resolver = new StaticNameServerResolver(nameServerAddress);
-
- credentialsProvider = new ConfigFileCredentialsProvider();
+ _accessPoint = new AccessPoint
+ {
+ Host = HOST,
+ Port = PORT
+ };
}
[ClassCleanup]
public static void TearDown()
{
-
}
-
[TestMethod]
- public void testSendMessage()
+ public async Task TestLifecycle()
{
- var producer = new Producer(resolver, resourceNamespace);
- producer.ResourceNamespace = resourceNamespace;
+ var producer = new Producer(_accessPoint, resourceNamespace);
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
- producer.start();
+ await producer.Start();
+ await producer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestSendStandardMessage()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
var msg = new Message(topic, body);
- var sendResult = producer.send(msg).GetAwaiter().GetResult();
+
+ // Tag the massage. A message has at most one tag.
+ msg.Tag = "Tag-0";
+
+ // Associate the message with one or multiple keys
+ var keys = new List<string>();
+ keys.Add("k1");
+ keys.Add("k2");
+ msg.Keys = keys;
+
+ var sendResult = await producer.Send(msg);
Assert.IsNotNull(sendResult);
- producer.shutdown();
+ await producer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestSendMultipleMessages()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ byte[] body = new byte[1024];
+ Array.Fill(body, (byte)'x');
+ for (var i = 0; i < 128; i++)
+ {
+ var msg = new Message(topic, body);
+
+ // Tag the massage. A message has at most one tag.
+ msg.Tag = "Tag-0";
+
+ // Associate the message with one or multiple keys
+ var keys = new List<string>();
+ keys.Add("k1");
+ keys.Add("k2");
+ msg.Keys = keys;
+ var sendResult = await producer.Send(msg);
+ Assert.IsNotNull(sendResult);
+ }
+ await producer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestSendFifoMessage()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ byte[] body = new byte[1024];
+ Array.Fill(body, (byte)'x');
+ var msg = new Message(topic, body);
+
+ // Messages of the same group will get delivered one after another.
+ msg.MessageGroup = "message-group-0";
+
+ // Verify messages are FIFO iff their message group is not null or empty.
+ Assert.IsTrue(msg.Fifo());
+
+ var sendResult = await producer.Send(msg);
+ Assert.IsNotNull(sendResult);
+ await producer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestSendScheduledMessage()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ byte[] body = new byte[1024];
+ Array.Fill(body, (byte)'x');
+ var msg = new Message(topic, body);
+
+ msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ Assert.IsTrue(msg.Scheduled());
+
+ var sendResult = await producer.Send(msg);
+ Assert.IsNotNull(sendResult);
+ await producer.Shutdown();
+ }
+
+
+ /**
+ * Trying send a message that is both FIFO and Scheduled should fail.
+ */
+ [TestMethod]
+ public async Task TestSendMessage_Failure()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ byte[] body = new byte[1024];
+ Array.Fill(body, (byte)'x');
+ var msg = new Message(topic, body);
+ msg.MessageGroup = "Group-0";
+ msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ Assert.IsTrue(msg.Scheduled());
+
+ try
+ {
+ await producer.Send(msg);
+ Assert.Fail("Should have raised an exception");
+ }
+ catch (MessageException e)
+ {
+ }
+ await producer.Shutdown();
}
- private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+ private static string resourceNamespace = "";
private static string topic = "cpp_sdk_standard";
- private static string clientId = "C001";
- private static string group = "GID_cpp_sdk_standard";
-
- private static INameServerResolver resolver;
- private static ICredentialsProvider credentialsProvider;
- private static string host = "116.62.231.199";
- private static int port = 80;
+ private static string HOST = "127.0.0.1";
+ private static int PORT = 8081;
}
}
\ No newline at end of file
diff --git a/tests/PushConsumerTest.cs b/tests/PushConsumerTest.cs
new file mode 100644
index 0000000..78f01de
--- /dev/null
+++ b/tests/PushConsumerTest.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System.Collections.Generic;
+using System;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+
+ public class TestMessageListener : IMessageListener
+ {
+ public Task Consume(List<Message> messages, List<Message> failed)
+ {
+ foreach (var message in messages)
+ {
+ Console.WriteLine("");
+ }
+
+ return Task.CompletedTask;
+ }
+ }
+
+ public class CountableMessageListener : IMessageListener
+ {
+ public Task Consume(List<Message> messages, List<Message> failed)
+ {
+ foreach (var message in messages)
+ {
+ Console.WriteLine("{}", message.MessageId);
+ }
+
+ return Task.CompletedTask;
+ }
+ }
+
+ [TestClass]
+ public class PushConsumerTest
+ {
+
+ [ClassInitialize]
+ public static void SetUp(TestContext context)
+ {
+ credentialsProvider = new ConfigFileCredentialsProvider();
+
+ }
+
+ [ClassCleanup]
+ public static void TearDown()
+ {
+
+ }
+
+ [TestInitialize]
+ public void SetUp()
+ {
+ accessPoint = new AccessPoint();
+ accessPoint.Host = host;
+ accessPoint.Port = port;
+ }
+
+ [TestMethod]
+ public void testLifecycle()
+ {
+ var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
+ consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ consumer.Region = "cn-hangzhou-pre";
+ consumer.Subscribe(topic, "*", ExpressionType.TAG);
+ consumer.RegisterListener(new TestMessageListener());
+ consumer.Start();
+
+ consumer.Shutdown();
+ }
+
+
+ // [Ignore]
+ [TestMethod]
+ public void testConsumeMessage()
+ {
+ var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
+ consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ consumer.Region = "cn-hangzhou-pre";
+ consumer.Subscribe(topic, "*", ExpressionType.TAG);
+ consumer.RegisterListener(new CountableMessageListener());
+ consumer.Start();
+ System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
+ consumer.Shutdown();
+ }
+
+
+ private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+
+ private static string topic = "cpp_sdk_standard";
+
+ private static string group = "GID_cpp_sdk_standard";
+
+ private static ICredentialsProvider credentialsProvider;
+ private static string host = "116.62.231.199";
+ private static int port = 80;
+
+ private AccessPoint accessPoint;
+
+ }
+
+}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 5425973..a1ecf82 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -15,149 +15,132 @@
* limitations under the License.
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
-using Grpc.Net.Client;
-using rmq = global::apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
using System;
-using pb = global::Google.Protobuf;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using grpc = Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
+
[TestClass]
public class RpcClientTest
{
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
+ [TestMethod]
+ public async Task testTelemetry()
{
- string target = string.Format("https://{0}:{1}", host, port);
- var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+ Console.WriteLine("Test Telemetry streaming");
+ string target = "https://11.166.42.94:8081";
+ var rpc_client = new RpcClient(target);
+ var client_config = new ClientConfig();
+ var metadata = new grpc::Metadata();
+ Signature.sign(client_config, metadata);
+
+ var cmd = new rmq::TelemetryCommand();
+ cmd.Settings = new rmq::Settings();
+ cmd.Settings.ClientType = rmq::ClientType.Producer;
+ cmd.Settings.AccessPoint = new rmq::Endpoints();
+ cmd.Settings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Port = 8081;
+ address.Host = "11.166.42.94";
+ cmd.Settings.AccessPoint.Addresses.Add(address);
+ cmd.Settings.RequestTimeout = new Google.Protobuf.WellKnownTypes.Duration();
+ cmd.Settings.RequestTimeout.Seconds = 3;
+ cmd.Settings.RequestTimeout.Nanos = 0;
+ cmd.Settings.Publishing = new rmq::Publishing();
+ var topic = new rmq::Resource();
+ topic.Name = "cpp_sdk_standard";
+ cmd.Settings.Publishing.Topics.Add(topic);
+ cmd.Settings.UserAgent = new rmq::UA();
+ cmd.Settings.UserAgent.Language = rmq::Language.DotNet;
+ cmd.Settings.UserAgent.Version = "1.0";
+ cmd.Settings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+ cmd.Settings.UserAgent.Platform = System.Environment.OSVersion.ToString();
+
+ var duplexStreaming = rpc_client.Telemetry(metadata);
+ var reader = duplexStreaming.ResponseStream;
+ var writer = duplexStreaming.RequestStream;
+
+ var cts = new CancellationTokenSource();
+ await writer.WriteAsync(cmd);
+ Console.WriteLine("Command written");
+ if (await reader.MoveNext(cts.Token))
{
- HttpHandler = ClientManager.createHttpHandler()
- });
- var invoker = channel.Intercept(new ClientLoggerInterceptor());
- var client = new rmq::MessagingService.MessagingServiceClient(invoker);
- rpcClient = new RpcClient(client);
+ var response = reader.Current;
+ switch (response.CommandCase)
+ {
+ case rmq::TelemetryCommand.CommandOneofCase.Settings:
+ {
+ var responded_settings = response.Settings;
+ Console.WriteLine($"{responded_settings.ToString()}");
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.None:
+ {
+ Console.WriteLine($"Unknown response command type: {response.Status.ToString()}");
+ break;
+ }
+ }
+ Console.WriteLine("Server responded ");
+ }
+ else
+ {
+ Console.WriteLine("Server is not responding");
+ var status = duplexStreaming.GetStatus();
+ Console.WriteLine($"status={status.ToString()}");
- clientConfig = new ClientConfig();
- var credentialsProvider = new ConfigFileCredentialsProvider();
- clientConfig.CredentialsProvider = credentialsProvider;
- clientConfig.ResourceNamespace = resourceNamespace;
- clientConfig.Region = "cn-hangzhou-pre";
- }
-
- [ClassCleanup]
- public static void TearDown()
- {
-
+ var trailers = duplexStreaming.GetTrailers();
+ Console.WriteLine($"trailers={trailers.ToString()}");
+ }
}
[TestMethod]
public void testQueryRoute()
{
+ string target = "https://11.166.42.94:8081";
+ var rpc_client = new RpcClient(target);
+ var client_config = new ClientConfig();
+ var metadata = new grpc::Metadata();
+ Signature.sign(client_config, metadata);
var request = new rmq::QueryRouteRequest();
request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = resourceNamespace;
- request.Topic.Name = topic;
+ request.Topic.Name = "cpp_sdk_standard";
request.Endpoints = new rmq::Endpoints();
request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
var address = new rmq::Address();
- address.Host = host;
- address.Port = port;
+ address.Port = 8081;
+ address.Host = "11.166.42.94";
request.Endpoints.Addresses.Add(address);
-
- var metadata = new grpc::Metadata();
- Signature.sign(clientConfig, metadata);
-
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = rpcClient.queryRoute(request, callOptions).GetAwaiter().GetResult();
- }
-
-
- [TestMethod]
- public void testHeartbeat()
- {
- var request = new rmq::HeartbeatRequest();
- request.ClientId = clientId;
- request.ProducerData = new rmq::ProducerData();
- request.ProducerData.Group = new rmq::Resource();
- request.ProducerData.Group.ResourceNamespace = resourceNamespace;
- request.ProducerData.Group.Name = topic;
- request.FifoFlag = false;
-
- var metadata = new grpc::Metadata();
- Signature.sign(clientConfig, metadata);
-
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = rpcClient.heartbeat(request, callOptions).GetAwaiter().GetResult();
+ var response = rpc_client.QueryRoute(metadata, request, client_config.RequestTimeout);
+ var result = response.GetAwaiter().GetResult();
}
[TestMethod]
- public void testSendMessage()
+ public async Task TestSend()
{
+ string target = "https://11.166.42.94:8081";
+ var rpc_client = new RpcClient(target);
+ var client_config = new ClientConfig();
+ var metadata = new grpc::Metadata();
+ Signature.sign(client_config, metadata);
+
var request = new rmq::SendMessageRequest();
- request.Message = new rmq::Message();
- byte[] body = new byte[1024];
- for (int i = 0; i < body.Length; i++)
- {
- body[i] = (byte)'x';
- }
- request.Message.Body = pb::ByteString.CopyFrom(body);
- request.Message.Topic = new rmq::Resource();
- request.Message.Topic.ResourceNamespace = resourceNamespace;
- request.Message.Topic.Name = topic;
- request.Message.UserAttribute.Add("k", "v");
- request.Message.UserAttribute.Add("key", "value");
- request.Message.SystemAttribute = new rmq::SystemAttribute();
- request.Message.SystemAttribute.Tag = "TagA";
- request.Message.SystemAttribute.Keys.Add("key1");
- request.Message.SystemAttribute.MessageId = SequenceGenerator.Instance.Next();
-
- var metadata = new grpc::Metadata();
- Signature.sign(clientConfig, metadata);
-
- var deadline = DateTime.UtcNow.AddSeconds(3);
- var callOptions = new grpc::CallOptions(metadata, deadline);
-
- var response = rpcClient.sendMessage(request, callOptions).GetAwaiter().GetResult();
+ var message = new rmq::Message();
+ message.Topic = new rmq::Resource();
+ message.Topic.Name = "cpp_sdk_standard";
+ message.Body = Google.Protobuf.ByteString.CopyFromUtf8("Test Body");
+ message.SystemProperties = new rmq::SystemProperties();
+ message.SystemProperties.Tag = "TagA";
+ message.SystemProperties.MessageId = "abc";
+ request.Messages.Add(message);
+ var response = await rpc_client.SendMessage(metadata, request, TimeSpan.FromSeconds(3));
+ Assert.AreEqual(rmq::Code.Ok, response.Status.Code);
}
-
- // Remove the Ignore annotation if server has fixed
- [Ignore]
- [TestMethod]
- public void testNotifyClientTermiantion()
- {
- var request = new rmq::NotifyClientTerminationRequest();
- request.ClientId = clientId;
- request.ProducerGroup = new rmq::Resource();
- request.ProducerGroup.ResourceNamespace = resourceNamespace;
- request.ProducerGroup.Name = group;
-
- var metadata = new grpc::Metadata();
- Signature.sign(clientConfig, metadata);
-
- var deadline = DateTime.UtcNow.AddSeconds(3);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = rpcClient.notifyClientTermination(request, callOptions).GetAwaiter().GetResult();
- }
-
- private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
-
- private static string topic = "cpp_sdk_standard";
-
- private static string clientId = "C001";
- private static string group = "GID_cpp_sdk_standard";
-
- private static string host = "116.62.231.199";
- private static int port = 80;
-
- private static IRpcClient rpcClient;
- private static ClientConfig clientConfig;
}
}
\ No newline at end of file
diff --git a/tests/SendResultTest.cs b/tests/SendResultTest.cs
index 8dd033a..4e3d9a0 100644
--- a/tests/SendResultTest.cs
+++ b/tests/SendResultTest.cs
@@ -17,28 +17,32 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class SendResultTest {
+ public class SendResultTest
+ {
[TestMethod]
- public void testCtor() {
+ public void testCtor()
+ {
string messageId = new string("abc");
- var sendResult = new SendResult(messageId);
+ var sendResult = new SendReceipt(messageId);
Assert.AreEqual(messageId, sendResult.MessageId);
Assert.AreEqual(SendStatus.SEND_OK, sendResult.Status);
}
[TestMethod]
- public void testCtor2() {
+ public void testCtor2()
+ {
string messageId = new string("abc");
- var sendResult = new SendResult(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
+ var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
Assert.AreEqual(messageId, sendResult.MessageId);
Assert.AreEqual(SendStatus.FLUSH_DISK_TIMEOUT, sendResult.Status);
}
}
-
+
}
\ No newline at end of file
diff --git a/tests/SequenceGeneratorTest.cs b/tests/SequenceGeneratorTest.cs
index fc0ceb0..9b55334 100644
--- a/tests/SequenceGeneratorTest.cs
+++ b/tests/SequenceGeneratorTest.cs
@@ -19,7 +19,7 @@
using System;
using System.Collections.Generic;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
{
[TestClass]
public class SequenceGeneratorTest
diff --git a/tests/SignatureTest.cs b/tests/SignatureTest.cs
index cece257..16d0f46 100644
--- a/tests/SignatureTest.cs
+++ b/tests/SignatureTest.cs
@@ -19,10 +19,12 @@
using Moq;
using System;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class SignatureTest {
+ public class SignatureTest
+ {
[TestMethod]
public void testSign()
@@ -33,7 +35,7 @@
mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
mock.Setup(x => x.serviceName()).Returns("mq");
mock.Setup(x => x.region()).Returns("cn-hangzhou");
-
+
string accessKey = "key";
string accessSecret = "secret";
var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
new file mode 100644
index 0000000..c986614
--- /dev/null
+++ b/tests/SimpleConsumerTest.cs
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
+using Castle.Core.Logging;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+
+ [TestClass]
+ public class SimpleConsumerTest
+ {
+
+ private static AccessPoint accessPoint;
+ private static string _resourceNamespace = "";
+ private static string _group = "GID_cpp_sdk_standard";
+ private static string _topic = "cpp_sdk_standard";
+
+
+ [ClassInitialize]
+ public static void SetUp(TestContext context)
+ {
+ accessPoint = new AccessPoint
+ {
+ Host = "127.0.0.1",
+ Port = 8081
+ };
+ }
+
+ [TestMethod]
+ public async Task TestLifecycle()
+ {
+ var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+ simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ await simpleConsumer.Start();
+ Thread.Sleep(1_000);
+ await simpleConsumer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestReceive()
+ {
+ var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+ simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ await simpleConsumer.Start();
+ var batchSize = 32;
+ var receiveTimeout = TimeSpan.FromSeconds(10);
+ var messages = await simpleConsumer.Receive(batchSize, receiveTimeout);
+ Assert.IsTrue(messages.Count > 0);
+ Assert.IsTrue(messages.Count <= batchSize);
+ await simpleConsumer.Shutdown();
+ }
+
+
+ [TestMethod]
+ public async Task TestAck()
+ {
+ var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+ simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ await simpleConsumer.Start();
+ var batchSize = 32;
+ var receiveTimeout = TimeSpan.FromSeconds(10);
+ var messages = await simpleConsumer.Receive(batchSize, receiveTimeout);
+ foreach (var message in messages)
+ {
+ await simpleConsumer.Ack(message);
+ Console.WriteLine($"Ack {message.MessageId} OK");
+ }
+ await simpleConsumer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestChangeInvisibleDuration()
+ {
+ var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+ simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ await simpleConsumer.Start();
+ var batchSize = 32;
+ var receiveTimeout = TimeSpan.FromSeconds(10);
+ var messages = await simpleConsumer.Receive(batchSize, receiveTimeout);
+ foreach (var message in messages)
+ {
+ await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(10));
+ Console.WriteLine($"ChangeInvisibleDuration for message[MsgId={message.MessageId}] OK");
+ }
+ await simpleConsumer.Shutdown();
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/StaticCredentialsProviderTest.cs b/tests/StaticCredentialsProviderTest.cs
index 20b957e..8b5f012 100644
--- a/tests/StaticCredentialsProviderTest.cs
+++ b/tests/StaticCredentialsProviderTest.cs
@@ -17,12 +17,15 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class StaticCredentialsProviderTest {
+ public class StaticCredentialsProviderTest
+ {
[TestMethod]
- public void testGetCredentials() {
+ public void testGetCredentials()
+ {
var accessKey = "key";
var accessSecret = "secret";
var provider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/StaticNameServerResolverTest.cs b/tests/StaticNameServerResolverTest.cs
deleted file mode 100644
index 88955e9..0000000
--- a/tests/StaticNameServerResolverTest.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
-
-namespace org.apache.rocketmq
-{
- [TestClass]
- public class StaticNameServerResolverTest
- {
- [TestMethod]
- public void testResolve()
- {
- List<string> list = new List<string>();
- list.Add("https://localhost:80");
- var resolver = new StaticNameServerResolver(list);
- var result = resolver.resolveAsync().GetAwaiter().GetResult();
- Assert.AreSame(list, result);
- }
- }
-}
\ No newline at end of file
diff --git a/tests/TopicTest.cs b/tests/TopicTest.cs
index fcc15e4..9f386de 100644
--- a/tests/TopicTest.cs
+++ b/tests/TopicTest.cs
@@ -17,13 +17,16 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Collections.Generic;
-namespace org.apache.rocketmq {
-
- [TestClass]
- public class TopicTest {
+namespace Org.Apache.Rocketmq
+{
- [TestMethod]
- public void testCompareTo() {
+ [TestClass]
+ public class TopicTest
+ {
+
+ [TestMethod]
+ public void testCompareTo()
+ {
List<Topic> topics = new List<Topic>();
topics.Add(new Topic("ns1", "t1"));
topics.Add(new Topic("ns0", "t1"));
@@ -36,13 +39,13 @@
Assert.AreEqual(topics[1].ResourceNamespace, "ns0");
Assert.AreEqual(topics[1].Name, "t1");
-
+
Assert.AreEqual(topics[2].ResourceNamespace, "ns1");
Assert.AreEqual(topics[2].Name, "t1");
-
+
}
- }
- }
\ No newline at end of file
+ }
+}
\ No newline at end of file
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index 4689e52..bbf537a 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -1,8 +1,12 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
using Grpc.Net.Client;
-using apache.rocketmq.v1;
+using rmq = Apache.Rocketmq.V2;
+
using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
namespace tests
{
[TestClass]
@@ -11,40 +15,61 @@
[TestMethod]
public void TestMethod1()
{
- apache.rocketmq.v1.Permission perm = apache.rocketmq.v1.Permission.None;
- switch(perm) {
- case apache.rocketmq.v1.Permission.None:
- {
- Console.WriteLine("None");
- break;
- }
+ rmq::Permission perm = rmq::Permission.None;
+ switch (perm)
+ {
+ case rmq::Permission.None:
+ {
+ Console.WriteLine("None");
+ break;
+ }
- case apache.rocketmq.v1.Permission.Read:
- {
- Console.WriteLine("Read");
- break;
- }
+ case rmq::Permission.Read:
+ {
+ Console.WriteLine("Read");
+ break;
+ }
- case apache.rocketmq.v1.Permission.Write:
- {
- Console.WriteLine("Write");
- break;
- }
+ case rmq::Permission.Write:
+ {
+ Console.WriteLine("Write");
+ break;
+ }
- case apache.rocketmq.v1.Permission.ReadWrite:
- {
- Console.WriteLine("ReadWrite");
- break;
- }
+ case rmq::Permission.ReadWrite:
+ {
+ Console.WriteLine("ReadWrite");
+ break;
+ }
}
}
[TestMethod]
- public void TestRpcClientImplCtor() {
- using var channel = GrpcChannel.ForAddress("https://localhost:5001");
- var client = new MessagingService.MessagingServiceClient(channel);
- RpcClient impl = new RpcClient(client);
+ public void TestRpcClientImplCtor()
+ {
+ RpcClient impl = new RpcClient("https://localhost:5001");
+ }
+
+ [TestMethod]
+ public void TestConcurrentDictionary()
+ {
+ var dict = new ConcurrentDictionary<string, List<String>>();
+ string s = "abc";
+ List<String> result;
+ var exists = dict.TryGetValue(s, out result);
+ Assert.IsFalse(exists);
+ Assert.IsNull(result);
+
+ result = new List<string>();
+ result.Add("abc");
+ Assert.IsTrue(dict.TryAdd(s, result));
+
+ List<String> list;
+ exists = dict.TryGetValue(s, out list);
+ Assert.IsTrue(exists);
+ Assert.IsNotNull(list);
+ Assert.AreEqual(1, list.Count);
}
}
}