Make producer work
diff --git a/rocketmq-client-csharp/Address.cs b/rocketmq-client-csharp/AccessPoint.cs
similarity index 74%
rename from rocketmq-client-csharp/Address.cs
rename to rocketmq-client-csharp/AccessPoint.cs
index bd862b5..f97d216 100644
--- a/rocketmq-client-csharp/Address.cs
+++ b/rocketmq-client-csharp/AccessPoint.cs
@@ -14,18 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
namespace Org.Apache.Rocketmq
{
- public class Address
+ public class AccessPoint
{
- public Address(string host, int port)
+ private string _host;
+
+ public string Host
{
- Host = host;
- Port = port;
+ get { return _host; }
+ set { _host = value; }
}
- public string Host { get; }
- public int Port { get; }
+ private int _port;
+
+ public int Port
+ {
+ get { return _port; }
+ set { _port = value; }
+ }
}
-}
\ No newline at end of file
+}
diff --git a/rocketmq-client-csharp/AddressScheme.cs b/rocketmq-client-csharp/AddressScheme.cs
deleted file mode 100644
index 822ee4a..0000000
--- a/rocketmq-client-csharp/AddressScheme.cs
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Org.Apache.Rocketmq
-{
- public enum AddressScheme
- {
- Ipv4,
- Ipv6,
- DomainName,
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Broker.cs b/rocketmq-client-csharp/Broker.cs
deleted file mode 100644
index b0c11d1..0000000
--- a/rocketmq-client-csharp/Broker.cs
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-namespace Org.Apache.Rocketmq
-{
- public class Broker : IComparable<Broker>, IEquatable<Broker>
- {
- public Broker(string name, int id, ServiceAddress address)
- {
- Name = name;
- Id = id;
- Address = address;
- }
-
- public string Name { get; }
- public int Id { get; }
- public ServiceAddress Address { get; }
-
- /// <summary>
- /// Calculate context aware primary target URL.
- /// </summary>
- /// <returns>Context aware primary target URL.</returns>
- public string TargetUrl()
- {
- var address = Address.Addresses[0];
- return $"https://{address.Host}:{address.Port}";
- }
-
- /// <summary>
- /// Judge whether equals to other or not, ignore <see cref="Address"/> on purpose.
- /// </summary>
- public bool Equals(Broker other)
- {
- if (ReferenceEquals(null, other))
- {
- return false;
- }
-
- if (ReferenceEquals(this, other))
- {
- return true;
- }
-
- return Name == other.Name && Id == other.Id;
- }
-
- public override bool Equals(object obj)
- {
- if (ReferenceEquals(null, obj) || obj.GetType() != GetType())
- {
- return false;
- }
-
- if (ReferenceEquals(this, obj))
- {
- return true;
- }
-
- return Equals((Broker)obj);
- }
-
- /// <summary>
- /// Return the hash code, ignore <see cref="Address"/> on purpose.
- /// </summary>
- public override int GetHashCode()
- {
- return HashCode.Combine(Name, Id);
- }
-
- /// <summary>
- /// Compare with other, ignore <see cref="Address"/> on purpose.
- /// </summary>
- public int CompareTo(Broker other)
- {
- if (ReferenceEquals(null, other))
- {
- return -1;
- }
-
- var compareTo = String.CompareOrdinal(Name, other.Name);
- if (0 == compareTo)
- {
- compareTo = Id.CompareTo(other.Id);
- }
-
- return compareTo;
- }
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 50f85af..164e98a 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -31,12 +31,18 @@
{
protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- public Client(INameServerResolver resolver, string resourceNamespace)
+ public Client(AccessPoint accessPoint, string resourceNamespace)
{
- _nameServerResolver = resolver;
+ // Support IPv4 for now
+ AccessPointScheme = rmq::AddressScheme.Ipv4;
+
+ var serviceEndpoint = new rmq::Address();
+ serviceEndpoint.Host = accessPoint.Host;
+ serviceEndpoint.Port = accessPoint.Port;
+ AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
+
_resourceNamespace = resourceNamespace;
Manager = ClientManagerFactory.getClientManager(resourceNamespace);
- _nameServerResolverCts = new CancellationTokenSource();
_topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
@@ -48,11 +54,6 @@
{
schedule(async () =>
{
- await UpdateNameServerList();
- }, 30, _nameServerResolverCts.Token);
-
- schedule(async () =>
- {
await UpdateTopicRoute();
}, 30, _updateTopicRouteCts.Token);
@@ -62,7 +63,6 @@
{
Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
_updateTopicRouteCts.Cancel();
- _nameServerResolverCts.Cancel();
Manager.Shutdown().GetAwaiter().GetResult();
}
@@ -70,9 +70,9 @@
{
foreach (var item in _topicRouteTable)
{
- foreach (var partition in item.Value.Partitions)
+ foreach (var partition in item.Value.MessageQueues)
{
- string target = partition.Broker.TargetUrl();
+ string target = Utilities.TargetUrl(partition);
if (acceptor(target))
{
return target;
@@ -90,9 +90,9 @@
List<string> endpoints = new List<string>();
foreach (var item in _topicRouteTable)
{
- foreach (var partition in item.Value.Partitions)
+ foreach (var partition in item.Value.MessageQueues)
{
- string endpoint = partition.Broker.TargetUrl();
+ string endpoint = Utilities.TargetUrl(partition);
if (!endpoints.Contains(endpoint))
{
endpoints.Add(endpoint);
@@ -102,47 +102,8 @@
return endpoints;
}
- private async Task UpdateNameServerList()
- {
- List<string> nameServers = await _nameServerResolver.resolveAsync();
- if (0 == nameServers.Count)
- {
- // Whoops, something should be wrong. We got an empty name server list.
- Logger.Warn("Got an empty name server list");
- return;
- }
-
- if (nameServers.Equals(this._nameServers))
- {
- Logger.Debug("Name server list remains unchanged");
- return;
- }
-
- // Name server list is updated.
- // TODO: Locking is required
- this._nameServers = nameServers;
- this._currentNameServerIndex = 0;
- }
-
private async Task UpdateTopicRoute()
{
- if (null == _nameServers || 0 == _nameServers.Count)
- {
- List<string> list = await _nameServerResolver.resolveAsync();
- if (null != list && 0 != list.Count)
- {
- this._nameServers = list;
- }
- else
- {
- Logger.Error("Failed to resolve name server list");
- return;
- }
- }
-
- // We got one or more name servers available.
- string nameServer = _nameServers[_currentNameServerIndex];
-
List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
foreach (var item in _topicRouteTable)
{
@@ -158,12 +119,12 @@
continue;
}
- if (0 == item.Partitions.Count)
+ if (0 == item.MessageQueues.Count)
{
continue;
}
- var topicName = item.Partitions[0].Topic.Name;
+ var topicName = item.MessageQueues[0].Topic.Name;
var existing = _topicRouteTable[topicName];
if (!existing.Equals(item))
{
@@ -190,18 +151,6 @@
});
}
- protected rmq::Endpoints AccessEndpoint(string nameServer)
- {
- var endpoints = new rmq::Endpoints();
- endpoints.Scheme = rmq::AddressScheme.Ipv4;
- var address = new rmq::Address();
- int pos = nameServer.LastIndexOf(':');
- address.Host = nameServer.Substring(0, pos);
- address.Port = Int32.Parse(nameServer.Substring(pos + 1));
- endpoints.Addresses.Add(address);
- return endpoints;
- }
-
/**
* Parameters:
* topic
@@ -216,59 +165,44 @@
return _topicRouteTable[topic];
}
- if (null == _nameServers || 0 == _nameServers.Count)
+ // We got one or more name servers available.
+ var request = new rmq::QueryRouteRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = _resourceNamespace;
+ request.Topic.Name = topic;
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = AccessPointScheme;
+ foreach (var address in AccessPointEndpoints)
{
- List<string> list = await _nameServerResolver.resolveAsync();
- if (null != list && 0 != list.Count)
+ request.Endpoints.Addresses.Add(address);
+ }
+
+ var metadata = new grpc.Metadata();
+ Signature.sign(this, metadata);
+ int index = random.Next(0, AccessPointEndpoints.Count);
+ var serviceEndpoint = AccessPointEndpoints[index];
+ // AccessPointAddresses.Count
+ string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
+ TopicRouteData topicRouteData;
+ try
+ {
+ topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
+ if (null != topicRouteData)
{
- this._nameServers = list;
+ Logger.Debug($"Got route entries for {topic} from name server");
+ _topicRouteTable.TryAdd(topic, topicRouteData);
+ return topicRouteData;
}
else
{
- Logger.Error("Name server is not properly configured. List is null or empty");
- return null;
+ Logger.Warn($"Failed to query route of {topic} from {target}");
}
}
-
-
- for (int retry = 0; retry < MaxTransparentRetry; retry++)
+ catch (System.Exception e)
{
- // We got one or more name servers available.
- int index = (_currentNameServerIndex + retry) % _nameServers.Count;
- string nameServer = _nameServers[index];
- var request = new rmq::QueryRouteRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
- request.Endpoints = AccessEndpoint(nameServer);
- var metadata = new grpc.Metadata();
- Signature.sign(this, metadata);
- string target = $"https://{nameServer}";
- TopicRouteData topicRouteData;
- try
- {
- topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
- if (null != topicRouteData)
- {
- Logger.Debug($"Got route entries for {topic} from name server");
- _topicRouteTable.TryAdd(topic, topicRouteData);
-
- if (retry > 0)
- {
- _currentNameServerIndex = index;
- }
- return topicRouteData;
- }
- else
- {
- Logger.Warn($"Failed to query route of {topic} from {target}");
- }
- }
- catch (System.Exception e)
- {
- Logger.Warn(e, "Failed when querying route");
- }
+ Logger.Warn(e, "Failed when querying route");
}
+
return null;
}
@@ -320,7 +254,12 @@
request.Group = new rmq::Resource();
request.Group.ResourceNamespace = _resourceNamespace;
request.Group.Name = group;
- request.Endpoints = AccessEndpoint(_nameServers[_currentNameServerIndex]);
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = AccessPointScheme;
+ foreach (var endpoint in AccessPointEndpoints)
+ {
+ request.Endpoints.Addresses.Add(endpoint);
+ }
try
{
var metadata = new grpc::Metadata();
@@ -344,6 +283,23 @@
return $"https://{address.Host}:{address.Port}";
}
+ public void buildClientSetting(rmq::Settings settings)
+ {
+
+ }
+
+ public void createSession(string url)
+ {
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ var stream = Manager.Telemetry(url, metadata);
+ var session = new Session(url, stream, this);
+ Task.Run(async () =>
+ {
+ await session.Loop();
+ });
+ }
+
public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
{
@@ -427,17 +383,11 @@
}
protected readonly IClientManager Manager;
-
- private readonly INameServerResolver _nameServerResolver;
- private readonly CancellationTokenSource _nameServerResolverCts;
- private List<string> _nameServers;
- private int _currentNameServerIndex;
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
private readonly CancellationTokenSource _updateTopicRouteCts;
private readonly CancellationTokenSource _healthCheckCts;
-
- protected const int MaxTransparentRetry = 3;
+ private Random random = new Random();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index dfc30c6..6dc3eba 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -31,6 +31,7 @@
this.client_type_ = rmq::ClientType.Unspecified;
this.access_point_ = new rmq::Endpoints();
this.back_off_policy_ = new rmq::RetryPolicy();
+ this._publishing = new Publishing();
}
public string region() {
@@ -141,13 +142,13 @@
private rmq::Endpoints access_point_;
- public rmq::AddressScheme AccessPointAddressScheme
+ public rmq::AddressScheme AccessPointScheme
{
get { return access_point_.Scheme; }
set { access_point_.Scheme = value; }
}
- public List<rmq::Address> AccessPointAddresses
+ public List<rmq::Address> AccessPointEndpoints
{
get
{
@@ -171,7 +172,11 @@
private rmq::RetryPolicy back_off_policy_;
-
+ private Publishing _publishing;
+ public Publishing Publishing
+ {
+ get { return _publishing; }
+ }
}
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index df0f32f..a0b377b 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -74,6 +74,12 @@
}
}
+ public grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata)
+ {
+ var rpcClient = GetRpcClient(target);
+ return rpcClient.Telemetry(metadata);
+ }
+
public async Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata,
rmq::QueryRouteRequest request, TimeSpan timeout)
{
@@ -82,72 +88,16 @@
if (queryRouteResponse.Status.Code != rmq::Code.Ok)
{
+ Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status.ToString()}");
// Raise an application layer exception
}
- var partitions = new List<Partition>();
- // Translate protobuf object to domain specific one
- foreach (var partition in queryRouteResponse.MessageQueues)
+ var messageQueues = new List<rmq::MessageQueue>();
+ foreach (var messageQueue in queryRouteResponse.MessageQueues)
{
- var topic = new Topic(partition.Topic.ResourceNamespace, partition.Topic.Name);
- var id = partition.Id;
- Permission permission = Permission.ReadWrite;
- switch (partition.Permission)
- {
- case rmq::Permission.None:
- {
- permission = Permission.None;
- break;
- }
- case rmq::Permission.Read:
- {
- permission = Permission.Read;
- break;
- }
- case rmq::Permission.Write:
- {
- permission = Permission.Write;
- break;
- }
- case rmq::Permission.ReadWrite:
- {
- permission = Permission.ReadWrite;
- break;
- }
- }
-
- AddressScheme scheme = AddressScheme.Ipv4;
- switch (partition.Broker.Endpoints.Scheme)
- {
- case rmq::AddressScheme.Ipv4:
- {
- scheme = AddressScheme.Ipv4;
- break;
- }
- case rmq::AddressScheme.Ipv6:
- {
- scheme = AddressScheme.Ipv6;
- break;
- }
- case rmq::AddressScheme.DomainName:
- {
- scheme = AddressScheme.DomainName;
- break;
- }
- }
-
- List<Address> addresses = new List<Address>();
- foreach (var item in partition.Broker.Endpoints.Addresses)
- {
- addresses.Add(new Address(item.Host, item.Port));
- }
-
- ServiceAddress serviceAddress = new ServiceAddress(scheme, addresses);
- Broker broker = new Broker(partition.Broker.Name, id, serviceAddress);
- partitions.Add(new Partition(topic, broker, id, permission));
+ messageQueues.Add(messageQueue);
}
-
- var topicRouteData = new TopicRouteData(partitions);
+ var topicRouteData = new TopicRouteData(messageQueues);
return topicRouteData;
}
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index bd58bbb..f4115a2 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -17,6 +17,7 @@
using System.Threading.Tasks;
using System;
+using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -27,5 +28,6 @@
Task<bool> NotifyClientTermination();
+ void buildClientSetting(rmq::Settings settings);
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index f2d2908..d5c3ea3 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -26,6 +26,8 @@
public interface IClientManager {
IRpcClient GetRpcClient(string target);
+ grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata);
+
Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout);
Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout);
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/INameServerResolver.cs
deleted file mode 100644
index 4e3d10a..0000000
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
- public interface INameServerResolver
- {
- Task<List<string>> resolveAsync();
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Partition.cs b/rocketmq-client-csharp/Partition.cs
deleted file mode 100644
index 5c2e748..0000000
--- a/rocketmq-client-csharp/Partition.cs
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-namespace Org.Apache.Rocketmq
-{
- public class Partition : IEquatable<Partition>, IComparable<Partition>
- {
- public Partition(Topic topic, Broker broker, int id, Permission permission)
- {
- Topic = topic;
- Broker = broker;
- Id = id;
- Permission = permission;
- }
-
- public Topic Topic { get; }
- public Broker Broker { get; }
- public int Id { get; }
-
- public Permission Permission { get; }
-
- public bool Equals(Partition other)
- {
- if (ReferenceEquals(null, other))
- {
- return false;
- }
-
- if (ReferenceEquals(this, other))
- {
- return true;
- }
-
- return Equals(Topic, other.Topic) && Equals(Broker, other.Broker) && Id == other.Id &&
- Permission == other.Permission;
- }
-
- public override bool Equals(object obj)
- {
- if (ReferenceEquals(null, obj) || obj.GetType() != GetType())
- {
- return false;
- }
-
- if (ReferenceEquals(this, obj))
- {
- return true;
- }
-
- return Equals((Partition)obj);
- }
-
-
- public override int GetHashCode()
- {
- return HashCode.Combine(Topic, Broker, Id, (int)Permission);
- }
-
- public int CompareTo(Partition other)
- {
- if (ReferenceEquals(null, other))
- {
- return -1;
- }
-
- var compareTo = Topic.CompareTo(other.Topic);
- if (0 == compareTo)
- {
- compareTo = Broker.CompareTo(other.Broker);
- }
-
- if (0 == compareTo)
- {
- compareTo = Id.CompareTo(other.Id);
- }
-
- if (0 == compareTo)
- {
- compareTo = Permission.CompareTo(other.Permission);
- }
-
- return compareTo;
- }
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Permission.cs b/rocketmq-client-csharp/Permission.cs
deleted file mode 100644
index 6111b20..0000000
--- a/rocketmq-client-csharp/Permission.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Org.Apache.Rocketmq
-{
- public enum Permission
- {
- None,
- Read,
- Write,
- ReadWrite,
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 226ed5f..7932282 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -28,7 +28,7 @@
{
public class Producer : Client, IProducer
{
- public Producer(INameServerResolver resolver, string resourceNamespace) : base(resolver, resourceNamespace)
+ public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
{
this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
}
@@ -55,9 +55,9 @@
if (!loadBalancer.ContainsKey(message.Topic))
{
var topicRouteData = await GetRouteFor(message.Topic, false);
- if (null == topicRouteData || null == topicRouteData.Partitions || 0 == topicRouteData.Partitions.Count)
+ if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
{
- Logger.Error($"Failed to resolve route info for {message.Topic} after {MaxTransparentRetry} attempts");
+ Logger.Error($"Failed to resolve route info for {message.Topic}");
throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
}
@@ -98,10 +98,10 @@
// string target = "https://";
List<string> targets = new List<string>();
- List<Partition> candidates = publishLB.select(message.MaxAttemptTimes);
- foreach (var partition in candidates)
+ List<rmq::MessageQueue> candidates = publishLB.select(message.MaxAttemptTimes);
+ foreach (var messageQueue in candidates)
{
- targets.Add(partition.Broker.TargetUrl());
+ targets.Add(Utilities.TargetUrl(messageQueue));
}
var metadata = new Metadata();
diff --git a/rocketmq-client-csharp/PublishLoadBalancer.cs b/rocketmq-client-csharp/PublishLoadBalancer.cs
index bf341c1..5410b5a 100644
--- a/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -16,6 +16,7 @@
*/
using System;
using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -23,51 +24,51 @@
{
public PublishLoadBalancer(TopicRouteData route)
{
- this.partitions = new List<Partition>();
- foreach (var partition in route.Partitions)
+ this._messageQueues = new List<rmq::MessageQueue>();
+ foreach (var messageQueue in route.MessageQueues)
{
- if (Permission.None == partition.Permission)
+ if (rmq::Permission.Unspecified == messageQueue.Permission)
{
continue;
}
- if (Permission.Read == partition.Permission)
+ if (rmq::Permission.Read == messageQueue.Permission)
{
continue;
}
- this.partitions.Add(partition);
+ this._messageQueues.Add(messageQueue);
}
- this.partitions.Sort();
+ this._messageQueues.Sort(Utilities.CompareMessageQueue);
Random random = new Random();
- this.roundRobinIndex = random.Next(0, this.partitions.Count);
+ this.roundRobinIndex = random.Next(0, this._messageQueues.Count);
}
public void update(TopicRouteData route)
{
- List<Partition> partitions = new List<Partition>();
- foreach (var partition in route.Partitions)
+ List<rmq::MessageQueue> partitions = new List<rmq::MessageQueue>();
+ foreach (var partition in route.MessageQueues)
{
- if (Permission.None == partition.Permission)
+ if (rmq::Permission.Unspecified == partition.Permission)
{
continue;
}
- if (Permission.Read == partition.Permission)
+ if (rmq::Permission.Read == partition.Permission)
{
continue;
}
partitions.Add(partition);
}
partitions.Sort();
- this.partitions = partitions;
+ this._messageQueues = partitions;
}
/**
* Accept a partition iff its broker is different.
*/
- private bool accept(List<Partition> existing, Partition partition)
+ private bool accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
{
if (0 == existing.Count)
{
@@ -76,7 +77,7 @@
foreach (var item in existing)
{
- if (item.Broker.Equals(partition.Broker))
+ if (item.Broker.Equals(messageQueue.Broker))
{
return false;
}
@@ -84,11 +85,11 @@
return true;
}
- public List<Partition> select(int maxAttemptTimes)
+ public List<rmq::MessageQueue> select(int maxAttemptTimes)
{
- List<Partition> result = new List<Partition>();
+ List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
- List<Partition> all = this.partitions;
+ List<rmq::MessageQueue> all = this._messageQueues;
if (0 == all.Count)
{
return result;
@@ -112,7 +113,7 @@
return result;
}
- private List<Partition> partitions;
+ private List<rmq::MessageQueue> _messageQueues;
private int roundRobinIndex;
}
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/Publishing.cs
similarity index 60%
rename from rocketmq-client-csharp/StaticNameServerResolver.cs
rename to rocketmq-client-csharp/Publishing.cs
index bc1f670..138b65a 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/Publishing.cs
@@ -14,24 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+using rmq = Apache.Rocketmq.V2;
+
using System.Collections.Generic;
-using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
- public class StaticNameServerResolver : INameServerResolver
+ // Settings for publishing
+ public class Publishing
{
-
- public StaticNameServerResolver(List<string> nameServerList)
+ private List<rmq::Resource> _topics;
+ public List<rmq::Resource> Topics
{
- this.nameServerList = nameServerList;
+ get { return _topics; }
+ set { _topics = value; }
}
- public async Task<List<string>> resolveAsync()
+ private int _compressBodyThreshold;
+ public int CompressBodyThreshold
{
- return nameServerList;
+ get { return _compressBodyThreshold; }
+ set { _compressBodyThreshold = value; }
}
- private List<string> nameServerList;
+ private int _maxBodySize;
+ public int MaxBodySize
+ {
+ get { return _maxBodySize; }
+ set { _maxBodySize = value; }
+ }
+
}
+
+
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index bcaae0c..6ab362d 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -24,7 +24,7 @@
{
public class PushConsumer : Client, IConsumer
{
- public PushConsumer(INameServerResolver resolver, string resourceNamespace, string group) : base(resolver, resourceNamespace)
+ public PushConsumer(AccessPoint accessPoint, string resourceNamespace, string group) : base(accessPoint, resourceNamespace)
{
_group = group;
_topicFilterExpressionMap = new ConcurrentDictionary<string, FilterExpression>();
diff --git a/rocketmq-client-csharp/SequenceGenerator.cs b/rocketmq-client-csharp/SequenceGenerator.cs
index 7f4c394..97a1eb9 100644
--- a/rocketmq-client-csharp/SequenceGenerator.cs
+++ b/rocketmq-client-csharp/SequenceGenerator.cs
@@ -17,6 +17,7 @@
using System;
using System.Threading;
using System.Net.NetworkInformation;
+using NLog;
namespace Org.Apache.Rocketmq
{
@@ -27,6 +28,7 @@
*/
public sealed class SequenceGenerator
{
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
public static SequenceGenerator Instance
{
@@ -94,10 +96,11 @@
{
if (nic.OperationalStatus == OperationalStatus.Up)
{
- if (nic.Name.Equals("lo"))
+ if (nic.Name.StartsWith("lo"))
{
continue;
}
+ Logger.Debug($"NIC={nic.Name}");
return nic.GetPhysicalAddress().GetAddressBytes();
}
}
diff --git a/rocketmq-client-csharp/ServiceAddress.cs b/rocketmq-client-csharp/ServiceAddress.cs
deleted file mode 100644
index fadb021..0000000
--- a/rocketmq-client-csharp/ServiceAddress.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Generic;
-
-namespace Org.Apache.Rocketmq
-{
- public sealed class ServiceAddress
- {
- public ServiceAddress(AddressScheme scheme, List<Address> addresses)
- {
- Scheme = scheme;
- Addresses = addresses;
- }
-
- public AddressScheme Scheme { get; }
- public List<Address> Addresses { get; }
- }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 2cc079e..268e077 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -14,10 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
using grpc = global::Grpc.Core;
-using System.Security.Cryptography;
+using NLog;
+
+
+using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -25,9 +28,73 @@
class Session
{
- public string Target { get; }
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+ public Session(string target,
+ grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+ IClient client)
+ {
+ this._target = target;
+ this._stream = stream;
+ }
+ public async Task Loop()
+ {
+ var reader = this._stream.ResponseStream;
+ var writer = this._stream.RequestStream;
+ var request = new rmq::TelemetryCommand();
+ request.Settings = new rmq::Settings();
+ _client.buildClientSetting(request.Settings);
+ await writer.WriteAsync(request);
+ while (!_cts.IsCancellationRequested)
+ {
+ if (await reader.MoveNext(_cts.Token))
+ {
+ var cmd = reader.Current;
+ switch (cmd.CommandCase)
+ {
+ case rmq::TelemetryCommand.CommandOneofCase.None:
+ {
+ Logger.Warn($"Telemetry failed: {cmd.Status.ToString()}");
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.Settings:
+ {
+
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
+ {
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
+ {
+ break;
+ }
+ case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand:
+ {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ public void Cancel()
+ {
+ _cts.Cancel();
+ }
+
+ private string _target;
+ public string Target { get { return _target; } }
+ private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
+ private IClient _client;
+
+ private CancellationTokenSource _cts = new CancellationTokenSource();
+ public CancellationTokenSource CTS
+ {
+ get { return _cts; }
+ }
};
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/TopicRouteData.cs b/rocketmq-client-csharp/TopicRouteData.cs
index a751c51..e4aa04c 100644
--- a/rocketmq-client-csharp/TopicRouteData.cs
+++ b/rocketmq-client-csharp/TopicRouteData.cs
@@ -17,24 +17,27 @@
using System;
using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class TopicRouteData : IEquatable<TopicRouteData>
{
- public TopicRouteData(List<Partition> partitions)
+ public TopicRouteData(List<rmq::MessageQueue> partitions)
{
- Partitions = partitions;
- Partitions.Sort();
+ _messageQueues = partitions;
+
+ _messageQueues.Sort(Utilities.CompareMessageQueue);
}
- public List<Partition> Partitions { get; }
+ private List<rmq::MessageQueue> _messageQueues;
+ public List<rmq::MessageQueue> MessageQueues { get { return _messageQueues; } }
public bool Equals(TopicRouteData other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
- return Equals(Partitions, other.Partitions);
+ return Equals(_messageQueues, other._messageQueues);
}
public override bool Equals(object obj)
@@ -47,7 +50,7 @@
public override int GetHashCode()
{
- return (Partitions != null ? Partitions.GetHashCode() : 0);
+ return (_messageQueues != null ? _messageQueues.GetHashCode() : 0);
}
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Utilities.cs b/rocketmq-client-csharp/Utilities.cs
index 3d4818f..23ed8db 100644
--- a/rocketmq-client-csharp/Utilities.cs
+++ b/rocketmq-client-csharp/Utilities.cs
@@ -19,6 +19,8 @@
using System.Linq;
using System.Net.NetworkInformation;
using System.Text;
+using System;
+using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -49,5 +51,29 @@
return result.ToString();
}
+
+ public static string TargetUrl(rmq::MessageQueue messageQueue)
+ {
+ // TODO: Assert associated broker has as least one service endpoint.
+ var serviceEndpoint = messageQueue.Broker.Endpoints.Addresses[0];
+ return $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
+ }
+
+ public static int CompareMessageQueue(rmq::MessageQueue lhs, rmq::MessageQueue rhs)
+ {
+ int topic_comparison = String.Compare(lhs.Topic.ResourceNamespace + lhs.Topic.Name, rhs.Topic.ResourceNamespace + rhs.Topic.Name);
+ if (topic_comparison != 0)
+ {
+ return topic_comparison;
+ }
+
+ int broker_name_comparison = String.Compare(lhs.Broker.Name, rhs.Broker.Name);
+ if (0 != broker_name_comparison)
+ {
+ return broker_name_comparison;
+ }
+
+ return lhs.Id < rhs.Id ? -1 : (lhs.Id == rhs.Id ? 0 : 1);
+ }
}
}
\ No newline at end of file
diff --git a/tests/BrokerTest.cs b/tests/BrokerTest.cs
deleted file mode 100644
index 19f6b47..0000000
--- a/tests/BrokerTest.cs
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- using Microsoft.VisualStudio.TestTools.UnitTesting;
-
-namespace Org.Apache.Rocketmq {
- [TestClass]
- public class BrokerTest {
-
- [TestMethod]
- public void testCompareTo() {
- var b1 = new Broker("b1", 0, null);
- var b2 = new Broker("b1", 1, null);
- Assert.AreEqual(b1.CompareTo(b2), -1);
- }
-
- [TestMethod]
- public void testEquals() {
- var b1 = new Broker("b1", 0, null);
- var b2 = new Broker("b1", 0, null);
- Assert.AreEqual(b1, b2, "Equals method should be employed to test equality");
- }
-
- }
-}
\ No newline at end of file
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index 7dde545..a6746ff 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -15,8 +15,8 @@
* limitations under the License.
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
using System;
+using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
@@ -28,10 +28,6 @@
[ClassInitialize]
public static void SetUp(TestContext context)
{
- List<string> nameServerAddress = new List<string>();
- nameServerAddress.Add(string.Format("{0}:{1}", host, port));
- resolver = new StaticNameServerResolver(nameServerAddress);
-
credentialsProvider = new ConfigFileCredentialsProvider();
}
@@ -41,33 +37,31 @@
}
-
[TestMethod]
- public void testSendMessage()
+ public async Task testSendMessage()
{
- var producer = new Producer(resolver, resourceNamespace);
+ var accessPoint = new AccessPoint();
+ accessPoint.Host = host;
+ accessPoint.Port = port;
+ var producer = new Producer(accessPoint, resourceNamespace);
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
var msg = new Message(topic, body);
- var sendResult = producer.Send(msg).GetAwaiter().GetResult();
+ var sendResult = await producer.Send(msg);
Assert.IsNotNull(sendResult);
producer.Shutdown();
}
- private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+ private static string resourceNamespace = "";
private static string topic = "cpp_sdk_standard";
- private static string clientId = "C001";
- private static string group = "GID_cpp_sdk_standard";
-
- private static INameServerResolver resolver;
private static ICredentialsProvider credentialsProvider;
- private static string host = "116.62.231.199";
- private static int port = 80;
+ private static string host = "11.166.42.94";
+ private static int port = 8081;
}
}
\ No newline at end of file
diff --git a/tests/PushConsumerTest.cs b/tests/PushConsumerTest.cs
index 5250bb8..444530b 100644
--- a/tests/PushConsumerTest.cs
+++ b/tests/PushConsumerTest.cs
@@ -53,11 +53,8 @@
[ClassInitialize]
public static void SetUp(TestContext context)
{
- List<string> nameServerAddress = new List<string>();
- nameServerAddress.Add(string.Format("{0}:{1}", host, port));
- resolver = new StaticNameServerResolver(nameServerAddress);
-
credentialsProvider = new ConfigFileCredentialsProvider();
+
}
[ClassCleanup]
@@ -66,10 +63,18 @@
}
+ [TestInitialize]
+ public void SetUp()
+ {
+ accessPoint = new AccessPoint();
+ accessPoint.Host = host;
+ accessPoint.Port = port;
+ }
+
[TestMethod]
public void testLifecycle()
{
- var consumer = new PushConsumer(resolver, resourceNamespace, group);
+ var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
consumer.Region = "cn-hangzhou-pre";
consumer.Subscribe(topic, "*", ExpressionType.TAG);
@@ -84,7 +89,7 @@
[TestMethod]
public void testConsumeMessage()
{
- var consumer = new PushConsumer(resolver, resourceNamespace, group);
+ var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
consumer.Region = "cn-hangzhou-pre";
consumer.Subscribe(topic, "*", ExpressionType.TAG);
@@ -99,14 +104,14 @@
private static string topic = "cpp_sdk_standard";
- private static string clientId = "C001";
private static string group = "GID_cpp_sdk_standard";
- private static INameServerResolver resolver;
private static ICredentialsProvider credentialsProvider;
private static string host = "116.62.231.199";
private static int port = 80;
+ private AccessPoint accessPoint;
+
}
}
\ No newline at end of file
diff --git a/tests/StaticNameServerResolverTest.cs b/tests/StaticNameServerResolverTest.cs
deleted file mode 100644
index 853be1a..0000000
--- a/tests/StaticNameServerResolverTest.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
-
-namespace Org.Apache.Rocketmq
-{
- [TestClass]
- public class StaticNameServerResolverTest
- {
- [TestMethod]
- public void testResolve()
- {
- List<string> list = new List<string>();
- list.Add("https://localhost:80");
- var resolver = new StaticNameServerResolver(list);
- var result = resolver.resolveAsync().GetAwaiter().GetResult();
- Assert.AreSame(list, result);
- }
- }
-}
\ No newline at end of file