Merge back to master (#21)

* Refactor ClientManager and RpcClient (#9)

* Ensure RPC stub is disposed correctly on shutdown (#10)

* Transparently retry when querying topic route entries

* Make sure stub shuts down gracefully

* Log when something is wrong (#11)

* Supply the residual RPC request (#12)

* Fix a series of naming issues (#13)

* Implement the first alpha version of PushConsumer, with many TODOs (#14)

Implement an initial version of PushConsumer

* Polish code (#15)

* WIP: Start to adapt to protocol v2

* WIP: debug telemetry bi-direction streaming

* Fix telemetry by adding x-mq-client-id header

* WIP:

* Make producer work

* Polish code (#20)

* Add packages OpenTelemetry and OpenTelemetry.Api

* WIP: write unit tests

* WIP

* Make Shutdown async

* WIP: refactor start procedure

* WIP

* WIP

* WIP: receive messages

* WIP: add change invisible duration

* Minor fix

* Adjust message

* Add unit tests for Producer.Send

* WIP: prepare to add unit test for SimpleConsumer.Receive

* WIP: Unit tests for SimpleConsumer.Receive

* Add unit tests for SimpleConsumer

* Collect metrics for Producer

* Fix minor issue

* Add custom otlp exporter

* Add MeterProvider

Co-authored-by: aaron ai <yangkun.ayk@alibaba-inc.com>
diff --git a/.gitignore b/.gitignore
index 678b6cc..c6ddbae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@
 .vscode/
 .idea
 *.user
-*DS_Store
\ No newline at end of file
+*DS_Store
+.fake
\ No newline at end of file
diff --git a/examples/Program.cs b/examples/Program.cs
index 9bf745c..09a1674 100644
--- a/examples/Program.cs
+++ b/examples/Program.cs
@@ -5,20 +5,24 @@
 namespace examples

 {

 

-    class Foo {

+    class Foo
+    {

         public int bar = 1;

     }

     class Program

     {

 

-        static void RT(Action action, int seconds, CancellationToken token) {

-            if (null == action) {

+        static void RT(Action action, int seconds, CancellationToken token)
+        {

+            if (null == action)
+            {

                 return;

             }

 

             Task.Run(async () =>

             {

-                while(!token.IsCancellationRequested) {

+                while (!token.IsCancellationRequested)
+                {

                     action();

                     await Task.Delay(TimeSpan.FromSeconds(seconds), token);

                 }

@@ -31,7 +35,7 @@
 

             string accessKey = "key";

             string accessSecret = "secret";

-            var credentials = new org.apache.rocketmq.StaticCredentialsProvider(accessKey, accessSecret).getCredentials();

+            var credentials = new Org.Apache.Rocketmq.StaticCredentialsProvider(accessKey, accessSecret).getCredentials();

             bool expired = credentials.expired();

 

             int workerThreads;

@@ -44,7 +48,8 @@
             ThreadPool.QueueUserWorkItem((Object stateInfo) =>

             {

                 Console.WriteLine("From ThreadPool");

-                if (stateInfo is Foo) {

+                if (stateInfo is Foo)
+                {

                     Console.WriteLine("Foo: bar=" + (stateInfo as Foo).bar);

                 }

             }, new Foo());

diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/AccessPoint.cs
similarity index 66%
rename from rocketmq-client-csharp/StaticNameServerResolver.cs
rename to rocketmq-client-csharp/AccessPoint.cs
index 9f97599..cf4e1f4 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/AccessPoint.cs
@@ -14,25 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public class StaticNameServerResolver : INameServerResolver
+    public class AccessPoint
     {
+        private string _host;
 
-        public StaticNameServerResolver(List<string> nameServerList)
+        public string Host
         {
-            this.nameServerList = nameServerList;
+            get { return _host; }
+            set { _host = value; }
         }
 
-        public async Task<List<string>> resolveAsync()
+        private int _port;
+
+        public int Port
         {
-            return nameServerList;
+            get { return _port; }
+            set { _port = value; }
         }
 
-        private List<string> nameServerList;
+        public string TargetUrl()
+        {
+            return $"https://{_host}:{_port}";
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/rocketmq-client-csharp/Address.cs b/rocketmq-client-csharp/Address.cs
deleted file mode 100644
index dadf346..0000000
--- a/rocketmq-client-csharp/Address.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-namespace org.apache.rocketmq {
-    public class Address {
-        public Address(string host, int port) {
-            this.host = host;
-            this.port = port;
-        }
-
-        private string host;
-        public string Host {
-            get { return host; }
-        }
-
-        private int port;
-        public int Port {
-            get { return port; }
-        }
-
-    }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Broker.cs b/rocketmq-client-csharp/Broker.cs
deleted file mode 100644
index 2f5f675..0000000
--- a/rocketmq-client-csharp/Broker.cs
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-namespace org.apache.rocketmq {
-    public class Broker : IComparable<Broker>, IEquatable<Broker> {
-
-        public Broker(string name, int id, ServiceAddress address) {
-            this.name = name;
-            this.id = id;
-            this.address = address;
-        }
-
-        private string name;
-        public string Name {
-            get { return name; }
-        }
-
-        private int id;
-        public int Id {
-            get { return id; }
-        }
-
-        private ServiceAddress address;
-        public ServiceAddress Address {
-            get { return address; }
-        }
-
-        /**
-         * Context aware primary target URL.
-         */
-        public string targetUrl()
-        {
-            var addr = address.Addresses[0];
-            return string.Format("https://{0}:{1}", addr.Host, addr.Port);
-        }
-
-        public int CompareTo(Broker other) {
-            if (0 != name.CompareTo(other.name)) {
-                return name.CompareTo(other.name);
-            }
-
-            return id.CompareTo(other.id);
-        }
-
-        public bool Equals(Broker other) {
-            return name.Equals(other.name) && id.Equals(other.id);
-        }
-
-        public override bool Equals(Object other) {
-            if (!(other is Broker)) {
-                return false;
-            }
-            return Equals(other as Broker);
-        }
-
-        public override int GetHashCode()
-        {
-            return HashCode.Combine(name, id);
-        }
-    }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index dac6d89..32dffae 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -19,113 +19,176 @@
 using System.Collections.Concurrent;
 using System.Threading.Tasks;
 using System.Threading;
+using System.Diagnostics;
 using System;
-using rmq = apache.rocketmq.v1;
+using rmq = Apache.Rocketmq.V2;
 using grpc = global::Grpc.Core;
+using NLog;
+using System.Diagnostics.Metrics;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
 
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public abstract class Client : ClientConfig, IClient
     {
+        protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        public Client(INameServerResolver resolver, string resourceNamespace)
+        protected Client(AccessPoint accessPoint, string resourceNamespace)
         {
-            this.nameServerResolver = resolver;
-            this.resourceNamespace_ = resourceNamespace;
-            this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace);
-            this.nameServerResolverCTS = new CancellationTokenSource();
+            _accessPoint = accessPoint;
 
-            this.topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
-            this.updateTopicRouteCTS = new CancellationTokenSource();
+            // Support IPv4 for now
+            AccessPointScheme = rmq::AddressScheme.Ipv4;
+            var serviceEndpoint = new rmq::Address();
+            serviceEndpoint.Host = accessPoint.Host;
+            serviceEndpoint.Port = accessPoint.Port;
+            AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
+
+            _resourceNamespace = resourceNamespace;
+
+            _clientSettings = new rmq::Settings();
+
+            _clientSettings.AccessPoint = new rmq::Endpoints();
+            _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+
+            _clientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+
+            _clientSettings.UserAgent = new rmq.UA();
+            _clientSettings.UserAgent.Language = rmq::Language.DotNet;
+            _clientSettings.UserAgent.Version = "5.0.0";
+            _clientSettings.UserAgent.Platform = Environment.OSVersion.ToString();
+            _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+
+            Manager = ClientManagerFactory.getClientManager(resourceNamespace);
+
+            _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
+            _updateTopicRouteCts = new CancellationTokenSource();
+
+            _healthCheckCts = new CancellationTokenSource();
+
+            telemetryCts_ = new CancellationTokenSource();
         }
 
-        public virtual void start()
+        public virtual async Task Start()
         {
             schedule(async () =>
             {
-                await updateNameServerList();
-            }, 30, nameServerResolverCTS.Token);
+                await UpdateTopicRoute();
 
-            schedule(async () =>
-            {
-                await updateTopicRoute();
+            }, 30, _updateTopicRouteCts.Token);
 
-            }, 30, updateTopicRouteCTS.Token);
+            // Get routes for topics of interest.
+            await UpdateTopicRoute();
 
+            string accessPointUrl = _accessPoint.TargetUrl();
+            createSession(accessPointUrl);
+
+            await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
+
+            await Heartbeat();
         }
 
-        public virtual void shutdown()
+        public virtual async Task Shutdown()
         {
-            updateTopicRouteCTS.Cancel();
-            nameServerResolverCTS.Cancel();
+            Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
+            _updateTopicRouteCts.Cancel();
+            telemetryCts_.Cancel();
+            await Manager.Shutdown();
         }
 
-        private async Task updateNameServerList()
+        protected string FilterBroker(Func<string, bool> acceptor)
         {
-            List<string> nameServers = await nameServerResolver.resolveAsync();
-            if (0 == nameServers.Count)
+            foreach (var item in _topicRouteTable)
             {
-                // Whoops, something should be wrong. We got an empty name server list.
-                return;
-            }
-
-            if (nameServers.Equals(this.nameServers))
-            {
-                return;
-            }
-
-            // Name server list is updated. 
-            // TODO: Locking is required
-            this.nameServers = nameServers;
-            this.currentNameServerIndex = 0;
-        }
-
-        private async Task updateTopicRoute()
-        {
-            if (null == nameServers || 0 == nameServers.Count)
-            {
-                List<string> list = await nameServerResolver.resolveAsync();
-                if (null != list && 0 != list.Count)
+                foreach (var partition in item.Value.MessageQueues)
                 {
-                    this.nameServers = list;
-                }
-                else
-                {
-                    // TODO: log warning here.
-                    return;
+                    string target = Utilities.TargetUrl(partition);
+                    if (acceptor(target))
+                    {
+                        return target;
+                    }
                 }
             }
+            return null;
+        }
 
-            // We got one or more name servers available.
-            string nameServer = nameServers[currentNameServerIndex];
+        /**
+         * Return all endpoints of brokers in route table.
+         */
+        private List<string> AvailableBrokerEndpoints()
+        {
+            List<string> endpoints = new List<string>();
+            foreach (var item in _topicRouteTable)
+            {
+                foreach (var partition in item.Value.MessageQueues)
+                {
+                    string endpoint = Utilities.TargetUrl(partition);
+                    if (!endpoints.Contains(endpoint))
+                    {
+                        endpoints.Add(endpoint);
+                    }
+                }
+            }
+            return endpoints;
+        }
+
+        private async Task UpdateTopicRoute()
+        {
+            HashSet<string> topics = new HashSet<string>();
+            foreach (var topic in topicsOfInterest_)
+            {
+                topics.Add(topic);
+            }
+
+            foreach (var item in _topicRouteTable)
+            {
+                topics.Add(item.Key);
+            }
+            Logger.Debug($"Fetch topic route for {topics.Count} topics");
+
+            // Wrap topics into list such that we can map async result to topic 
+            List<string> topicList = new List<string>();
+            topicList.AddRange(topics);
 
             List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
-            foreach (var item in topicRouteTable)
+            foreach (var item in topicList)
             {
-                tasks.Add(getRouteFor(item.Key, true));
+                tasks.Add(GetRouteFor(item, true));
             }
 
             // Update topic route data
             TopicRouteData[] result = await Task.WhenAll(tasks);
+            var i = 0;
             foreach (var item in result)
             {
                 if (null == item)
                 {
+                    Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
+                    ++i;
                     continue;
                 }
 
-                if (0 == item.Partitions.Count)
+                if (0 == item.MessageQueues.Count)
                 {
+                    Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
+                    ++i;
                     continue;
                 }
 
-                var topicName = item.Partitions[0].Topic.Name;
-                var existing = topicRouteTable[topicName];
+                var topicName = item.MessageQueues[0].Topic.Name;
+
+                // Make assertion
+                Debug.Assert(topicName.Equals(topicList[i]));
+
+                var existing = _topicRouteTable[topicName];
                 if (!existing.Equals(item))
                 {
-                    topicRouteTable[topicName] = item;
+                    _topicRouteTable[topicName] = item;
                 }
+                ++i;
             }
         }
 
@@ -154,74 +217,210 @@
          * direct
          *    Indicate if we should by-pass cache and fetch route entries from name server.
          */
-        public async Task<TopicRouteData> getRouteFor(string topic, bool direct)
+        public async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
         {
-            if (!direct && topicRouteTable.ContainsKey(topic))
+            if (!direct && _topicRouteTable.ContainsKey(topic))
             {
-                return topicRouteTable[topic];
-            }
-
-            if (null == nameServers || 0 == nameServers.Count)
-            {
-                List<string> list = await nameServerResolver.resolveAsync();
-                if (null != list && 0 != list.Count)
-                {
-                    this.nameServers = list;
-                }
-                else
-                {
-                    // TODO: log warning here.
-                    return null;
-                }
+                return _topicRouteTable[topic];
             }
 
             // We got one or more name servers available.
-            string nameServer = nameServers[currentNameServerIndex];
             var request = new rmq::QueryRouteRequest();
             request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = resourceNamespace_;
+            request.Topic.ResourceNamespace = _resourceNamespace;
             request.Topic.Name = topic;
             request.Endpoints = new rmq::Endpoints();
-            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
-            var address = new rmq::Address();
-            int pos = nameServer.LastIndexOf(':');
-            address.Host = nameServer.Substring(0, pos);
-            address.Port = Int32.Parse(nameServer.Substring(pos + 1));
-            request.Endpoints.Addresses.Add(address);
-            var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+            request.Endpoints.Scheme = AccessPointScheme;
+            foreach (var address in AccessPointEndpoints)
+            {
+                request.Endpoints.Addresses.Add(address);
+            }
+
             var metadata = new grpc.Metadata();
             Signature.sign(this, metadata);
-            var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
-            return topicRouteData;
+            int index = _random.Next(0, AccessPointEndpoints.Count);
+            var serviceEndpoint = AccessPointEndpoints[index];
+            // AccessPointAddresses.Count
+            string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
+            TopicRouteData topicRouteData;
+            try
+            {
+                Logger.Debug($"Resolving route for topic={topic}");
+                topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
+                if (null != topicRouteData)
+                {
+                    Logger.Debug($"Got route entries for {topic} from name server");
+                    _topicRouteTable.TryAdd(topic, topicRouteData);
+                    return topicRouteData;
+                }
+                else
+                {
+                    Logger.Warn($"Failed to query route of {topic} from {target}");
+                }
+            }
+            catch (Exception e)
+            {
+                Logger.Warn(e, "Failed when querying route");
+            }
+
+            return null;
         }
 
-        public abstract void prepareHeartbeatData(rmq::HeartbeatRequest request);
+        protected abstract void PrepareHeartbeatData(rmq::HeartbeatRequest request);
 
-        public void heartbeat()
+        public async Task Heartbeat()
         {
-            List<string> endpoints = endpointsInUse();
+            List<string> endpoints = AvailableBrokerEndpoints();
             if (0 == endpoints.Count)
             {
+                Logger.Debug("No broker endpoints available in topic route");
                 return;
             }
 
-            var heartbeatRequest = new rmq::HeartbeatRequest();
-            prepareHeartbeatData(heartbeatRequest);
+            var request = new rmq::HeartbeatRequest();
+            PrepareHeartbeatData(request);
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
+
+            List<Task> tasks = new List<Task>();
+            foreach (var endpoint in endpoints)
+            {
+                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
+            }
+
+            await Task.WhenAll(tasks);
         }
 
-        public void healthCheck()
+        private List<string> BlockedBrokerEndpoints()
+        {
+            List<string> endpoints = new List<string>();
+            return endpoints;
+        }
+
+        private void RemoveFromBlockList(string endpoint)
         {
 
         }
 
-        public async Task<bool> notifyClientTermination()
+        protected async Task<List<rmq::Assignment>> scanLoadAssignment(string topic, string group)
         {
-            List<string> endpoints = endpointsInUse();
+            // Pick a broker randomly
+            string target = FilterBroker((s) => true);
+            var request = new rmq::QueryAssignmentRequest();
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = AccessPointScheme;
+            foreach (var endpoint in AccessPointEndpoints)
+            {
+                request.Endpoints.Addresses.Add(endpoint);
+            }
+            try
+            {
+                var metadata = new grpc::Metadata();
+                Signature.sign(this, metadata);
+                return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
+            }
+            catch (System.Exception e)
+            {
+                Logger.Warn(e, $"Failed to acquire load assignments from {target}");
+            }
+            // Just return an empty list.
+            return new List<rmq.Assignment>();
+        }
+
+        private string TargetUrl(rmq::Assignment assignment)
+        {
+            var broker = assignment.MessageQueue.Broker;
+            var addresses = broker.Endpoints.Addresses;
+            // TODO: use the first address for now. 
+            var address = addresses[0];
+            return $"https://{address.Host}:{address.Port}";
+        }
+
+        public virtual void BuildClientSetting(rmq::Settings settings)
+        {
+            settings.MergeFrom(_clientSettings);
+        }
+
+        public void createSession(string url)
+        {
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            var stream = Manager.Telemetry(url, metadata);
+            var session = new Session(url, stream, this);
+            _sessions.TryAdd(url, session);
+            Task.Run(async () =>
+            {
+                await session.Loop();
+            });
+        }
+
+
+        public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
+        {
+            var targetUrl = TargetUrl(assignment);
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            var request = new rmq::ReceiveMessageRequest();
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+            request.MessageQueue = assignment.MessageQueue;
+            var messages = await Manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
+            return messages;
+        }
+
+        public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
+        {
+            var request = new rmq::AckMessageRequest();
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+
+            var entry = new rmq::AckMessageEntry();
+            entry.ReceiptHandle = receiptHandle;
+            entry.MessageId = messageId;
+            request.Entries.Add(entry);
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            return await Manager.Ack(target, metadata, request, RequestTimeout);
+        }
+
+        public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
+        {
+            var request = new rmq::ChangeInvisibleDurationRequest();
+            request.ReceiptHandle = receiptHandle;
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+
+            request.MessageId = messageId;
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
+        }
+
+        public async Task<bool> NotifyClientTermination()
+        {
+            List<string> endpoints = AvailableBrokerEndpoints();
             var request = new rmq::NotifyClientTerminationRequest();
-            request.ClientId = clientId();
+
 
             var metadata = new grpc.Metadata();
             Signature.sign(this, metadata);
@@ -230,7 +429,7 @@
 
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(clientManager.notifyClientTermination(endpoint, metadata, request, getIoTimeout()));
+                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
             }
 
             bool[] results = await Task.WhenAll(tasks);
@@ -244,19 +443,53 @@
             return true;
         }
 
-        private List<string> endpointsInUse()
+        public virtual void OnSettingsReceived(rmq::Settings settings)
         {
-            //TODO: gather endpoints from route entries.
-            return new List<string>();
+            if (null != settings.Metric)
+            {
+                _clientSettings.Metric = new rmq::Metric();
+                _clientSettings.Metric.MergeFrom(settings.Metric);
+            }
+
+            if (null != settings.BackoffPolicy)
+            {
+                _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
+                _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+            }
         }
 
-        protected IClientManager clientManager;
-        private INameServerResolver nameServerResolver;
-        private CancellationTokenSource nameServerResolverCTS;
-        private List<string> nameServers;
-        private int currentNameServerIndex;
+        protected readonly IClientManager Manager;
 
-        private ConcurrentDictionary<string, TopicRouteData> topicRouteTable;
-        private CancellationTokenSource updateTopicRouteCTS;
+        private readonly HashSet<string> topicsOfInterest_ = new HashSet<string>();
+
+        public void AddTopicOfInterest(string topic)
+        {
+            topicsOfInterest_.Add(topic);
+        }
+
+        private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
+        private readonly CancellationTokenSource _updateTopicRouteCts;
+
+        private readonly CancellationTokenSource _healthCheckCts;
+
+        private readonly CancellationTokenSource telemetryCts_ = new CancellationTokenSource();
+
+        public CancellationTokenSource TelemetryCts()
+        {
+            return telemetryCts_;
+        }
+
+        protected readonly AccessPoint _accessPoint;
+
+        // This field is subject to changes from servers.
+        protected readonly rmq::Settings _clientSettings;
+
+        private readonly Random _random = new Random();
+        
+        protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
+
+        public static readonly string MeterName = "Apache.RocketMQ.Client";
+        
+        protected static readonly Meter MetricMeter = new(MeterName, "1.0");
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 949f8b4..0d99cb1 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -15,101 +15,134 @@
  * limitations under the License.
  */
 using System;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
-    public class ClientConfig : IClientConfig {
+    public class ClientConfig : IClientConfig
+    {
 
-        public ClientConfig() {
+        public ClientConfig()
+        {
             var hostName = System.Net.Dns.GetHostName();
             var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
             this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
-            this.ioTimeout_ = TimeSpan.FromSeconds(3);
-            this.longPollingIoTimeout_ = TimeSpan.FromSeconds(15);
+            this._requestTimeout = TimeSpan.FromSeconds(3);
+            this.longPollingIoTimeout_ = TimeSpan.FromSeconds(30);
+            this.client_type_ = rmq::ClientType.Unspecified;
+            this.access_point_ = new rmq::Endpoints();
+            this.back_off_policy_ = new rmq::RetryPolicy();
+            this._publishing = new Publishing();
         }
 
-        public string region() {
-            return region_;
+        public string region()
+        {
+            return _region;
         }
-        public string Region {
-            set { region_ = value; }
+        public string Region
+        {
+            set { _region = value; }
         }
 
-        public string serviceName() {
-            return serviceName_;
+        public string serviceName()
+        {
+            return _serviceName;
         }
-        public string ServiceName {
-            set { serviceName_ = value; }
+        public string ServiceName
+        {
+            set { _serviceName = value; }
         }
 
-        public string resourceNamespace() {
-            return resourceNamespace_;
+        public string resourceNamespace()
+        {
+            return _resourceNamespace;
         }
-        public string ResourceNamespace {
-            set { resourceNamespace_ = value; }
+        public string ResourceNamespace
+        {
+            get { return _resourceNamespace; }
+            set { _resourceNamespace = value; }
         }
 
-        public ICredentialsProvider credentialsProvider() {
+        public ICredentialsProvider credentialsProvider()
+        {
             return credentialsProvider_;
         }
-        
-        public ICredentialsProvider CredentialsProvider {
+
+        public ICredentialsProvider CredentialsProvider
+        {
             set { credentialsProvider_ = value; }
         }
 
-        public string tenantId() {
-            return tenantId_;
+        public string tenantId()
+        {
+            return _tenantId;
         }
-        public string TenantId {
-            set { tenantId_ = value; }
+        public string TenantId
+        {
+            set { _tenantId = value; }
         }
 
-        public TimeSpan getIoTimeout() {
-            return ioTimeout_;
-        }
-        public TimeSpan IoTimeout {
-            set { ioTimeout_ = value; }
+        public TimeSpan RequestTimeout
+        {
+            get
+            {
+                return _requestTimeout;
+            }
+            set
+            {
+                _requestTimeout = value;
+            }
         }
 
-        public TimeSpan getLongPollingTimeout() {
+        public TimeSpan getLongPollingTimeout()
+        {
             return longPollingIoTimeout_;
         }
-        public TimeSpan LongPollingTimeout {
+        public TimeSpan LongPollingTimeout
+        {
             set { longPollingIoTimeout_ = value; }
         }
 
-        public string getGroupName() {
+        public string getGroupName()
+        {
             return groupName_;
         }
-        public string GroupName {
+        public string GroupName
+        {
             set { groupName_ = value; }
         }
 
-        public string clientId() {
+        public string clientId()
+        {
             return clientId_;
         }
 
-        public bool isTracingEnabled() {
+        public bool isTracingEnabled()
+        {
             return tracingEnabled_;
         }
-        public bool TracingEnabled {
+        public bool TracingEnabled
+        {
             set { tracingEnabled_ = value; }
         }
 
-        public void setInstanceName(string instanceName) {
+        public void setInstanceName(string instanceName)
+        {
             this.instanceName_ = instanceName;
         }
 
-        private string region_ = "cn-hangzhou";
-        private string serviceName_ = "ONS";
+        private string _region = "cn-hangzhou";
+        private string _serviceName = "ONS";
 
-        protected string resourceNamespace_;
+        protected string _resourceNamespace;
 
         private ICredentialsProvider credentialsProvider_;
 
-        private string tenantId_;
+        private string _tenantId;
 
-        private TimeSpan ioTimeout_;
+        private TimeSpan _requestTimeout;
 
         private TimeSpan longPollingIoTimeout_;
 
@@ -120,6 +153,53 @@
         private bool tracingEnabled_ = false;
 
         private string instanceName_ = "default";
+
+        private rmq::ClientType client_type_;
+
+        /// <summary>
+        /// Gets or sets the client type; the constructor initialises it to Unspecified.
+        /// </summary>
+        public rmq::ClientType ClientType { get => client_type_; set => client_type_ = value; }
+
+
+        private rmq::Endpoints access_point_;
+
+        public rmq::AddressScheme AccessPointScheme
+        {
+            get => access_point_.Scheme;          // read through to the endpoints message
+            set => access_point_.Scheme = value;  // write through to the endpoints message
+        }
+
+        /// <summary>
+        /// Gets a new list holding a copy of the access point addresses, or replaces those addresses.
+        /// </summary>
+        /// <exception cref="ArgumentNullException">The assigned list is null.</exception>
+        public List<rmq::Address> AccessPointEndpoints
+        {
+            get => new List<rmq::Address>(access_point_.Addresses);
+            set
+            {
+                // Reject null before clearing so a bad assignment cannot wipe the current addresses.
+                if (value is null)
+                {
+                    throw new ArgumentNullException(nameof(value));
+                }
+                access_point_.Addresses.Clear();
+                foreach (var item in value)
+                {
+                    access_point_.Addresses.Add(item);
+                }
+            }
+        }
+
+        private rmq::RetryPolicy back_off_policy_;
+
+        private Publishing _publishing;
+        /// <summary>
+        /// Gets the <c>Publishing</c> instance that the constructor creates.
+        /// </summary>
+        public Publishing Publishing => _publishing;
+
     }
 
 }
diff --git a/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
index 59ec0f2..01adddc 100644
--- a/rocketmq-client-csharp/ClientLoggerInterceptor.cs
+++ b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -18,11 +18,15 @@
 using System.Threading.Tasks;
 using Grpc.Core;
 using Grpc.Core.Interceptors;
+using NLog;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public class ClientLoggerInterceptor : Interceptor
     {
+
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
         public override TResponse BlockingUnaryCall<TRequest, TResponse>(
             TRequest request,
             ClientInterceptorContext<TRequest, TResponse> context,
@@ -52,19 +56,12 @@
             try
             {
                 var response = await t;
-                Console.WriteLine($"Response received: {response}");
+                Logger.Debug($"Response received: {response}");
                 return response;
             }
             catch (Exception ex)
             {
-                // Log error to the console.
-                // Note: Configuring .NET Core logging is the recommended way to log errors
-                // https://docs.microsoft.com/aspnet/core/grpc/diagnostics#grpc-client-logging
-                var initialColor = Console.ForegroundColor;
-                Console.ForegroundColor = ConsoleColor.Red;
-                Console.WriteLine($"Call error: {ex.Message}");
-                Console.ForegroundColor = initialColor;
-
+                Logger.Error($"Call error: {ex.Message}");
                 throw;
             }
         }
@@ -104,10 +101,7 @@
             where TRequest : class
             where TResponse : class
         {
-            var initialColor = Console.ForegroundColor;
-            Console.ForegroundColor = ConsoleColor.Green;
-            Console.WriteLine($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
-            Console.ForegroundColor = initialColor;
+            Logger.Debug($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
         }
 
         private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 59fec83..a39a0e0 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -15,168 +15,310 @@
  * limitations under the License.
  */
 
-using System.Collections.Concurrent;
-
-using rmq = global::apache.rocketmq.v1;
-using Grpc.Net.Client;
+using rmq = Apache.Rocketmq.V2;
 using System;
+using System.IO;
+using System.IO.Compression;
 using System.Threading;
 using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using System.Collections.Generic;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
+using System.Security.Cryptography;
+using NLog;
 
-namespace org.apache.rocketmq {
-    public class ClientManager : IClientManager {
+namespace Org.Apache.Rocketmq
+{
+    public class ClientManager : IClientManager
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        public ClientManager() {
-            rpcClients = new ConcurrentDictionary<string, RpcClient>();
+        public ClientManager()
+        {
+            _rpcClients = new Dictionary<string, RpcClient>();
+            _clientLock = new ReaderWriterLockSlim();
         }
 
-        public IRpcClient getRpcClient(string target) {
-            if (!rpcClients.ContainsKey(target)) {
-                var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions {
-                    HttpHandler = createHttpHandler()
-                });
-                var invoker = channel.Intercept(new ClientLoggerInterceptor());
-                var client = new rmq::MessagingService.MessagingServiceClient(invoker);
-                var rpcClient = new RpcClient(client);
-                if(rpcClients.TryAdd(target, rpcClient)) {
-                    return rpcClient;
+        public IRpcClient GetRpcClient(string target)
+        {
+            _clientLock.EnterReadLock();
+            try
+            {
+                // client exists, return in advance.
+                if (_rpcClients.ContainsKey(target))
+                {
+                    return _rpcClients[target];
                 }
             }
-            return rpcClients[target];
-        }
-
-        /**
-         * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
-         * why parameters are configured this way.
-         */
-        public static HttpMessageHandler createHttpHandler()
-        {
-            var sslOptions = new System.Net.Security.SslClientAuthenticationOptions();
-            // Disable server certificate validation during development phase.
-            // Comment out the following line if server certificate validation is required. 
-            sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
-            var handler = new SocketsHttpHandler
+            finally
             {
-                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
-                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
-                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
-                EnableMultipleHttp2Connections = true,
-                SslOptions = sslOptions,
-            };
-            return handler;
+                _clientLock.ExitReadLock();
+            }
+
+            _clientLock.EnterWriteLock();
+            try
+            {
+                // client exists, return in advance.
+                if (_rpcClients.ContainsKey(target))
+                {
+                    return _rpcClients[target];
+                }
+
+                // client does not exist, generate a new one
+                var client = new RpcClient(target);
+                _rpcClients.Add(target, client);
+                return client;
+            }
+            finally
+            {
+                _clientLock.ExitWriteLock();
+            }
         }
 
-        public async Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
+        public grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata)
         {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var queryRouteResponse = await rpcClient.queryRoute(request, callOptions);
+            var rpcClient = GetRpcClient(target);
+            return rpcClient.Telemetry(metadata);
+        }
 
-            if (queryRouteResponse.Common.Status.Code != ((int)Google.Rpc.Code.Ok)) {
+        public async Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata,
+            rmq::QueryRouteRequest request, TimeSpan timeout)
+        {
+            var rpcClient = GetRpcClient(target);
+            Logger.Debug($"QueryRouteRequest: {request}");
+            var queryRouteResponse = await rpcClient.QueryRoute(metadata, request, timeout);
+
+            if (queryRouteResponse.Status.Code != rmq::Code.Ok)
+            {
+                Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status}");
                 // Raise an application layer exception
-
             }
+            Logger.Debug($"QueryRouteResponse: {queryRouteResponse}");
 
-            var partitions = new List<Partition>();
-            // Translate protobuf object to domain specific one
-            foreach (var partition in queryRouteResponse.Partitions)
+            var messageQueues = new List<rmq::MessageQueue>();
+            foreach (var messageQueue in queryRouteResponse.MessageQueues)
             {
-                var topic = new Topic(partition.Topic.ResourceNamespace, partition.Topic.Name);
-                var id = partition.Id;
-                Permission permission = Permission.READ_WRITE;
-                switch (partition.Permission) {
-                    case rmq::Permission.None:
-                    {
-                        permission = Permission.NONE;
-                        break;
-                    }
-                    case rmq::Permission.Read:
-                    {
-                        permission = Permission.READ;
-                        break;
-                    }
-                    case rmq::Permission.Write:
-                    {
-                        permission = Permission.WRITE;
-                        break;
-                    }
-                    case rmq::Permission.ReadWrite:
-                    {
-                        permission = Permission.READ_WRITE;
-                        break;
-                    }
-                }
-
-                AddressScheme scheme = AddressScheme.IPv4;
-                switch(partition.Broker.Endpoints.Scheme) {
-                    case rmq::AddressScheme.Ipv4:
-                        {
-                        scheme = AddressScheme.IPv4;
-                        break;
-                    }
-                    case rmq::AddressScheme.Ipv6:
-                        {
-                        scheme = AddressScheme.IPv6;
-                        break;
-                    }
-                    case rmq::AddressScheme.DomainName:
-                        {
-                        scheme = AddressScheme.DOMAIN_NAME;
-                        break;
-                    }
-                }
-
-                List<Address> addresses = new List<Address>();
-                foreach(var item in partition.Broker.Endpoints.Addresses) {
-                    addresses.Add(new Address(item.Host, item.Port));
-                }
-                ServiceAddress serviceAddress = new ServiceAddress(scheme, addresses);
-                Broker broker = new Broker(partition.Broker.Name, id, serviceAddress);
-                partitions.Add(new Partition(topic, broker, id, permission));
+                messageQueues.Add(messageQueue);
             }
-
-            var topicRouteData = new TopicRouteData(partitions);
+            var topicRouteData = new TopicRouteData(messageQueues);
             return topicRouteData;
         }
 
-        public async Task<Boolean> heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+        public async Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request,
+            TimeSpan timeout)
         {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc.CallOptions(metadata, deadline);
-            var response = await rpcClient.heartbeat(request, callOptions);
-            if (null == response)
-            {
-                return false;
-            }
-
-            return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+            var rpcClient = GetRpcClient(target);
+            Logger.Debug($"Heartbeat to {target}, Request: {request}");
+            var response = await rpcClient.Heartbeat(metadata, request, timeout);
+            Logger.Debug($"Heartbeat to {target} response status: {response.Status}");
+            return response.Status.Code == rmq::Code.Ok;
         }
 
-        public async Task<rmq::SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout)
+        public async Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata,
+            rmq::SendMessageRequest request, TimeSpan timeout)
         {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = await rpcClient.sendMessage(request, callOptions);
+            var rpcClient = GetRpcClient(target);
+            var response = await rpcClient.SendMessage(metadata, request, timeout);
             return response;
         }
 
-        public async Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+        public async Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata,
+            rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
         {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            rmq::NotifyClientTerminationResponse response = await rpcClient.notifyClientTermination(request, callOptions);
-            return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
+            var rpcClient = GetRpcClient(target);
+            rmq::NotifyClientTerminationResponse response =
+                await rpcClient.NotifyClientTermination(metadata, request, timeout);
+            return response.Status.Code == rmq::Code.Ok;
         }
 
-        private ConcurrentDictionary<string, RpcClient> rpcClients;
+        /// <summary>
+        /// Queries the server at <paramref name="target"/> for load assignments and copies them
+        /// into a new list.
+        /// </summary>
+        /// <exception cref="Exception">The response status code is not Ok.</exception>
+        public async Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout)
+        {
+            rmq::QueryAssignmentResponse reply = await GetRpcClient(target).QueryAssignment(metadata, request, timeout);
+            if (rmq::Code.Ok != reply.Status.Code)
+            {
+                // TODO: Build exception hierarchy
+                throw new Exception($"Failed to query load assignment from server. Cause: {reply.Status.Message}");
+            }
 
+            // The list constructor copies the assignments out of the response.
+            return new List<rmq::Assignment>(reply.Assignments);
+        }
+
+        /// <summary>
+        /// Issues a receive-message call against <paramref name="target"/> and converts every
+        /// message entry of the response into a client-side <see cref="Message"/>.
+        /// </summary>
+        /// <param name="target">Used to look up the RPC client and recorded as each message's source host.</param>
+        /// <returns>
+        /// The converted messages; an empty list when the response is null or has no entries.
+        /// </returns>
+        public async Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata,
+            rmq::ReceiveMessageRequest request, TimeSpan timeout)
+        {
+            List<rmq::ReceiveMessageResponse> entries =
+                await GetRpcClient(target).ReceiveMessage(metadata, request, timeout);
+
+            List<Message> received = new List<Message>();
+            if (null == entries || 0 == entries.Count)
+            {
+                // TODO: throw an exception to propagate this error?
+                return received;
+            }
+
+            // Dispatch on the oneof case of each response entry.
+            foreach (var item in entries)
+            {
+                switch (item.ContentCase)
+                {
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.Message:
+                    {
+                        var converted = Convert(target, item.Message);
+                        received.Add(converted);
+                        break;
+                    }
+
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.Status:
+                    {
+                        // Status entries only produce log output; Ok is silent.
+                        string warning = item.Status.Code switch
+                        {
+                            rmq.Code.Ok => null,
+                            rmq.Code.Forbidden => "Receive message denied",
+                            rmq.Code.TooManyRequests => "TooManyRequest: servers throttled",
+                            _ => "Unknown error status",
+                        };
+
+                        if (null != warning)
+                        {
+                            Logger.Warn(warning);
+                        }
+                        break;
+                    }
+
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp:
+                    {
+                        // TODO: Collect metrics
+                        // The elapsed time is computed but not recorded anywhere yet.
+                        _ = DateTime.UtcNow - item.DeliveryTimestamp.ToDateTime();
+                        break;
+                    }
+
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.None:
+                    {
+                        // No oneof field is set on this entry.
+                        Logger.Warn("Unexpected ReceiveMessageResponse content type");
+                        break;
+                    }
+                }
+            }
+
+            return received;
+        }
+
+        /// <summary>
+        /// Builds a client-side <see cref="Message"/> from a protobuf message received from
+        /// <paramref name="sourceHost"/>, checking the body digest and inflating gzip bodies.
+        /// </summary>
+        private Message Convert(string sourceHost, rmq::Message message)
+        {
+            var msg = new Message();
+            msg.Topic = message.Topic.Name;
+            msg.MessageId = message.SystemProperties.MessageId;
+            msg.Tag = message.SystemProperties.Tag;
+
+            // Validate the checksum against the body bytes exactly as they were received.
+            byte[] raw = message.Body.ToByteArray();
+            var digest = message.SystemProperties.BodyDigest;
+            string actual = null;
+            switch (digest.Type)
+            {
+                case rmq::DigestType.Crc32:
+                    actual = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length).ToString("X");
+                    break;
+                case rmq::DigestType.Md5:
+                    actual = System.Convert.ToHexString(MD5.HashData(raw));
+                    break;
+                case rmq::DigestType.Sha1:
+                    actual = System.Convert.ToHexString(SHA1.HashData(raw));
+                    break;
+            }
+
+            // Any other digest type leaves actual null, so no verification is attempted.
+            if (null != actual && !digest.Checksum.Equals(actual))
+            {
+                msg._checksumVerifiedOk = false;
+            }
+
+            foreach (var entry in message.UserProperties)
+            {
+                msg.UserProperties.Add(entry.Key, entry.Value);
+            }
+
+            msg._receiptHandle = message.SystemProperties.ReceiptHandle;
+            msg._sourceHost = sourceHost;
+
+            foreach (var key in message.SystemProperties.Keys)
+            {
+                msg.Keys.Add(key);
+            }
+
+            msg.DeliveryAttempt = message.SystemProperties.DeliveryAttempt;
+
+            if (message.SystemProperties.BodyEncoding == rmq::Encoding.Gzip)
+            {
+                // Decompress/Inflate message body; the using declarations dispose all three streams.
+                using var inputStream = new MemoryStream(raw);
+                using var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress);
+                using var outputStream = new MemoryStream();
+                gzipStream.CopyTo(outputStream);
+                msg.Body = outputStream.ToArray();
+            }
+            else
+            {
+                msg.Body = raw;
+            }
+
+            return msg;
+        }
+
+        // Acknowledges a message through the RPC client for target; true only when the server answers Ok.
+        public async Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout)
+        {
+            var ackResponse = await GetRpcClient(target).AckMessage(metadata, request, timeout);
+            return rmq::Code.Ok == ackResponse.Status.Code;
+        }
+
+        // Sends a change-invisible-duration request via the RPC client for target; true only when the server answers Ok.
+        public async Task<Boolean> ChangeInvisibleDuration(string target, grpc::Metadata metadata, rmq::ChangeInvisibleDurationRequest request, TimeSpan timeout)
+        {
+            var changeResponse = await GetRpcClient(target).ChangeInvisibleDuration(metadata, request, timeout);
+            return rmq::Code.Ok == changeResponse.Status.Code;
+        }
+
+        /// <summary>Starts shutdown of every cached RPC client and completes once all of them have finished.</summary>
+        public async Task Shutdown()
+        {
+            List<Task> tasks = new List<Task>();
+            _clientLock.EnterReadLock();
+            try
+            {
+                foreach (var item in _rpcClients)
+                {
+                    tasks.Add(item.Value.Shutdown());
+                }
+            }
+            finally
+            {
+                _clientLock.ExitReadLock();
+            }
+            await Task.WhenAll(tasks); // awaited after release: ReaderWriterLockSlim must be exited on the entering thread
+        }
+
+        private readonly Dictionary<string, RpcClient> _rpcClients;
+        private readonly ReaderWriterLockSlim _clientLock;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientManagerFactory.cs b/rocketmq-client-csharp/ClientManagerFactory.cs
index 3ca211d..9d03994 100644
--- a/rocketmq-client-csharp/ClientManagerFactory.cs
+++ b/rocketmq-client-csharp/ClientManagerFactory.cs
@@ -18,7 +18,7 @@
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public sealed class ClientManagerFactory
     {
diff --git a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
index 1381b3f..39dfd7e 100644
--- a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
+++ b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -19,36 +19,46 @@
 using System.Text.Json;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     /**
      * File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config
      * A sample config content is as follows:
      * {"AccessKey": "key", "AccessSecret": "secret"}
      */
-    public class ConfigFileCredentialsProvider : ICredentialsProvider {
+    public class ConfigFileCredentialsProvider : ICredentialsProvider
+    {
 
-        public ConfigFileCredentialsProvider() {
+        public ConfigFileCredentialsProvider()
+        {
             var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
             string configFileRelativePath = "/.rocketmq/config";
-            if (!File.Exists(home + configFileRelativePath)) {
+            if (!File.Exists(home + configFileRelativePath))
+            {
                 return;
             }
 
-            try {
-                using (var reader = new StreamReader(home + configFileRelativePath)) {
+            try
+            {
+                using (var reader = new StreamReader(home + configFileRelativePath))
+                {
                     string json = reader.ReadToEnd();
                     var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json);
                     accessKey = kv["AccessKey"];
                     accessSecret = kv["AccessSecret"];
                     valid = true;
-                } 
-            } catch (IOException e) {
+                }
+            }
+            catch (IOException)
+            {
             }
         }
 
-        public Credentials getCredentials() {
-            if (!valid) {
+        public Credentials getCredentials()
+        {
+            if (!valid)
+            {
                 return null;
             }
 
diff --git a/rocketmq-client-csharp/Credentials.cs b/rocketmq-client-csharp/Credentials.cs
index 2da9581..a73b000 100644
--- a/rocketmq-client-csharp/Credentials.cs
+++ b/rocketmq-client-csharp/Credentials.cs
@@ -17,27 +17,34 @@
 
 using System;
 
-namespace org.apache.rocketmq {
-    public class Credentials {
+namespace Org.Apache.Rocketmq
+{
+    public class Credentials
+    {
 
-        public Credentials(string accessKey, string accessSecret) {
+        public Credentials(string accessKey, string accessSecret)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
         }
 
-        public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant) {
+        public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
             this.sessionToken = sessionToken;
             this.expirationInstant = expirationInstant;
         }
 
-        public bool empty() {
+        public bool empty()
+        {
             return String.IsNullOrEmpty(accessKey) || String.IsNullOrEmpty(accessSecret);
         }
 
-        public bool expired() {
-            if (DateTime.MinValue == expirationInstant) {
+        public bool expired()
+        {
+            if (DateTime.MinValue == expirationInstant)
+            {
                 return false;
             }
 
@@ -45,17 +52,20 @@
         }
 
         private string accessKey;
-        public string AccessKey {
+        public string AccessKey
+        {
             get { return accessKey; }
         }
-        
+
         private string accessSecret;
-        public string AccessSecret {
+        public string AccessSecret
+        {
             get { return accessSecret; }
         }
 
         private string sessionToken;
-        public string SessionToken {
+        public string SessionToken
+        {
             get { return sessionToken; }
         }
 
diff --git a/rocketmq-client-csharp/Permission.cs b/rocketmq-client-csharp/ExpressionType.cs
similarity index 88%
rename from rocketmq-client-csharp/Permission.cs
rename to rocketmq-client-csharp/ExpressionType.cs
index 659c15b..0caaf8e 100644
--- a/rocketmq-client-csharp/Permission.cs
+++ b/rocketmq-client-csharp/ExpressionType.cs
@@ -14,10 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+namespace Org.Apache.Rocketmq
+{
 
-public enum Permission {
-    NONE,
-    READ,
-    WRITE,
-    READ_WRITE,
+    public enum ExpressionType
+    {
+        TAG,
+        SQL92,
+    }
+
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/FilterExpression.cs
similarity index 72%
copy from rocketmq-client-csharp/INameServerResolver.cs
copy to rocketmq-client-csharp/FilterExpression.cs
index 568098f..3bd432d 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/FilterExpression.cs
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public interface INameServerResolver
+    public class FilterExpression
     {
-        Task<List<string>> resolveAsync();
+        public FilterExpression(string expression, ExpressionType type)
+        {
+            Expression = expression;
+            Type = type;
+        }
+
+        public ExpressionType Type { get; }
+        public string Expression { get; }
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 7f3ed64..3352028 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -16,17 +16,24 @@
  */
 
 using System.Threading.Tasks;
+using System.Threading;
+using System;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public interface IClient : IClientConfig
     {
 
-        void heartbeat();
+        Task Heartbeat();
 
-        void healthCheck();
+        Task<bool> NotifyClientTermination();
 
-        Task<bool> notifyClientTermination();
+        void BuildClientSetting(rmq::Settings settings);
 
+
+        void OnSettingsReceived(rmq::Settings settings);
+
+        CancellationTokenSource TelemetryCts();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientConfig.cs b/rocketmq-client-csharp/IClientConfig.cs
index b83006c..438d7a8 100644
--- a/rocketmq-client-csharp/IClientConfig.cs
+++ b/rocketmq-client-csharp/IClientConfig.cs
@@ -16,8 +16,10 @@
  */
 using System;
 
-namespace org.apache.rocketmq {
-    public interface IClientConfig {
+namespace Org.Apache.Rocketmq
+{
+    public interface IClientConfig
+    {
         string region();
 
         string serviceName();
@@ -28,8 +30,6 @@
 
         string tenantId();
 
-        TimeSpan getIoTimeout();
-
         TimeSpan getLongPollingTimeout();
 
         string getGroupName();
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index 08ed86a..afccfde 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -15,22 +15,37 @@
  * limitations under the License.
  */
 
-using apache.rocketmq.v1;
 using System.Threading.Tasks;
 using System;
+using System.Collections.Generic;
 using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
-    public interface IClientManager {
-        IRpcClient getRpcClient(string target);
 
-        Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
+namespace Org.Apache.Rocketmq
+{
+    public interface IClientManager
+    {
+        IRpcClient GetRpcClient(string target);
 
-        Task<Boolean> heartbeat(string target, grpc::Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+        grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata);
 
-        Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout);
+        Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout);
 
-        Task<SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+        Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout);
 
+        Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout);
+
+        Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout);
+
+        Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout);
+
+        Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, rmq::ReceiveMessageRequest request, TimeSpan timeout);
+
+        Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout);
+
+        Task<Boolean> ChangeInvisibleDuration(string target, grpc::Metadata metadata, rmq::ChangeInvisibleDurationRequest request, TimeSpan timeout);
+
+        Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/AddressScheme.cs b/rocketmq-client-csharp/IConsumer.cs
similarity index 84%
rename from rocketmq-client-csharp/AddressScheme.cs
rename to rocketmq-client-csharp/IConsumer.cs
index 3e95b09..2ad0dab 100644
--- a/rocketmq-client-csharp/AddressScheme.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -14,10 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace org.apache.rocketmq {
-    public enum AddressScheme {
-        IPv4,
-        IPv6,
-        DOMAIN_NAME,
+
+using System.Threading.Tasks;
+namespace Org.Apache.Rocketmq
+{
+    public interface IConsumer
+    {
+        Task Start();
+
+        Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ICredentialsProvider.cs b/rocketmq-client-csharp/ICredentialsProvider.cs
index 6e7112e..1fb892b 100644
--- a/rocketmq-client-csharp/ICredentialsProvider.cs
+++ b/rocketmq-client-csharp/ICredentialsProvider.cs
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace org.apache.rocketmq {
-    public interface ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+    public interface ICredentialsProvider
+    {
         Credentials getCredentials();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/IMessageListener.cs
similarity index 86%
rename from rocketmq-client-csharp/INameServerResolver.cs
rename to rocketmq-client-csharp/IMessageListener.cs
index 568098f..f46efd5 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/IMessageListener.cs
@@ -14,14 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
+
 using System.Collections.Generic;
 using System.Threading.Tasks;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public interface INameServerResolver
+
+    public interface IMessageListener
     {
-        Task<List<string>> resolveAsync();
+        Task Consume(List<Message> messages, List<Message> failed);
+
     }
+
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 89f8955..420af20 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -14,16 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
+
 using System.Threading.Tasks;
 
-namespace org.apache.rocketmq {
-    public interface IProducer {
-        void start();
+namespace Org.Apache.Rocketmq
+{
+    public interface IProducer
+    {
+        Task Start();
 
-        void shutdown();
+        Task Shutdown();
 
-        Task<SendResult> send(Message message);
-        
+        Task<SendReceipt> Send(Message message);
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index 0590bb0..146d4f7 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -14,21 +14,43 @@
  * See the License for the specific language governing permissions and

  * limitations under the License.

  */

-using System;

-using System.Threading.Tasks;

-using apache.rocketmq.v1;

-using grpc = global::Grpc.Core;

 

-namespace org.apache.rocketmq

+using System;

+using System.Collections.Generic;

+using System.Threading.Tasks;

+using Apache.Rocketmq.V2;

+using Grpc.Core;

+

+namespace Org.Apache.Rocketmq

 {

     public interface IRpcClient

     {

-        Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);

+        AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Metadata metadata);

 

-        Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions);

+        Task<QueryRouteResponse> QueryRoute(Metadata metadata, QueryRouteRequest request, TimeSpan timeout);

 

-        Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions);

+        Task<HeartbeatResponse> Heartbeat(Metadata metadata, HeartbeatRequest request, TimeSpan timeout);

 

-        Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions);

+        Task<SendMessageResponse> SendMessage(Metadata metadata, SendMessageRequest request, TimeSpan timeout);

+

+        Task<QueryAssignmentResponse> QueryAssignment(Metadata metadata, QueryAssignmentRequest request,

+            TimeSpan timeout);

+

+        Task<List<ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, ReceiveMessageRequest request, TimeSpan timeout);

+

+        Task<AckMessageResponse> AckMessage(Metadata metadata, AckMessageRequest request, TimeSpan timeout);

+

+        Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, ChangeInvisibleDurationRequest request, TimeSpan timeout);

+

+        Task<ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Metadata metadata,

+            ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout);

+

+        Task<EndTransactionResponse> EndTransaction(Metadata metadata, EndTransactionRequest request, TimeSpan timeout);

+

+

+        Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,

+            NotifyClientTerminationRequest request, TimeSpan timeout);

+

+        Task Shutdown();

     }

-}

+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 1e6ee0e..b527311 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -14,79 +14,111 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System;
 using System.Collections.Generic;
-namespace org.apache.rocketmq
+
+namespace Org.Apache.Rocketmq
 {
 
-    public class Message {
-        public Message() : this(null, null) {
+    public class Message
+    {
+        public Message() : this(null, null)
+        {
         }
 
-        public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {}
+        public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { }
 
-        public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) {
+        public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body)
+        {
         }
 
-        public Message(string topic, string tag, List<string> keys, byte[] body) {
-            this.messageId = SequenceGenerator.Instance.Next();
-            this.maxAttemptTimes = 3;
-            this.topic = topic;
-            this.tag = tag;
-            this.keys = keys;
-            this.body = body;
-            this.userProperties = new Dictionary<string, string>();
-            this.systemProperties = new Dictionary<string, string>();
+        public Message(string topic, string tag, List<string> keys, byte[] body)
+        {
+            MessageId = SequenceGenerator.Instance.Next();
+            MaxAttemptTimes = 3;
+            Topic = topic;
+            Tag = tag;
+            Keys = keys;
+            Body = body;
+            UserProperties = new Dictionary<string, string>();
+            DeliveryTimestamp = DateTime.MinValue;
         }
 
-        private string messageId;
         public string MessageId
         {
-            get { return messageId; }
+            get;
+            internal set;
+        }
+        
+        public string Topic
+        {
+            get;
+            set;
         }
 
-        private string topic;
-
-        public string Topic {
-            get { return topic; }
-            set { this.topic = value; }
+        public byte[] Body
+        {
+            get;
+            set;
         }
 
-        private byte[] body;
-        public byte[] Body {
-            get { return body; }
-            set { this.body = value; }
+        public string Tag
+        {
+            get;
+            set;
         }
 
-        private string tag;
-        public string Tag {
-            get { return tag; }
-            set { this.tag = value; }
+        public List<string> Keys
+        {
+            get;
+            set;
         }
 
-        private List<string> keys;
-        public List<string> Keys{
-            get { return keys; }
-            set { this.keys = value; }
+        public Dictionary<string, string> UserProperties
+        {
+            get;
+            set;
         }
 
-        private Dictionary<string, string> userProperties;
-        public Dictionary<string, string> UserProperties {
-            get { return userProperties; }
-            set { this.userProperties = value; }
-        }
-
-        private Dictionary<string, string> systemProperties;
-        internal Dictionary<string, string> SystemProperties {
-            get { return systemProperties; }
-            set { this.systemProperties = value; }
-        }
-
-        private int maxAttemptTimes;
         public int MaxAttemptTimes
         {
-            get { return maxAttemptTimes; }
-            set { maxAttemptTimes = value; }
+            get;
+            set;
         }
+
+
+        public DateTime DeliveryTimestamp
+        {
+            get;
+            set;
+        }
+        
+        public int DeliveryAttempt
+        {
+            get;
+            internal set;
+        }
+        
+        public string MessageGroup
+        {
+            get;
+            set;
+        }
+        
+        public bool Fifo()
+        {
+            return !String.IsNullOrEmpty(MessageGroup);
+        }
+
+        public bool Scheduled()
+        {
+            return DeliveryTimestamp > DateTime.UtcNow;
+        }
+
+        internal bool _checksumVerifiedOk = true;
+        internal string _receiptHandle;
+        internal string _sourceHost;
     }
 
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/MessageException.cs
similarity index 81%
copy from rocketmq-client-csharp/INameServerResolver.cs
copy to rocketmq-client-csharp/MessageException.cs
index 568098f..7ef10df 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/MessageException.cs
@@ -14,14 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
 
-namespace org.apache.rocketmq
+using System;
+
+namespace Org.Apache.Rocketmq
 {
-    public interface INameServerResolver
+    [Serializable]
+    public class MessageException : Exception
     {
-        Task<List<string>> resolveAsync();
+        public MessageException(string message) : base(message)
+        {
+        }
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/MessageIdGenerator.cs b/rocketmq-client-csharp/MessageIdGenerator.cs
index 8af1fda..8dc370d 100644
--- a/rocketmq-client-csharp/MessageIdGenerator.cs
+++ b/rocketmq-client-csharp/MessageIdGenerator.cs
@@ -20,7 +20,7 @@
 using System.IO;
 using System.Threading;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     /**
      * MessageId generate rules refer: https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o
diff --git a/rocketmq-client-csharp/MessageType.cs b/rocketmq-client-csharp/MessageType.cs
index 376b658..a459e93 100644
--- a/rocketmq-client-csharp/MessageType.cs
+++ b/rocketmq-client-csharp/MessageType.cs
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public enum MessageType {
+    public enum MessageType
+    {
         Normal,
         Fifo,
         Delay,
         Transaction,
     }
-    
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/MetadataConstants.cs b/rocketmq-client-csharp/MetadataConstants.cs
index 184bec8..5381595 100644
--- a/rocketmq-client-csharp/MetadataConstants.cs
+++ b/rocketmq-client-csharp/MetadataConstants.cs
@@ -17,8 +17,10 @@
 
 using System;
 
-namespace org.apache.rocketmq {
-    public class MetadataConstants {
+namespace Org.Apache.Rocketmq
+{
+    public class MetadataConstants
+    {
         public const string TENANT_ID_KEY = "x-mq-tenant-id";
         public const string NAMESPACE_KEY = "x-mq-namespace";
         public const string AUTHORIZATION = "authorization";
@@ -33,5 +35,7 @@
         public const string CLIENT_VERSION_KEY = "x-mq-client-version";
         public const string PROTOCOL_VERSION_KEY = "x-mq-protocol-version";
         public const string REQUEST_ID_KEY = "x-mq-request-id";
+
+        public const string CLIENT_ID_KEY = "x-mq-client-id";
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/MqLogManager.cs b/rocketmq-client-csharp/MqLogManager.cs
index 0608c8a..3d294bd 100644
--- a/rocketmq-client-csharp/MqLogManager.cs
+++ b/rocketmq-client-csharp/MqLogManager.cs
@@ -4,7 +4,7 @@
 using NLog;
 using NLog.Config;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     /**
      * RocketMQ Log Manager.
diff --git a/rocketmq-client-csharp/Partition.cs b/rocketmq-client-csharp/Partition.cs
deleted file mode 100644
index 410601e..0000000
--- a/rocketmq-client-csharp/Partition.cs
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-
-namespace org.apache.rocketmq {
-    
-    public class Partition : IEquatable<Partition>, IComparable<Partition> {
-
-        public Partition(Topic topic, Broker broker, int id, Permission permission) {
-            this.topic = topic;
-            this.broker = broker;
-            this.id = id;
-            this.permission = permission;
-        }
-
-        private Topic topic;
-        public Topic Topic{
-            get { return topic; }
-        }
-
-        private Broker broker;
-        public Broker Broker {
-            get { return broker; }
-        }
-
-        private int id;
-        public int Id {
-            get { return id; }
-        }
-
-        Permission permission;
-        public Permission Permission {
-            get { return permission; }
-        }
-
-        public bool Equals(Partition other) {
-            return topic.Equals(other.topic) 
-                && broker.Equals(other.broker) 
-                && id.Equals(other.id) 
-                && permission == other.permission;
-        }
-
-        public override bool Equals(Object other) {
-            if (!(other is Partition)) {
-                return false;
-            }
-            return Equals(other);
-        }
-
-        public override int GetHashCode()
-        {
-            return HashCode.Combine(topic, broker, id, permission);
-        }
-
-        public int CompareTo(Partition other) {
-            if (0 != topic.CompareTo(other.topic)) {
-                return topic.CompareTo(other.topic);
-            }
-
-            if (0 != broker.CompareTo(other.broker)) {
-                return broker.CompareTo(other.broker);
-            }
-
-            if (0 != id.CompareTo(other.id)) {
-                return id.CompareTo(other.id);
-            }
-
-            return permission.CompareTo(other.permission);
-        }
-    }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/ProcessQueue.cs
similarity index 63%
copy from rocketmq-client-csharp/StaticNameServerResolver.cs
copy to rocketmq-client-csharp/ProcessQueue.cs
index 9f97599..1022978 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/ProcessQueue.cs
@@ -15,24 +15,29 @@
  * limitations under the License.
  */
 using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public class StaticNameServerResolver : INameServerResolver
+    public class ProcessQueue
     {
 
-        public StaticNameServerResolver(List<string> nameServerList)
+        public ProcessQueue()
         {
-            this.nameServerList = nameServerList;
+            _lastReceivedTime = DateTime.UtcNow;
+        }
+        public bool Dropped { get; set; }
+
+        private DateTime _lastReceivedTime;
+
+        public DateTime LastReceiveTime
+        {
+            get { return _lastReceivedTime; }
+            set { _lastReceivedTime = value; }
         }
 
-        public async Task<List<string>> resolveAsync()
+        internal bool Expired()
         {
-            return nameServerList;
+            return DateTime.UtcNow.Subtract(_lastReceivedTime).TotalMilliseconds > 30 * 1000;
         }
 
-        private List<string> nameServerList;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 0b3f8a0..5c51cdc 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -14,94 +14,149 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
 using System.Threading.Tasks;
-using rmq = apache.rocketmq.v1;
-using pb = global::Google.Protobuf;
-using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using NLog;
+using OpenTelemetry;
+using OpenTelemetry.Exporter;
+using OpenTelemetry.Metrics;
 
-
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public class Producer : Client, IProducer
     {
-        public Producer(INameServerResolver resolver, string resourceNamespace) : base(resolver, resourceNamespace)
+        public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
         {
-            this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+            _loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+            _sendFailureTotal = MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
+            _sendLatency = MetricMeter.CreateHistogram<double>(SendLatencyName, 
+                description: "Measure the duration of publishing messages to brokers",
+                unit: "milliseconds");
         }
 
-        public override void start()
+        public override async Task Start()
         {
-            base.start();
-            // More initalization
+            await base.Start();
+            // More initialization
+            // TODO: Add authentication header
+
+            _meterProvider = Sdk.CreateMeterProviderBuilder()
+                .AddMeter("Apache.RocketMQ.Client")
+                .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
+                {
+                    options.Protocol = OtlpExportProtocol.Grpc;
+                    options.Endpoint = new Uri(_accessPoint.TargetUrl());
+                    options.TimeoutMilliseconds = (int) _clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
+
+                    readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 60 * 1000;
+                })
+                .AddView((instrument) =>
+                {
+                    if (instrument.Meter.Name == MeterName && instrument.Name == SendLatencyName)
+                    {
+                        return new ExplicitBucketHistogramConfiguration()
+                        {
+                            Boundaries = new double[] {1, 5, 10, 20, 50, 200, 500},
+                        };
+                    }
+                    return null;
+                })
+                .Build();
         }
 
-        public override void shutdown()
+        public override async Task Shutdown()
         {
             // Release local resources
-            base.shutdown();
+            await base.Shutdown();
         }
 
-        public override void prepareHeartbeatData(rmq::HeartbeatRequest request)
+        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
+            request.ClientType = rmq::ClientType.Producer;
 
+            // The concept of a ProducerGroup has been removed, so only the client type is set here.
         }
 
-        public async Task<SendResult> send(Message message)
+        public async Task<SendReceipt> Send(Message message)
         {
-            if (!loadBalancer.ContainsKey(message.Topic))
+            if (!_loadBalancer.ContainsKey(message.Topic))
             {
-                var topicRouteData = await getRouteFor(message.Topic, false);
-                if (null == topicRouteData || null == topicRouteData.Partitions || 0 == topicRouteData.Partitions.Count)
+                var topicRouteData = await GetRouteFor(message.Topic, false);
+                if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
                 {
+                    Logger.Error($"Failed to resolve route info for {message.Topic}");
                     throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
                 }
 
                 var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
-                loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+                _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
             }
 
-            var publishLB = loadBalancer[message.Topic];
+            var publishLb = _loadBalancer[message.Topic];
 
             var request = new rmq::SendMessageRequest();
-            request.Message = new rmq::Message();
-            request.Message.Body = pb::ByteString.CopyFrom(message.Body);
-            request.Message.Topic = new rmq::Resource();
-            request.Message.Topic.ResourceNamespace = resourceNamespace();
-            request.Message.Topic.Name = message.Topic;
+            var entry = new rmq::Message();
+            entry.Body = ByteString.CopyFrom(message.Body);
+            entry.Topic = new rmq::Resource();
+            entry.Topic.ResourceNamespace = resourceNamespace();
+            entry.Topic.Name = message.Topic;
+            request.Messages.Add(entry);
 
             // User properties
             foreach (var item in message.UserProperties)
             {
-                request.Message.UserAttribute.Add(item.Key, item.Value);
+                entry.UserProperties.Add(item.Key, item.Value);
             }
 
-            request.Message.SystemAttribute = new rmq::SystemAttribute();
-            request.Message.SystemAttribute.MessageId = message.MessageId;
+            entry.SystemProperties = new rmq::SystemProperties();
+            entry.SystemProperties.MessageId = message.MessageId;
+            entry.SystemProperties.MessageType = rmq::MessageType.Normal;
+            if (DateTime.MinValue != message.DeliveryTimestamp)
+            {
+                entry.SystemProperties.MessageType = rmq::MessageType.Delay;
+                entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
+
+                if (message.Fifo())
+                {
+                    Logger.Warn("A message may not be FIFO and delayed at the same time");
+                    throw new MessageException("A message may not be both FIFO and Timed");
+                }
+            } else if (!String.IsNullOrEmpty(message.MessageGroup))
+            {
+                entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
+                entry.SystemProperties.MessageGroup = message.MessageGroup;
+            }
+            
             if (!string.IsNullOrEmpty(message.Tag))
             {
-                request.Message.SystemAttribute.Tag = message.Tag;
+                entry.SystemProperties.Tag = message.Tag;
             }
 
             if (0 != message.Keys.Count)
             {
                 foreach (var key in message.Keys)
                 {
-                    request.Message.SystemAttribute.Keys.Add(key);
+                    entry.SystemProperties.Keys.Add(key);
                 }
             }
 
-            // string target = "https://";
             List<string> targets = new List<string>();
-            List<Partition> candidates = publishLB.select(message.MaxAttemptTimes);
-            foreach (var partition in candidates)
+            List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
+            foreach (var messageQueue in candidates)
             {
-                targets.Add(partition.Broker.targetUrl());
+                targets.Add(Utilities.TargetUrl(messageQueue));
             }
 
-            var metadata = new grpc::Metadata();
+            var metadata = new Metadata();
             Signature.sign(this, metadata);
 
             Exception ex = null;
@@ -110,27 +165,46 @@
             {
                 try
                 {
-                    rmq::SendMessageResponse response = await clientManager.sendMessage(target, metadata, request, getIoTimeout());
-                    if (null != response && (int)global::Google.Rpc.Code.Ok == response.Common.Status.Code)
+                    var stopWatch = new Stopwatch();
+                    stopWatch.Start();
+                    rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
+                    if (null != response && rmq::Code.Ok == response.Status.Code)
                     {
-                        var messageId = response.MessageId;
-                        return new SendResult(messageId);
+                        var messageId = response.Entries[0].MessageId;
+                        
+                        // Account latency histogram
+                        stopWatch.Stop();
+                        var latency = stopWatch.ElapsedMilliseconds;
+                        _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
+                        
+                        return new SendReceipt(messageId);
                     }
                 }
                 catch (Exception e)
                 {
+                    // Account failure count
+                    _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));                    
+                    Logger.Info(e, $"Failed to send message to {target}");
                     ex = e;
                 }
             }
 
             if (null != ex)
             {
+                Logger.Error(ex, $"Failed to send message after {message.MaxAttemptTimes} attempts");
                 throw ex;
             }
 
+            Logger.Error($"Failed to send message after {message.MaxAttemptTimes} attempts with unspecified reasons");
             throw new Exception("Send message failed");
         }
 
-        private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
+        private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer; // NOTE(review): presumably keyed by topic name — confirm where entries are added.
+
+        private readonly Counter<long> _sendFailureTotal; // Incremented by 1 for every send attempt that throws; tagged with "topic" and "client_id".
+        private readonly Histogram<double> _sendLatency; // Records the elapsed milliseconds of a send answered with Code.Ok; tagged with "topic" and "client_id".
+
+        private static readonly string SendLatencyName = "rocketmq_send_success_cost_time"; // NOTE(review): presumably the name the _sendLatency histogram is registered under — confirm.
+        private MeterProvider _meterProvider; // NOTE(review): the only non-readonly field in this group — confirm where it is assigned and disposed.
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/definition.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v1/definition.proto
deleted file mode 100644
index 33f4644..0000000
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/definition.proto
+++ /dev/null
@@ -1,351 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/timestamp.proto";
-import "google/protobuf/duration.proto";
-
-package apache.rocketmq.v1;
-
-option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
-option java_generate_equals_and_hash = true;
-option java_string_check_utf8 = true;
-option java_outer_classname = "MQDomain";
-
-option csharp_namespace = "apache.rocketmq.v1";
-
-enum Permission {
-  NONE = 0;
-  READ = 1;
-  WRITE = 2;
-  READ_WRITE = 3;
-
-  reserved 4 to 64;
-}
-
-enum FilterType {
-  TAG = 0;
-  SQL = 1;
-
-  reserved 2 to 64;
-}
-
-message FilterExpression {
-  FilterType type = 1;
-  string expression = 2;
-
-  reserved 3 to 64;
-}
-
-// Dead lettering is done on a best effort basis. The same message might be
-// dead lettered multiple times.
-//
-// If validation on any of the fields fails at subscription creation/update,
-// the create/update subscription request will fail.
-message DeadLetterPolicy {
-  // The maximum number of delivery attempts for any message.
-  //
-  // This field will be honored on a best effort basis.
-  //
-  // If this parameter is 0, a default value of 16 is used.
-  int32 max_delivery_attempts = 1;
-
-  reserved 2 to 64;
-}
-
-message Resource {
-  string resource_namespace = 1;
-
-  // Resource name identifier, which remains unique within the abstract resource
-  // namespace.
-  string name = 2;
-
-  reserved 3 to 64;
-}
-
-enum ConsumeModel {
-  CLUSTERING = 0;
-  BROADCASTING = 1;
-
-  reserved 2 to 64;
-}
-
-message ProducerData {
-  Resource group = 1;
-
-  reserved 2 to 64;
-}
-
-enum ConsumePolicy {
-  RESUME = 0;
-  PLAYBACK = 1;
-  DISCARD = 2;
-  TARGET_TIMESTAMP = 3;
-
-  reserved 4 to 64;
-}
-
-enum ConsumeMessageType {
-  ACTIVE = 0;
-  PASSIVE = 1;
-
-  reserved 2 to 64;
-}
-
-message ConsumerData {
-  Resource group = 1;
-
-  repeated SubscriptionEntry subscriptions = 2;
-
-  ConsumeModel consume_model = 3;
-
-  ConsumePolicy consume_policy = 4;
-
-  DeadLetterPolicy dead_letter_policy = 5;
-
-  ConsumeMessageType consume_type = 6;
-
-  reserved 7 to 64;
-}
-
-message SubscriptionEntry {
-  Resource topic = 1;
-  FilterExpression expression = 2;
-
-  reserved 3 to 64;
-}
-
-enum AddressScheme {
-  IPv4 = 0;
-  IPv6 = 1;
-  DOMAIN_NAME = 2;
-
-  reserved 3 to 64;
-}
-
-message Address {
-  string host = 1;
-  int32 port = 2;
-
-  reserved 3 to 64;
-}
-
-message Endpoints {
-  AddressScheme scheme = 1;
-  repeated Address addresses = 2;
-
-  reserved 3 to 64;
-}
-
-message Broker {
-  // Name of the broker
-  string name = 1;
-
-  // Broker index. Canonically, index = 0 implies that the broker is playing
-  // leader role while brokers with index > 0 play follower role.
-  int32 id = 2;
-
-  // Address of the broker, complying with the following scheme
-  // 1. dns:[//authority/]host[:port]
-  // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
-  // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
-  Endpoints endpoints = 3;
-
-  reserved 4 to 64;
-}
-
-message Partition {
-  Resource topic = 1;
-  int32 id = 2;
-  Permission permission = 3;
-  Broker broker = 4;
-
-  reserved 5 to 64;
-}
-
-enum MessageType {
-  NORMAL = 0;
-
-  // Sequenced message
-  FIFO = 1;
-
-  // Messages that are delivered after the specified duration.
-  DELAY = 2;
-
-  // Messages that are transactional. Only committed messages are delivered to
-  // subscribers.
-  TRANSACTION = 3;
-
-  reserved 4 to 64;
-}
-
-enum DigestType {
-  // CRC algorithm achieves goal of detecting random data error with lowest
-  // computation overhead.
-  CRC32 = 0;
-
-  // MD5 algorithm achieves good balance between collision rate and computation
-  // overhead.
-  MD5 = 1;
-
-  // SHA-family has substantially fewer collision with fair amount of
-  // computation.
-  SHA1 = 2;
-
-  reserved 3 to 64;
-}
-
-// When publishing messages to or subscribing messages from brokers, clients
-// shall include or validate digests of message body to ensure data integrity.
-//
-// For message publishment, when an invalid digest were detected, brokers need
-// respond client with BAD_REQUEST.
-//
-// For messags subscription, when an invalid digest were detected, consumers
-// need to handle this case according to message type:
-// 1) Standard messages should be negatively acknowledged instantly, causing
-// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
-// previously acquired messages batch;
-//
-// Message consumption model also affects how invalid digest are handled. When
-// messages are consumed in broadcasting way,
-// TODO: define semantics of invalid-digest-when-broadcasting.
-message Digest {
-  DigestType type = 1;
-  string checksum = 2;
-
-  reserved 3 to 64;
-}
-
-enum Encoding {
-  IDENTITY = 0;
-  GZIP = 1;
-
-  reserved 2 to 64;
-}
-
-message SystemAttribute {
-  // Tag
-  string tag = 1;
-
-  // Message keys
-  repeated string keys = 2;
-
-  // Message identifier, client-side generated, remains unique.
-  // if message_id is empty, the send message request will be aborted with
-  // status `INVALID_ARGUMENT`
-  string message_id = 3;
-
-  // Message body digest
-  Digest body_digest = 4;
-
-  // Message body encoding. Candidate options are identity, gzip, snappy etc.
-  Encoding body_encoding = 5;
-
-  // Message type, normal, FIFO or transactional.
-  MessageType message_type = 6;
-
-  // Message born time-point.
-  google.protobuf.Timestamp born_timestamp = 7;
-
-  // Message born host. Valid options are IPv4, IPv6 or client host domain name.
-  string born_host = 8;
-
-  // Time-point at which the message is stored in the broker.
-  google.protobuf.Timestamp store_timestamp = 9;
-
-  // The broker that stores this message. It may be name, IP or arbitrary
-  // identifier that uniquely identify the broker.
-  string store_host = 10;
-
-  oneof timed_delivery {
-    // Time-point at which broker delivers to clients.
-    google.protobuf.Timestamp delivery_timestamp = 11;
-
-    // Level-based delay strategy.
-    int32 delay_level = 12;
-  }
-
-  // If a message is acquired by way of POP, this field holds the receipt.
-  // Clients use the receipt to acknowledge or negatively acknowledge the
-  // message.
-  string receipt_handle = 13;
-
-  // Partition identifier in which a message is physically stored.
-  int32 partition_id = 14;
-
-  // Partition offset at which a message is stored.
-  int64 partition_offset = 15;
-
-  // Period of time servers would remain invisible once a message is acquired.
-  google.protobuf.Duration invisible_period = 16;
-
-  // Business code may failed to process messages for the moment. Hence, clients
-  // may request servers to deliver them again using certain back-off strategy,
-  // the attempt is 1 not 0 if message is delivered first time.
-  int32 delivery_attempt = 17;
-
-  // Message producer load-balance group if applicable.
-  Resource producer_group = 18;
-
-  string message_group = 19;
-
-  // Trace context.
-  string trace_context = 20;
-
-  // Delay time of first recover orphaned transaction request from server.
-  google.protobuf.Duration orphaned_transaction_recovery_period = 21;
-
-  reserved 22 to 64;
-}
-
-message Message {
-
-  Resource topic = 1;
-
-  // User defined key-value pairs.
-  // If user_attribute contains the reserved keys by RocketMQ,
-  // the send message request will be aborted with status `INVALID_ARGUMENT`.
-  // See below links for the reserved keys
-  // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
-  map<string, string> user_attribute = 2;
-
-  SystemAttribute system_attribute = 3;
-
-  bytes body = 4;
-
-  reserved 5 to 64;
-}
-
-message Assignment {
-  Partition Partition = 1;
-
-  reserved 2 to 64;
-}
-
-enum QueryOffsetPolicy {
-  // Use this option if client wishes to playback all existing messages.
-  BEGINNING = 0;
-
-  // Use this option if client wishes to skip all existing messages.
-  END = 1;
-
-  // Use this option if time-based seek is targeted.
-  TIME_POINT = 2;
-
-  reserved 3 to 64;
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/service.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v1/service.proto
deleted file mode 100644
index 6f1b4c1..0000000
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/service.proto
+++ /dev/null
@@ -1,522 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/duration.proto";
-import "google/protobuf/timestamp.proto";
-import "google/rpc/error_details.proto";
-import "google/rpc/status.proto";
-
-import "apache/rocketmq/v1/definition.proto";
-
-package apache.rocketmq.v1;
-
-option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
-option java_generate_equals_and_hash = true;
-option java_string_check_utf8 = true;
-option java_outer_classname = "MQService";
-
-option csharp_namespace = "apache.rocketmq.v1";
-
-message ResponseCommon {
-  google.rpc.Status status = 1;
-  google.rpc.RequestInfo request_info = 2;
-  google.rpc.Help help = 3;
-  google.rpc.RetryInfo retry_info = 4;
-  google.rpc.DebugInfo debug_info = 5;
-  google.rpc.ErrorInfo error_info = 6;
-
-  reserved 7 to 64;
-}
-
-// Topics are destination of messages to publish to or subscribe from. Similar
-// to domain names, they will be addressable after resolution through the
-// provided access point.
-//
-// Access points are usually the addresses of name servers, which fulfill
-// service discovery, load-balancing and other auxillary services. Name servers
-// receive periodic heartbeats from affiliate brokers and erase those which
-// failed to maintain alive status.
-//
-// Name servers answer queries of QueryRouteRequest, responding clients with
-// addressable partitions, which they may directly publish messages to or
-// subscribe messages from.
-//
-// QueryRouteRequest shall include source endpoints, aka, configured
-// access-point, which annotates tenant-id, instance-id or other
-// vendor-specific settings. Purpose-built name servers may respond customized
-// results based on these particular requirements.
-message QueryRouteRequest {
-  Resource topic = 1;
-
-  Endpoints endpoints = 2;
-
-  reserved 3 to 64;
-}
-
-message QueryRouteResponse {
-  ResponseCommon common = 1;
-
-  repeated Partition partitions = 2;
-
-  reserved 3 to 64;
-}
-
-message SendMessageRequest {
-  Message message = 1;
-  Partition partition = 2;
-
-  reserved 3 to 64;
-}
-
-message SendMessageResponse {
-  ResponseCommon common = 1;
-  string message_id = 2;
-  string transaction_id = 3;
-
-  reserved 4 to 64;
-}
-
-message QueryAssignmentRequest {
-  Resource topic = 1;
-  Resource group = 2;
-  string client_id = 3;
-
-  // Service access point
-  Endpoints endpoints = 4;
-
-  reserved 5 to 64;
-}
-
-message QueryAssignmentResponse {
-  ResponseCommon common = 1;
-  repeated Assignment assignments = 2;
-
-  reserved 3 to 64;
-}
-
-message ReceiveMessageRequest {
-  Resource group = 1;
-  string client_id = 2;
-  Partition partition = 3;
-  FilterExpression filter_expression = 4;
-  ConsumePolicy consume_policy = 5;
-  google.protobuf.Timestamp initialization_timestamp = 6;
-  int32 batch_size = 7;
-  google.protobuf.Duration invisible_duration = 8;
-  google.protobuf.Duration await_time = 9;
-  bool fifo_flag = 10;
-
-  reserved 11 to 64;
-}
-
-message ReceiveMessageResponse {
-  ResponseCommon common = 1;
-  repeated Message messages = 2;
-  google.protobuf.Timestamp delivery_timestamp = 3;
-  google.protobuf.Duration invisible_duration = 4;
-
-  reserved 5 to 64;
-}
-
-message AckMessageRequest {
-  Resource group = 1;
-  Resource topic = 2;
-  string client_id = 3;
-  oneof handle {
-    string receipt_handle = 4;
-    int64 offset = 5;
-  }
-  string message_id = 6;
-
-  reserved 7 to 64;
-}
-
-message AckMessageResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message NackMessageRequest {
-  Resource group = 1;
-  Resource topic = 2;
-  string client_id = 3;
-  string receipt_handle = 4;
-  string message_id = 5;
-  int32 delivery_attempt = 6;
-  int32 max_delivery_attempts = 7;
-
-  reserved 8 to 64;
-}
-
-message NackMessageResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message ForwardMessageToDeadLetterQueueRequest {
-  Resource group = 1;
-  Resource topic = 2;
-  string client_id = 3;
-  string receipt_handle = 4;
-  string message_id = 5;
-  int32 delivery_attempt = 6;
-  int32 max_delivery_attempts = 7;
-
-  reserved 8 to 64;
-}
-
-message ForwardMessageToDeadLetterQueueResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message HeartbeatRequest {
-  string client_id = 1;
-  oneof client_data {
-    ProducerData producer_data = 2;
-    ConsumerData consumer_data = 3;
-  }
-  bool fifo_flag = 4;
-
-  reserved 5 to 64;
-}
-
-message HeartbeatResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message HealthCheckRequest {
-  Resource group = 1;
-  string client_host = 2;
-
-  reserved 3 to 64;
-}
-
-message HealthCheckResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message EndTransactionRequest {
-  Resource group = 1;
-  string message_id = 2;
-  string transaction_id = 3;
-  enum TransactionResolution {
-    COMMIT = 0;
-    ROLLBACK = 1;
-  }
-  TransactionResolution resolution = 4;
-  enum Source {
-    CLIENT = 0;
-    SERVER_CHECK = 1;
-  }
-  Source source = 5;
-  string trace_context = 6;
-
-  reserved 7 to 64;
-}
-
-message EndTransactionResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message QueryOffsetRequest {
-  Partition partition = 1;
-  QueryOffsetPolicy policy = 2;
-  google.protobuf.Timestamp time_point = 3;
-
-  reserved 4 to 64;
-}
-
-message QueryOffsetResponse {
-  ResponseCommon common = 1;
-  int64 offset = 2;
-
-  reserved 3 to 64;
-}
-
-message PullMessageRequest {
-  Resource group = 1;
-  Partition partition = 2;
-  int64 offset = 3;
-  int32 batch_size = 4;
-  google.protobuf.Duration await_time = 5;
-  FilterExpression filter_expression = 6;
-  string client_id = 7;
-
-  reserved 8 to 64;
-}
-
-message PullMessageResponse {
-  ResponseCommon common = 1;
-  int64 min_offset = 2;
-  int64 next_offset = 3;
-  int64 max_offset = 4;
-  repeated Message messages = 5;
-
-  reserved 6 to 64;
-}
-
-message NoopCommand { reserved 1 to 64; }
-
-message PrintThreadStackTraceCommand {
-  string command_id = 1;
-
-  reserved 2 to 64;
-}
-
-message ReportThreadStackTraceRequest {
-  string command_id = 1;
-  string thread_stack_trace = 2;
-
-  reserved 3 to 64;
-}
-
-message ReportThreadStackTraceResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message VerifyMessageConsumptionCommand {
-  string command_id = 1;
-  Message message = 2;
-
-  reserved 3 to 64;
-}
-
-message ReportMessageConsumptionResultRequest {
-  string command_id = 1;
-
-  // 1. Return `INVALID_ARGUMENT` if message is corrupted.
-  // 2. Return `INTERNAL` if failed to consume message.
-  // 3. Return `OK` if success.
-  google.rpc.Status status = 2;
-
-  reserved 3 to 64;
-}
-
-message ReportMessageConsumptionResultResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message RecoverOrphanedTransactionCommand {
-  Message orphaned_transactional_message = 1;
-  string transaction_id = 2;
-
-  reserved 3 to 64;
-}
-
-message PollCommandRequest {
-  string client_id = 1;
-  repeated Resource topics = 2;
-  oneof group {
-    Resource producer_group = 3;
-    Resource consumer_group = 4;
-  }
-
-  reserved 5 to 64;
-}
-
-message PollCommandResponse {
-  oneof type {
-    // Default command when no new command need to be delivered.
-    NoopCommand noop_command = 1;
-    // Request client to print thread stack trace.
-    PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
-    // Request client to verify the consumption of the appointed message.
-    VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
-    // Request client to recover the orphaned transaction message.
-    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
-  }
-
-  reserved 5 to 64;
-}
-
-message NotifyClientTerminationRequest {
-  oneof group {
-    Resource producer_group = 1;
-    Resource consumer_group = 2;
-  }
-  string client_id = 3;
-
-  reserved 4 to 64;
-}
-
-message NotifyClientTerminationResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-// For all the RPCs in MessagingService, the following error handling policies
-// apply:
-//
-// If the request doesn't bear a valid authentication credential, return a
-// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
-// user is not granted with sufficient permission to execute the requested
-// operation, return a response with common.status.code == `PERMISSION_DENIED`.
-// If the per-user-resource-based quota is exhausted, return a response with
-// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
-// errors raise, return a response with common.status.code == `INTERNAL`.
-service MessagingService {
-
-  // Queries the route entries of the requested topic in the perspective of the
-  // given endpoints. On success, servers should return a collection of
-  // addressable partitions. Note servers may return customized route entries
-  // based on endpoints provided.
-  //
-  // If the requested topic doesn't exist, returns `NOT_FOUND`.
-  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
-  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
-
-  // Producer or consumer sends HeartbeatRequest to servers periodically to
-  // keep-alive. Additionally, it also reports client-side configuration,
-  // including topic subscription, load-balancing group name, etc.
-  //
-  // Returns `OK` if success.
-  //
-  // If a client specifies a language that is not yet supported by servers,
-  // returns `INVALID_ARGUMENT`
-  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
-
-  // Checks the health status of message server, returns `OK` if services are
-  // online and serving. Clients may use this RPC to detect availability of
-  // messaging service, and take isolation actions when necessary.
-  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
-
-  // Delivers messages to brokers.
-  // Clients may further:
-  // 1. Refine a message destination to topic partition which fulfills parts of
-  // FIFO semantic;
-  // 2. Flag a message as transactional, which keeps it invisible to consumers
-  // until it commits;
-  // 3. Time a message, making it invisible to consumers till specified
-  // time-point;
-  // 4. And more...
-  //
-  // Returns message-id or transaction-id with status `OK` on success.
-  //
-  // If the destination topic doesn't exist, returns `NOT_FOUND`.
-  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
-
-  // Queries the assigned partition route info of a topic for current consumer,
-  // the returned assignment result is decided by server-side load balancer.
-  //
-  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
-  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
-  rpc QueryAssignment(QueryAssignmentRequest)
-      returns (QueryAssignmentResponse) {}
-
-  // Receives messages from the server in batch manner, returns a batch of
-  // messages if success. The received messages should be ACKed or NACKed after
-  // processing.
-  //
-  // If the pending concurrent receive requests exceed the quota of the given
-  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
-  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
-  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
-  // message in the specific topic, returns `OK` with an empty message set.
-  // Please note that client may suffer from false empty responses.
-  rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
-
-  // Acknowledges the message associated with the `receipt_handle` or `offset`
-  // in the `AckMessageRequest`, it means the message has been successfully
-  // processed. Returns `OK` if the message server remove the relevant message
-  // successfully.
-  //
-  // If the given receipt_handle is illegal or out of date, returns
-  // `INVALID_ARGUMENT`.
-  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
-
-  // Signals that the message has not been successfully processed. The message
-  // server should resend the message follow the retry policy defined at
-  // server-side.
-  //
-  // If the corresponding topic or consumer group doesn't exist, returns
-  // `NOT_FOUND`.
-  rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
-
-  // Forwards one message to dead letter queue if the DeadLetterPolicy is
-  // triggered by this message at client-side, return `OK` if success.
-  rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
-      returns (ForwardMessageToDeadLetterQueueResponse) {}
-
-  // Commits or rollback one transactional message.
-  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
-
-  // Queries the offset of the specific partition, returns the offset with `OK`
-  // if success. The message server should maintain a numerical offset for each
-  // message in a partition.
-  rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
-
-  // Pulls messages from the specific partition, returns a set of messages with
-  // next pull offset. The pulled messages can't be ACKed or NACKed, while the
-  // client is responsible for manage offsets for consumer, typically update
-  // consume offset to local memory or a third-party storage service.
-  //
-  // If the pending concurrent receive requests exceed the quota of the given
-  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
-  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
-  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
-  // message in the specific topic, returns `OK` with an empty message set.
-  // Please note that client may suffer from false empty responses.
-  rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
-
-  // Multiplexing RPC(s) for various polling requests, which issue different
-  // commands to client.
-  //
-  // Sometimes client may need to receive and process the command from server.
-  // To prevent the complexity of streaming RPC(s), a unary RPC using
-  // long-polling is another solution.
-  //
-  // To mark the request-response of corresponding command, `command_id` in
-  // message is recorded in the subsequent RPC(s). For example, after receiving
-  // command of printing thread stack trace, client would send
-  // `ReportMessageConsumptionResultRequest` to server, which contain both of
-  // the stack trace and `command_id`.
-  //
-  // At same time, `NoopCommand` is delivered from server when no new command is
-  // needed, it is essential for client to maintain the ping-pong.
-  //
-  rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
-
-  // After receiving the corresponding polling command, the thread stack trace
-  // is reported to the server.
-  rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
-      returns (ReportThreadStackTraceResponse) {}
-
-  // After receiving the corresponding polling command, the consumption result
-  // of appointed message is reported to the server.
-  rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
-      returns (ReportMessageConsumptionResultResponse) {}
-
-  // Notify the server that the client is terminated.
-  rpc NotifyClientTermination(NotifyClientTerminationRequest)
-      returns (NotifyClientTerminationResponse) {}
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/admin.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
similarity index 86%
rename from rocketmq-client-csharp/Protos/apache/rocketmq/v1/admin.proto
rename to rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
index 554207b..7dbb702 100644
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v1/admin.proto
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
@@ -15,11 +15,12 @@
 
 syntax = "proto3";
 
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
 
 option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
 option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
+option java_package = "apache.rocketmq.v2";
 option java_generate_equals_and_hash = true;
 option java_string_check_utf8 = true;
 option java_outer_classname = "MQAdmin";
@@ -35,11 +36,8 @@
   Level level = 1;
 }
 
-message ChangeLogLevelResponse {
-  string remark = 1;
-}
+message ChangeLogLevelResponse { string remark = 1; }
 
 service Admin {
-  rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {
-  }
+  rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto
new file mode 100644
index 0000000..21a6321
--- /dev/null
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+  TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+  COMMIT = 1;
+  ROLLBACK = 2;
+}
+
+enum TransactionSource {
+  SOURCE_UNSPECIFIED = 0;
+  SOURCE_CLIENT = 1;
+  SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+  PERMISSION_UNSPECIFIED = 0;
+  NONE = 1;
+  READ = 2;
+  WRITE = 3;
+  READ_WRITE = 4;
+}
+
+enum FilterType {
+  FILTER_TYPE_UNSPECIFIED = 0;
+  TAG = 1;
+  SQL = 2;
+}
+
+message FilterExpression {
+  FilterType type = 1;
+  string expression = 2;
+}
+
+message RetryPolicy {
+  int32 max_attempts = 1;
+  oneof strategy {
+    ExponentialBackoff exponential_backoff = 2;
+    CustomizedBackoff customized_backoff = 3;
+  }
+}
+
+// https://en.wikipedia.org/wiki/Exponential_backoff
+message ExponentialBackoff {
+  google.protobuf.Duration initial = 1;
+  google.protobuf.Duration max = 2;
+  float multiplier = 3;
+}
+
+message CustomizedBackoff {
+  // To support classic backoff strategy which is arbitrarily defined by end users.
+  // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`
+  repeated google.protobuf.Duration next = 1;
+}
+
+message Resource {
+  string resource_namespace = 1;
+
+  // Resource name identifier, which remains unique within the abstract resource
+  // namespace.
+  string name = 2;
+}
+
+message SubscriptionEntry {
+  Resource topic = 1;
+  FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+  ADDRESS_SCHEME_UNSPECIFIED = 0;
+  IPv4 = 1;
+  IPv6 = 2;
+  DOMAIN_NAME = 3;
+}
+
+message Address {
+  string host = 1;
+  int32 port = 2;
+}
+
+message Endpoints {
+  AddressScheme scheme = 1;
+  repeated Address addresses = 2;
+}
+
+message Broker {
+  // Name of the broker
+  string name = 1;
+
+  // Broker index. Canonically, index = 0 implies that the broker is playing
+  // leader role while brokers with index > 0 play follower role.
+  int32 id = 2;
+
+  // Address of the broker, complying with the following scheme
+  // 1. dns:[//authority/]host[:port]
+  // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+  // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+  Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+  Resource topic = 1;
+  int32 id = 2;
+  Permission permission = 3;
+  Broker broker = 4;
+  repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+  MESSAGE_TYPE_UNSPECIFIED = 0;
+
+  NORMAL = 1;
+
+  // Sequenced message
+  FIFO = 2;
+
+  // Messages that are delivered after the specified duration.
+  DELAY = 3;
+
+  // Messages that are transactional. Only committed messages are delivered to
+  // subscribers.
+  TRANSACTION = 4;
+}
+
+enum DigestType {
+  DIGEST_TYPE_UNSPECIFIED = 0;
+
+  // CRC algorithm achieves goal of detecting random data error with lowest
+  // computation overhead.
+  CRC32 = 1;
+
+  // MD5 algorithm achieves good balance between collision rate and computation
+  // overhead.
+  MD5 = 2;
+
+  // SHA-family has substantially fewer collision with fair amount of
+  // computation.
+  SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest is detected, brokers need
+// to respond to the client with BAD_REQUEST.
+//
+// For message subscription, when an invalid digest is detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require a special RPC to re-fetch
+// the previously acquired message batch.
+//
+// Message consumption model also affects how invalid digests are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
+message Digest {
+  DigestType type = 1;
+  string checksum = 2;
+}
+
+enum ClientType {
+  CLIENT_TYPE_UNSPECIFIED = 0;
+  PRODUCER = 1;
+  PUSH_CONSUMER = 2;
+  SIMPLE_CONSUMER = 3;
+}
+
+enum Encoding {
+  ENCODING_UNSPECIFIED = 0;
+
+  IDENTITY = 1;
+
+  GZIP = 2;
+}
+
+message SystemProperties {
+  // Tag, which is optional.
+  optional string tag = 1;
+
+  // Message keys
+  repeated string keys = 2;
+
+  // Message identifier, client-side generated, remains unique.
+  // if message_id is empty, the send message request will be aborted with
+  // status `INVALID_ARGUMENT`
+  string message_id = 3;
+
+  // Message body digest
+  Digest body_digest = 4;
+
+  // Message body encoding. Candidate options are identity, gzip, snappy etc.
+  Encoding body_encoding = 5;
+
+  // Message type, normal, FIFO or transactional.
+  MessageType message_type = 6;
+
+  // Message born time-point.
+  google.protobuf.Timestamp born_timestamp = 7;
+
+  // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+  string born_host = 8;
+
+  // Time-point at which the message is stored in the broker, which is absent
+  // for message publishing.
+  optional google.protobuf.Timestamp store_timestamp = 9;
+
+  // The broker that stores this message. It may be broker name, IP or arbitrary
+  // identifier that uniquely identify the server.
+  string store_host = 10;
+
+  // Time-point at which broker delivers to clients, which is optional.
+  optional google.protobuf.Timestamp delivery_timestamp = 11;
+
+  // If a message is acquired by way of POP, this field holds the receipt,
+  // which is absent for message publishing.
+  // Clients use the receipt to acknowledge or negatively acknowledge the
+  // message.
+  optional string receipt_handle = 12;
+
+  // Message queue identifier in which a message is physically stored.
+  int32 queue_id = 13;
+
+  // Message-queue offset at which a message is stored, which is absent for
+  // message publishing.
+  optional int64 queue_offset = 14;
+
+  // Period of time the message would remain invisible once it is acquired.
+  optional google.protobuf.Duration invisible_duration = 15;
+
+  // Business code may fail to process messages for the moment. Hence, clients
+  // may request servers to deliver them again using certain back-off strategy,
+  // the attempt is 1 not 0 if message is delivered first time, and it is absent
+  // for message publishing.
+  optional int32 delivery_attempt = 16;
+
+  // Define the group name of message in the same topic, which is optional.
+  optional string message_group = 17;
+
+  // Trace context for each message, which is optional.
+  optional string trace_context = 18;
+
+  // If a transactional message stays unresolved for more than
+  // `orphaned_transaction_recovery_duration`, it would be regarded as an
+  // orphan. Servers that manage orphan messages would pick up
+  // a capable publisher to resolve it.
+  optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+}
+
+message Message {
+
+  Resource topic = 1;
+
+  // User defined key-value pairs.
+  // If user_properties contain the reserved keys by RocketMQ,
+  // the send message request will be aborted with status `INVALID_ARGUMENT`.
+  // See below links for the reserved keys
+  // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+  map<string, string> user_properties = 2;
+
+  SystemProperties system_properties = 3;
+
+  bytes body = 4;
+}
+
+message Assignment { MessageQueue message_queue = 1; }
+
+enum Code {
+  // Success.
+  OK = 0;
+  // Format of access point is illegal.
+  ILLEGAL_ACCESS_POINT = 1;
+  // Format of topic is illegal.
+  ILLEGAL_TOPIC = 2;
+  // Format of consumer group is illegal.
+  ILLEGAL_CONSUMER_GROUP = 3;
+  // Format of message tag is illegal.
+  ILLEGAL_MESSAGE_TAG = 4;
+  // Format of message key is illegal.
+  ILLEGAL_MESSAGE_KEY = 5;
+  // Size of message keys exceeds the threshold.
+  MESSAGE_KEYS_TOO_LARGE = 6;
+  // Format of message group is illegal.
+  ILLEGAL_MESSAGE_GROUP = 7;
+  // Format of message property key is illegal.
+  ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
+  // Message properties total size exceeds the threshold.
+  MESSAGE_PROPERTIES_TOO_LARGE = 9;
+  // Message body size exceeds the threshold.
+  MESSAGE_BODY_TOO_LARGE = 10;
+
+  // User does not have the permission to operate.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
+  FORBIDDEN = 403;
+
+  // Code indicates that the client request has not been completed
+  // because it lacks valid authentication credentials for the
+  // requested resource.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
+  UNAUTHORIZED = 401;
+
+  // Topic resource does not exist.
+  TOPIC_NOT_FOUND = 13;
+
+  // Consumer group resource does not exist.
+  CONSUMER_GROUP_NOT_FOUND = 14;
+
+  // Not allowed to verify message. Chances are that you are verifying
+  // a FIFO message, which would violate FIFO semantics.
+  VERIFY_MESSAGE_FORBIDDEN = 15;
+
+  // Failed to consume message.
+  FAILED_TO_CONSUME_MESSAGE = 16;
+
+  // Message is corrupted.
+  MESSAGE_CORRUPTED = 17;
+
+  // Too many requests are made in a short period of time.
+  // Requests are throttled.
+  TOO_MANY_REQUESTS = 18;
+
+  // Expired receipt-handle is used when trying to acknowledge or change
+  // invisible duration of a message
+  RECEIPT_HANDLE_EXPIRED = 19;
+
+  // Message property does not match the message type.
+  MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
+
+  // Format of message id is illegal.
+  ILLEGAL_MESSAGE_ID = 21;
+
+  // Transaction id is invalid.
+  INVALID_TRANSACTION_ID = 22;
+
+  // Format of filter expression is illegal.
+  ILLEGAL_FILTER_EXPRESSION = 23;
+
+  // Receipt handle of message is invalid.
+  INVALID_RECEIPT_HANDLE = 24;
+
+  // Message persistence timeout.
+  MASTER_PERSISTENCE_TIMEOUT = 25;
+
+  // Slave persistence timeout.
+  SLAVE_PERSISTENCE_TIMEOUT = 26;
+
+  // The HA-mechanism is not working now.
+  HA_NOT_AVAILABLE = 27;
+
+  // Operation is not allowed in current version.
+  VERSION_UNSUPPORTED = 28;
+
+  // Message not found from server.
+  MESSAGE_NOT_FOUND = 29;
+
+  // Message offset is illegal.
+  ILLEGAL_MESSAGE_OFFSET = 30;
+
+  // Illegal message is for the sake of backward compatibility. In most cases,
+  // more definitive code is better, e.g. `ILLEGAL_MESSAGE_TAG`.
+  ILLEGAL_MESSAGE = 31;
+
+  // Client type could not be recognized.
+  UNRECOGNIZED_CLIENT_TYPE = 32;
+
+  // Return different results for entries in composite request.
+  MULTIPLE_RESULTS = 33;
+
+  // Code indicates that the server encountered an unexpected condition
+  // that prevented it from fulfilling the request.
+  // This error response is a generic "catch-all" response.
+  // Usually, this indicates the server cannot find a better alternative
+  // error code to respond with. Sometimes, server administrators log error
+  // responses like the 500 status code with more details about the request
+  // to prevent the error from happening again in the future.
+  //
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+  INTERNAL_SERVER_ERROR = 500;
+
+  // Code means that the server or client does not support the functionality
+  // required to fulfill the request.
+  NOT_IMPLEMENTED = 501;
+
+  // Code indicates that the server, while acting as a gateway or proxy,
+  // did not get a response in time from the upstream server that
+  // it needed in order to complete the request.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+  GATEWAY_TIMEOUT = 504;
+}
+
+message Status {
+  Code code = 1;
+  string message = 2;
+}
+
+enum Language {
+  LANGUAGE_UNSPECIFIED = 0;
+  JAVA = 1;
+  CPP = 2;
+  DOT_NET = 3;
+  GOLANG = 4;
+  RUST = 5;
+}
+
+// User Agent
+message UA {
+  // SDK language
+  Language language = 1;
+
+  // SDK version
+  string version = 2;
+
+  // Platform details, including OS name, version, arch etc.
+  string platform = 3;
+
+  // Hostname of the node
+  string hostname = 4;
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
new file mode 100644
index 0000000..c7ce2e9
--- /dev/null
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
@@ -0,0 +1,445 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+import "apache/rocketmq/v2/definition.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxiliary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable message-queues, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
+message QueryRouteRequest {
+  Resource topic = 1;
+  Endpoints endpoints = 2;
+}
+
+message QueryRouteResponse {
+  Status status = 1;
+
+  repeated MessageQueue message_queues = 2;
+}
+
+message SendMessageRequest {
+  repeated Message messages = 1;
+}
+
+message SendResultEntry {
+  Status status = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  int64 offset = 4;
+}
+
+message SendMessageResponse {
+  Status status = 1;
+
+  // Some implementations may have partial failure issues. Client SDK developers are expected to inspect
+  // each entry for best certainty.
+  repeated SendResultEntry entries = 2;
+}
+
+message QueryAssignmentRequest {
+  Resource topic = 1;
+  Resource group = 2;
+  Endpoints endpoints = 3;
+}
+
+message QueryAssignmentResponse {
+  Status status = 1;
+  repeated Assignment assignments = 2;
+}
+
+message ReceiveMessageRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  FilterExpression filter_expression = 3;
+  int32 batch_size = 4;
+  // Required if client type is simple consumer.
+  optional google.protobuf.Duration invisible_duration = 5;
+  // For message auto renew and clean
+  bool auto_renew = 6;
+}
+
+message ReceiveMessageResponse {
+  oneof content {
+    Status status = 1;
+    Message message = 2;
+    // The timestamp that brokers start to deliver status line or message.
+    google.protobuf.Timestamp delivery_timestamp = 3;
+  }
+}
+
+message AckMessageEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+}
+
+message AckMessageRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  repeated AckMessageEntry entries = 3;
+}
+
+message AckMessageResultEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+
+  // Acknowledge result may be acquired through inspecting
+  // `status.code`; In case acknowledgement failed, `status.message`
+  // is the explanation of the failure.
+  Status status = 3;
+}
+
+message AckMessageResponse {
+
+  // RPC tier status, which is used to represent RPC-level errors including
+  // authentication, authorization, throttling and other general failures.
+  Status status = 1;
+
+  repeated AckMessageResultEntry entries = 2;
+}
+
+message ForwardMessageToDeadLetterQueueRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  string receipt_handle = 3;
+  string message_id = 4;
+  int32 delivery_attempt = 5;
+  int32 max_delivery_attempts = 6;
+}
+
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
+
+message HeartbeatRequest {
+  optional Resource group = 1;
+  ClientType client_type = 2;
+}
+
+message HeartbeatResponse { Status status = 1; }
+
+message EndTransactionRequest {
+  Resource topic = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  TransactionResolution resolution = 4;
+  TransactionSource source = 5;
+  string trace_context = 6;
+}
+
+message EndTransactionResponse { Status status = 1; }
+
+message PrintThreadStackTraceCommand { string nonce = 1; }
+
+message ThreadStackTrace {
+  string nonce = 1;
+  optional string thread_stack_trace = 2;
+}
+
+message VerifyMessageCommand {
+  string nonce = 1;
+  MessageQueue message_queue = 2;
+  Message message = 3;
+}
+
+message VerifyMessageResult {
+  string nonce = 1;
+}
+
+message RecoverOrphanedTransactionCommand {
+  MessageQueue message_queue = 1;
+  Message orphaned_transactional_message = 2;
+  string transaction_id = 3;
+}
+
+message Publishing {
+  // Publishing settings below here are appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // List of topics to which messages will be published.
+  repeated Resource topics = 1;
+
+  // Publishing settings below here are from server, it is essential for
+  // server to push.
+  //
+  // Body of message will be deflated if its size in bytes exceeds the
+  // threshold.
+  int32 compress_body_threshold = 2;
+
+  // If the message body size exceeds `max_body_size`, broker servers would
+  // reject the request. As a result, it is advisable that Producer performs
+  // client-side check validation.
+  int32 max_body_size = 3;
+}
+
+message Subscription {
+  // Subscription settings below here are appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // Consumer group.
+  optional Resource group = 1;
+
+  // Subscription for consumer.
+  repeated SubscriptionEntry subscriptions = 2;
+
+  // Subscription settings below here are from server, it is essential for
+  // server to push.
+  //
+  // When FIFO flag is `true`, messages of the same message group are processed
+  // in first-in-first-out manner.
+  //
+  // Brokers will not deliver further messages of the same group until prior
+  // ones are completely acknowledged.
+  optional bool fifo = 3;
+
+  // Message receive batch size here is essential for push consumer.
+  optional int32 receive_batch_size = 4;
+
+  // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+  // push consumer.
+  optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+  // Indicates whether the client should export local metrics to server.
+  bool on = 1;
+  
+  // The endpoint that client metrics should be exported to, which is required if the switch is on.
+  optional Endpoints endpoints = 2;
+}
+
+message Settings {
+  // Configurations for all clients.
+  optional ClientType client_type = 1;
+
+  optional Endpoints access_point = 2;
+
+  // If publishing of messages encounters throttling or server internal errors,
+  // publishers should implement automatic retries after progressive longer
+  // back-offs for consecutive errors.
+  //
+  // When processing message fails, `backoff_policy` describes an interval
+  // after which the message should be available to consume again.
+  //
+  // For FIFO messages, the interval should be relatively small because
+  // messages of the same message group would not be readily available until
+  // the prior one depletes its lifecycle.
+  optional RetryPolicy backoff_policy = 3;
+
+  // Request timeout for RPCs excluding long-polling.
+  optional google.protobuf.Duration request_timeout = 4;
+
+  oneof pub_sub {
+    Publishing publishing = 5;
+
+    Subscription subscription = 6;
+  }
+
+  // User agent details
+  UA user_agent = 7;
+
+  Metric metric = 8;
+}
+
+message TelemetryCommand {
+  optional Status status = 1;
+
+  oneof command {
+    // Client settings
+    Settings settings = 2;
+
+    // These messages are from client.
+    //
+    // Report thread stack trace to server.
+    ThreadStackTrace thread_stack_trace = 3;
+
+    // Report message verify result to server.
+    VerifyMessageResult verify_message_result = 4;
+
+    // These messages are from server.
+    //
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
+
+    // Request client to print thread stack trace.
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
+
+    // Request client to verify the consumption of the appointed message.
+    VerifyMessageCommand verify_message_command = 7;
+  }
+}
+
+message NotifyClientTerminationRequest {
+  // Consumer group, which is absent for producer.
+  optional Resource group = 1;
+}
+
+message NotifyClientTerminationResponse { Status status = 1; }
+
+message ChangeInvisibleDurationRequest {
+  Resource group = 1;
+  Resource topic = 2;
+
+  // Unique receipt handle to identify message to change
+  string receipt_handle = 3;
+
+  // New invisible duration
+  google.protobuf.Duration invisible_duration = 4;
+
+  // For message tracing
+  string message_id = 5;
+}
+
+message ChangeInvisibleDurationResponse {
+  Status status = 1;
+
+  // Server may generate a new receipt handle for the message.
+  string receipt_handle = 2;
+}
+
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
+//
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
+service MessagingService {
+
+  // Queries the route entries of the requested topic in the perspective of the
+  // given endpoints. On success, servers should return a collection of
+  // addressable message-queues. Note servers may return customized route
+  // entries based on endpoints provided.
+  //
+  // If the requested topic doesn't exist, returns `NOT_FOUND`.
+  // If the specified endpoints are empty, returns `INVALID_ARGUMENT`.
+  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
+
+  // Producer or consumer sends HeartbeatRequest to servers periodically to
+  // keep-alive. Additionally, it also reports client-side configuration,
+  // including topic subscription, load-balancing group name, etc.
+  //
+  // Returns `OK` if success.
+  //
+  // If a client specifies a language that is not yet supported by servers,
+  // returns `INVALID_ARGUMENT`
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
+
+  // Delivers messages to brokers.
+  // Clients may further:
+  // 1. Refine a message destination to message-queues which fulfills parts of
+  // FIFO semantic;
+  // 2. Flag a message as transactional, which keeps it invisible to consumers
+  // until it commits;
+  // 3. Time a message, making it invisible to consumers till specified
+  // time-point;
+  // 4. And more...
+  //
+  // Returns message-id or transaction-id with status `OK` on success.
+  //
+  // If the destination topic doesn't exist, returns `NOT_FOUND`.
+  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
+
+  // Queries the assigned route info of a topic for current consumer,
+  // the returned assignment result is decided by server-side load balancer.
+  //
+  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // If the specified endpoints are empty, returns `INVALID_ARGUMENT`.
+  rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
+  }
+
+  // Receives messages from the server in batch manner, returns a set of
+  // messages if success. The received messages should be acked or redelivered
+  // after processed.
+  //
+  // If the pending concurrent receive requests exceed the quota of the given
+  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+  // message in the specific topic, returns `OK` with an empty message set.
+  // Please note that client may suffer from false empty responses.
+  //
+  // If failed to receive message from remote, server must return only one
+  // `ReceiveMessageResponse` as the reply to the request, whose `Status` indicates
+  // the specific reason of failure, otherwise, the reply is considered successful.
+  rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {
+  }
+
+  // Acknowledges the message associated with the `receipt_handle` or `offset`
+  // in the `AckMessageRequest`, it means the message has been successfully
+  // processed. Returns `OK` if the message server removes the relevant message
+  // successfully.
+  //
+  // If the given receipt_handle is illegal or out of date, returns
+  // `INVALID_ARGUMENT`.
+  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+
+  // Forwards one message to dead letter queue if the max delivery attempts is
+  // exceeded by this message at client-side, return `OK` if success.
+  rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
+      returns (ForwardMessageToDeadLetterQueueResponse) {}
+
+  // Commits or rolls back one transactional message.
+  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
+
+  // Once a client starts, it would immediately establish bi-directional stream
+  // RPCs with brokers, reporting its settings as the initiative command.
+  //
+  // When servers have need of inspecting client status, they would issue
+  // telemetry commands to clients. After executing received instructions,
+  // clients shall report command execution results through client-side streams.
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
+
+  // Notify the server that the client is terminated.
+  rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
+  }
+
+  // Once a message is retrieved from consume queue on behalf of the group, it
+  // will be kept invisible to other clients of the same group for a period of
+  // time. The message is supposed to be processed within the invisible
+  // duration. If the client, which is in charge of the invisible message, is
+  // not capable of processing the message timely, it may use
+  // ChangeInvisibleDuration to lengthen invisible duration.
+  rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) {
+  }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PublishLoadBalancer.cs b/rocketmq-client-csharp/PublishLoadBalancer.cs
index 9a1b66d..7d258b4 100644
--- a/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -16,58 +16,59 @@
  */
 using System;
 using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public class PublishLoadBalancer
     {
         public PublishLoadBalancer(TopicRouteData route)
         {
-            this.partitions = new List<Partition>();
-            foreach (var partition in route.Partitions)
+            this._messageQueues = new List<rmq::MessageQueue>();
+            foreach (var messageQueue in route.MessageQueues)
             {
-                if (Permission.NONE == partition.Permission)
+                if (rmq::Permission.Unspecified == messageQueue.Permission || rmq::Permission.None == messageQueue.Permission) // v2: NONE is 1, not 0
                 {
                     continue;
                 }
 
-                if (Permission.READ == partition.Permission)
+                if (rmq::Permission.Read == messageQueue.Permission)
                 {
                     continue;
                 }
 
-                this.partitions.Add(partition);
+                this._messageQueues.Add(messageQueue);
             }
 
-            this.partitions.Sort();
+            this._messageQueues.Sort(Utilities.CompareMessageQueue);
             Random random = new Random();
-            this.roundRobinIndex = random.Next(0, this.partitions.Count);
+            this._roundRobinIndex = random.Next(0, this._messageQueues.Count);
         }
 
-        public void update(TopicRouteData route)
+        public void Update(TopicRouteData route)
         {
-            List<Partition> partitions = new List<Partition>();
-            foreach (var partition in route.Partitions)
+            List<rmq::MessageQueue> partitions = new List<rmq::MessageQueue>();
+            foreach (var partition in route.MessageQueues)
             {
-                if (Permission.NONE == partition.Permission)
+                if (rmq::Permission.Unspecified == partition.Permission || rmq::Permission.None == partition.Permission) // v2: NONE is 1, not 0
                 {
                     continue;
                 }
 
-                if (Permission.READ == partition.Permission)
+                if (rmq::Permission.Read == partition.Permission)
                 {
                     continue;
                 }
                 partitions.Add(partition);
             }
-            partitions.Sort();
-            this.partitions = partitions;
+            partitions.Sort(Utilities.CompareMessageQueue); // generated MessageQueue is not IComparable; use the constructor's comparer
+            this._messageQueues = partitions;
         }
 
         /**
          * Accept a partition iff its broker is different.
          */
-        private bool accept(List<Partition> existing, Partition partition)
+        private bool Accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
         {
             if (0 == existing.Count)
             {
@@ -76,7 +77,7 @@
 
             foreach (var item in existing)
             {
-                if (item.Broker.Equals(partition.Broker))
+                if (item.Broker.Equals(messageQueue.Broker))
                 {
                     return false;
                 }
@@ -84,22 +85,22 @@
             return true;
         }
 
-        public List<Partition> select(int maxAttemptTimes)
+        public List<rmq::MessageQueue> Select(int maxAttemptTimes)
         {
-            List<Partition> result = new List<Partition>();
+            List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
 
-            List<Partition> all = this.partitions;
+            List<rmq::MessageQueue> all = this._messageQueues;
             if (0 == all.Count)
             {
                 return result;
             }
-            int start = ++roundRobinIndex;
+            int start = ++_roundRobinIndex;
             int found = 0;
 
             for (int i = 0; i < all.Count; i++)
             {
                 int idx = ((start + i) & int.MaxValue) % all.Count;
-                if (accept(result, all[idx]))
+                if (Accept(result, all[idx]))
                 {
                     result.Add(all[idx]);
                     if (++found >= maxAttemptTimes)
@@ -112,8 +113,8 @@
             return result;
         }
 
-        private List<Partition> partitions;
+        private List<rmq::MessageQueue> _messageQueues;
 
-        private int roundRobinIndex;
+        private int _roundRobinIndex;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/INameServerResolver.cs b/rocketmq-client-csharp/Publishing.cs
similarity index 75%
copy from rocketmq-client-csharp/INameServerResolver.cs
copy to rocketmq-client-csharp/Publishing.cs
index 568098f..ffedd17 100644
--- a/rocketmq-client-csharp/INameServerResolver.cs
+++ b/rocketmq-client-csharp/Publishing.cs
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
 
-namespace org.apache.rocketmq
+using rmq = Apache.Rocketmq.V2;
+using System.Collections.Generic;
+
+namespace Org.Apache.Rocketmq
 {
-    public interface INameServerResolver
+    // Settings for publishing: plain auto-properties; no defaults or validation are applied here.
+    public class Publishing
     {
-        Task<List<string>> resolveAsync();
+        public List<rmq::Resource> Topics { get; set; }
+        public int CompressBodyThreshold { get; set; }
+
+        public int MaxBodySize { get; set; }
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
new file mode 100644
index 0000000..cc30943
--- /dev/null
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Consumer that pops messages of its assigned message queues in background loops and hands
+    /// them to the registered <see cref="IMessageListener"/>.
+    /// </summary>
+    public class PushConsumer : Client, IConsumer
+    {
+        /// <summary>
+        /// Creates a consumer of <paramref name="group"/>; subscriptions and the listener are registered afterwards.
+        /// </summary>
+        public PushConsumer(AccessPoint accessPoint, string resourceNamespace, string group) : base(accessPoint, resourceNamespace)
+        {
+            _group = group;
+            _topicFilterExpressionMap = new ConcurrentDictionary<string, FilterExpression>();
+            _topicAssignmentsMap = new ConcurrentDictionary<string, List<rmq::Assignment>>();
+            _processQueueMap = new ConcurrentDictionary<rmq::Assignment, ProcessQueue>();
+            _scanAssignmentCTS = new CancellationTokenSource();
+            _scanExpiredProcessQueueCTS = new CancellationTokenSource();
+        }
+
+        /// <summary>
+        /// Starts the base client, resolves the routes of all subscribed topics, sends a heartbeat and
+        /// schedules the assignment scan and the expired-process-queue scan.
+        /// </summary>
+        /// <exception cref="System.Exception">No message listener has been registered.</exception>
+        public override async Task Start()
+        {
+            if (null == _messageListener)
+            {
+                throw new System.Exception("Bad configuration: message listener is required");
+            }
+
+            await base.Start();
+
+            // Step-1: Resolve topic routes
+            List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
+            foreach (var item in _topicFilterExpressionMap)
+            {
+                queryRouteTasks.Add(GetRouteFor(item.Key, true));
+            }
+            // Await rather than block with GetAwaiter().GetResult(): this method is already async.
+            await Task.WhenAll(queryRouteTasks);
+
+            // Step-2: Send heartbeats to all involving brokers so that we may get immediate, valid assignments.
+            await Heartbeat();
+
+            // Step-3: Scan load assignments that are assigned to current client
+            schedule(async () =>
+            {
+                await scanLoadAssignments();
+            }, 10, _scanAssignmentCTS.Token);
+
+            schedule(() =>
+            {
+                ScanExpiredProcessQueue();
+            }, 10, _scanExpiredProcessQueueCTS.Token);
+        }
+
+        /// <summary>
+        /// Cancels both scheduled scans, then shuts down the base client.
+        /// </summary>
+        public override async Task Shutdown()
+        {
+            _scanAssignmentCTS.Cancel();
+            _scanExpiredProcessQueueCTS.Cancel();
+
+            // Release the resources held by the base class
+            await base.Shutdown();
+        }
+
+        /// <summary>
+        /// Queries the assignments of every subscribed topic and reconciles the pop loops with each non-empty result.
+        /// </summary>
+        private async Task scanLoadAssignments()
+        {
+            Logger.Debug("Start to scan load assignments from server");
+            List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq::Assignment>>>();
+            foreach (var item in _topicFilterExpressionMap)
+            {
+                tasks.Add(scanLoadAssignment(item.Key, _group));
+            }
+            var result = await Task.WhenAll(tasks);
+
+            foreach (var assignments in result)
+            {
+                if (assignments.Count == 0)
+                {
+                    continue;
+                }
+
+                checkAndUpdateAssignments(assignments);
+            }
+            Logger.Debug("Completed scanning load assignments");
+        }
+
+        /// <summary>
+        /// Starts a pop loop for every process queue that reports itself as expired.
+        /// </summary>
+        private void ScanExpiredProcessQueue()
+        {
+            foreach (var item in _processQueueMap)
+            {
+                if (item.Value.Expired())
+                {
+                    Task.Run(async () =>
+                    {
+                        await ExecutePop0(item.Key);
+                    });
+                }
+            }
+        }
+
+        /// <summary>
+        /// Compares the fresh assignments of one topic with the recorded ones: starts pop loops for new
+        /// assignments, cancels those that are no longer present, then records the fresh list.
+        /// </summary>
+        /// <param name="assignments">Assignments of a single topic; the topic name is taken from the first entry.</param>
+        private void checkAndUpdateAssignments(List<rmq::Assignment> assignments)
+        {
+            if (assignments.Count == 0)
+            {
+                return;
+            }
+
+            string topic = assignments[0].MessageQueue.Topic.Name;
+
+            // Compare to generate or cancel pop-cycles
+            List<rmq::Assignment> existing;
+            _topicAssignmentsMap.TryGetValue(topic, out existing);
+
+            foreach (var assignment in assignments)
+            {
+                if (null == existing || !existing.Contains(assignment))
+                {
+                    ExecutePop(assignment);
+                }
+            }
+
+            if (null != existing)
+            {
+                foreach (var assignment in existing)
+                {
+                    if (!assignments.Contains(assignment))
+                    {
+                        Logger.Info($"Stop receiving messages from {assignment.MessageQueue.ToString()}");
+                        CancelPop(assignment);
+                    }
+                }
+            }
+
+            // Record the latest assignments; without this the next scan has nothing to compare against
+            // and revoked assignments would never be cancelled.
+            _topicAssignmentsMap[topic] = assignments;
+        }
+
+        /// <summary>
+        /// Registers a process queue for the assignment and, if none was registered yet, starts its pop loop.
+        /// </summary>
+        private void ExecutePop(rmq::Assignment assignment)
+        {
+            var processQueue = new ProcessQueue();
+            if (_processQueueMap.TryAdd(assignment, processQueue))
+            {
+                Task.Run(async () =>
+                {
+                    await ExecutePop0(assignment);
+                });
+            }
+        }
+
+        /// <summary>
+        /// Pop loop of one assignment: receive, consume, change the invisible duration of failed messages and
+        /// ack the rest. The loop ends once the process queue is removed or marked as dropped.
+        /// </summary>
+        private async Task ExecutePop0(rmq::Assignment assignment)
+        {
+            Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
+            while (true)
+            {
+                try
+                {
+                    ProcessQueue processQueue;
+                    if (!_processQueueMap.TryGetValue(assignment, out processQueue))
+                    {
+                        break;
+                    }
+
+                    if (processQueue.Dropped)
+                    {
+                        break;
+                    }
+
+                    List<Message> messages = await base.ReceiveMessage(assignment, _group);
+                    processQueue.LastReceiveTime = System.DateTime.UtcNow;
+
+                    // TODO: cache message and dispatch them
+
+                    List<Message> failed = new List<Message>();
+                    await _messageListener.Consume(messages, failed);
+
+                    foreach (var message in failed)
+                    {
+                        await base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+                    }
+
+                    foreach (var message in messages)
+                    {
+                        if (!failed.Contains(message))
+                        {
+                            bool success = await base.Ack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+                            if (!success)
+                            {
+                                Logger.Warn($"Failed to ack message {message.MessageId} of topic {message.Topic}");
+                            }
+                        }
+                    }
+                }
+                catch (System.Exception e)
+                {
+                    // Keep the loop alive, but make the failure visible and back off briefly so that a
+                    // persistent error does not turn into a busy loop.
+                    Logger.Warn($"Failed to pop {assignment.MessageQueue.ToString()}: {e}");
+                    await Task.Delay(System.TimeSpan.FromSeconds(1));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Removes the process queue of the assignment and marks it as dropped so that its pop loop ends.
+        /// </summary>
+        private void CancelPop(rmq::Assignment assignment)
+        {
+            // TryRemove is the atomic ConcurrentDictionary operation; no separate ContainsKey probe is needed.
+            ProcessQueue processQueue;
+            if (_processQueueMap.TryRemove(assignment, out processQueue))
+            {
+                processQueue.Dropped = true;
+            }
+        }
+
+        /// <summary>
+        /// Currently adds nothing to the heartbeat request.
+        /// </summary>
+        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+        {
+        }
+
+        /// <summary>
+        /// Subscribes to <paramref name="topic"/>, replacing any filter expression registered for it before.
+        /// </summary>
+        public void Subscribe(string topic, string expression, ExpressionType type)
+        {
+            var filterExpression = new FilterExpression(expression, type);
+            _topicFilterExpressionMap[topic] = filterExpression;
+        }
+
+        /// <summary>
+        /// Sets the listener that consumes received messages; a null listener is ignored.
+        /// </summary>
+        public void RegisterListener(IMessageListener listener)
+        {
+            if (null != listener)
+            {
+                _messageListener = listener;
+            }
+        }
+
+        private string _group;
+
+        private ConcurrentDictionary<string, FilterExpression> _topicFilterExpressionMap;
+        private IMessageListener _messageListener;
+
+        private CancellationTokenSource _scanAssignmentCTS;
+
+        private ConcurrentDictionary<string, List<rmq::Assignment>> _topicAssignmentsMap;
+
+        private ConcurrentDictionary<rmq::Assignment, ProcessQueue> _processQueueMap;
+
+        private CancellationTokenSource _scanExpiredProcessQueueCTS;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 0191e91..c1f1cd6 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -15,47 +15,176 @@
  * limitations under the License.
  */
 
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Net.Security;
+using System.Threading;
 using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+using Grpc.Net.Client;
+using NLog;
 
-namespace org.apache.rocketmq {
-    public class RpcClient : IRpcClient {
-        public RpcClient(MessagingService.MessagingServiceClient client) {
-            stub = client;
+namespace Org.Apache.Rocketmq
+{
+    public class RpcClient : IRpcClient
+    {
+        protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+        private readonly rmq::MessagingService.MessagingServiceClient _stub;
+        private readonly GrpcChannel _channel;
+
+        public RpcClient(string target)
+        {
+            _channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+            {
+                HttpHandler = CreateHttpHandler()
+            });
+            var invoker = _channel.Intercept(new ClientLoggerInterceptor());
+            _stub = new rmq::MessagingService.MessagingServiceClient(invoker);
         }
 
-        public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions)
+        public async Task Shutdown()
         {
-            var call = stub.QueryRouteAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            var status = call.GetStatus();
-            if (status.StatusCode != grpc.StatusCode.OK) {
-                //TODO: Something is wrong, raise an exception here.
+            if (null != _channel)
+            {
+                await _channel.ShutdownAsync();
             }
-            return response;
         }
 
-        public async Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions)
+        /**
+         * Builds the HTTP handler of the gRPC channel: no pooled-connection idle timeout, keep-alive pings and multiple HTTP/2 connections.
+         * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for why parameters are configured this way.
+         */
+        private HttpMessageHandler CreateHttpHandler()
         {
-            var call = stub.HeartbeatAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            return response;
+            var sslOptions = new SslClientAuthenticationOptions();
+            // SECURITY: the callback below accepts every certificate, i.e. server certificate validation is disabled.
+            // This is meant for the development phase only; remove the following line wherever validation is required.
+            sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
+            var handler = new SocketsHttpHandler
+            {
+                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
+                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
+                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
+                EnableMultipleHttp2Connections = true,
+                SslOptions = sslOptions,
+            };
+            return handler;
         }
 
-        public async Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions)
+        public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
         {
-            var call = stub.NotifyClientTerminationAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            return response;
+            // No deadline: a gRPC deadline bounds the whole call and would cut off this long-lived duplex stream after 3 seconds.
+            var callOptions = new CallOptions(metadata);
+            return _stub.Telemetry(callOptions);
         }
 
-        public async Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions)
+        public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
         {
-            var call = stub.SendMessageAsync(request, callOptions);
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.QueryRouteAsync(request, callOptions);
             return await call.ResponseAsync;
         }
 
-        private MessagingService.MessagingServiceClient stub;
+
+        public async Task<rmq::HeartbeatResponse> Heartbeat(Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.HeartbeatAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
+
+        public async Task<rmq::SendMessageResponse> SendMessage(Metadata metadata, rmq::SendMessageRequest request,
+            TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.SendMessageAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
+
+        public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Metadata metadata, rmq::QueryAssignmentRequest request,
+            TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.QueryAssignmentAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
+
+        // Reads the server-streaming ReceiveMessage call to its end and returns every response received.
+        public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata,
+            rmq::ReceiveMessageRequest request, TimeSpan timeout)
+        {
+            var callOptions = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            // Dispose the call: a no-op after a complete read, otherwise it cancels the unfinished call.
+            using var call = _stub.ReceiveMessage(request, callOptions);
+            var result = new List<rmq::ReceiveMessageResponse>();
+            while (await call.ResponseStream.MoveNext())
+            {
+                Logger.Debug($"Got ReceiveMessageResponse {call.ResponseStream.Current}");
+                result.Add(call.ResponseStream.Current);
+            }
+            Logger.Debug("Receiving of messages completed");
+            return result;
+        }
+
+        public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request,
+            TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.AckMessageAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
+
+        public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, rmq::ChangeInvisibleDurationRequest request,
+            TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.ChangeInvisibleDurationAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
+
+        public async Task<rmq::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Metadata metadata,
+            rmq::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.ForwardMessageToDeadLetterQueueAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
+
+        public async Task<rmq::EndTransactionResponse> EndTransaction(Metadata metadata, rmq::EndTransactionRequest request,
+            TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.EndTransactionAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
+
+        public async Task<rmq::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
+            rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+        {
+            // The relative timeout is turned into an absolute UTC deadline for this call.
+            var options = new CallOptions(metadata, DateTime.UtcNow.Add(timeout));
+            var pending = _stub.NotifyClientTerminationAsync(request, options);
+
+            return await pending.ResponseAsync;
+        }
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/SendResult.cs b/rocketmq-client-csharp/SendReceipt.cs
similarity index 79%
rename from rocketmq-client-csharp/SendResult.cs
rename to rocketmq-client-csharp/SendReceipt.cs
index 5967cca..0f29991 100644
--- a/rocketmq-client-csharp/SendResult.cs
+++ b/rocketmq-client-csharp/SendReceipt.cs
@@ -14,27 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace org.apache.rocketmq {
-    public sealed class SendResult {
-        public SendResult(string messageId) {
+
+namespace Org.Apache.Rocketmq
+{
+    public sealed class SendReceipt
+    {
+        public SendReceipt(string messageId)
+        {
             status_ = SendStatus.SEND_OK;
             messageId_ = messageId;
         }
 
-        public SendResult(string messageId, SendStatus status) {
+        public SendReceipt(string messageId, SendStatus status)
+        {
             status_ = status;
             messageId_ = messageId;
         }
 
         private string messageId_;
 
-        public string MessageId {
+        public string MessageId
+        {
             get { return messageId_; }
         }
 
 
         private SendStatus status_;
-        public SendStatus Status {
+
+        public SendStatus Status
+        {
             get { return status_; }
         }
     }
diff --git a/rocketmq-client-csharp/SendStatus.cs b/rocketmq-client-csharp/SendStatus.cs
index 8964211..7586d22 100644
--- a/rocketmq-client-csharp/SendStatus.cs
+++ b/rocketmq-client-csharp/SendStatus.cs
@@ -15,8 +15,10 @@
  * limitations under the License.
  */
 
-namespace org.apache.rocketmq {
-    public enum SendStatus {
+namespace Org.Apache.Rocketmq
+{
+    public enum SendStatus
+    {
         SEND_OK,
         FLUSH_DISK_TIMEOUT,
         FLUSH_SLAVE_TIMEOUT,
diff --git a/rocketmq-client-csharp/SequenceGenerator.cs b/rocketmq-client-csharp/SequenceGenerator.cs
index aa92c80..97a1eb9 100644
--- a/rocketmq-client-csharp/SequenceGenerator.cs
+++ b/rocketmq-client-csharp/SequenceGenerator.cs
@@ -17,8 +17,9 @@
 using System;
 using System.Threading;
 using System.Net.NetworkInformation;
+using NLog;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     /**
      * See https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o for Sequence ID spec.
@@ -27,6 +28,7 @@
      */
     public sealed class SequenceGenerator
     {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         public static SequenceGenerator Instance
         {
@@ -94,10 +96,11 @@
             {
                 if (nic.OperationalStatus == OperationalStatus.Up)
                 {
-                    if (nic.Name.Equals("lo"))
+                    if (nic.Name.StartsWith("lo"))
                     {
                         continue;
                     }
+                    Logger.Debug($"NIC={nic.Name}");
                     return nic.GetPhysicalAddress().GetAddressBytes();
                 }
             }
diff --git a/rocketmq-client-csharp/ServiceAddress.cs b/rocketmq-client-csharp/ServiceAddress.cs
deleted file mode 100644
index 4aab213..0000000
--- a/rocketmq-client-csharp/ServiceAddress.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System.Collections.Generic;
-
-namespace org.apache.rocketmq {
-    public sealed class ServiceAddress {
-
-        public ServiceAddress(AddressScheme scheme, List<Address> addresses) {
-            this.scheme = scheme;
-            this.addresses = addresses;
-        }
-
-        private AddressScheme scheme;
-        public AddressScheme Scheme {
-            get { return scheme; }
-        }
-
-        private List<Address> addresses;
-        public List<Address> Addresses{
-            get { return addresses; }
-        }
-
-    }
-}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
new file mode 100644
index 0000000..a6be057
--- /dev/null
+++ b/rocketmq-client-csharp/Session.cs
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using grpc = global::Grpc.Core;
+using NLog;
+using rmq = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Telemetry session with one target: sends the client settings over a duplex stream and
+    /// dispatches the commands the server sends back.
+    /// </summary>
+    public class Session
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+        public Session(string target,
+            grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+            IClient client)
+        {
+            this._target = target;
+            this._stream = stream;
+            this._client = client;
+            this._channel = Channel.CreateUnbounded<bool>();
+        }
+
+        /// <summary>
+        /// Writes the client settings, then reads telemetry commands until the client's telemetry token
+        /// is cancelled or the server closes the stream; finally completes the request stream.
+        /// </summary>
+        public async Task Loop()
+        {
+            var reader = this._stream.ResponseStream;
+            var writer = this._stream.RequestStream;
+            var request = new rmq::TelemetryCommand();
+            request.Settings = new rmq::Settings();
+            _client.BuildClientSetting(request.Settings);
+            await writer.WriteAsync(request);
+            Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
+            while (!_client.TelemetryCts().IsCancellationRequested)
+            {
+                if (!await reader.MoveNext(_client.TelemetryCts().Token))
+                {
+                    // MoveNext keeps returning false once the server has closed the stream;
+                    // leave the loop instead of spinning on it.
+                    Logger.Warn($"Telemetry stream to {_target} was closed by server");
+                    break;
+                }
+
+                var cmd = reader.Current;
+                Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
+                switch (cmd.CommandCase)
+                {
+                    case rmq::TelemetryCommand.CommandOneofCase.None:
+                        {
+                            Logger.Warn($"Telemetry failed: {cmd.Status}");
+                            // CompareExchange(ref location, value, comparand): move 0 -> 2 exactly once.
+                            if (0 == Interlocked.CompareExchange(ref _established, 2, 0))
+                            {
+                                await _channel.Writer.WriteAsync(false);
+                            }
+                            break;
+                        }
+                    case rmq::TelemetryCommand.CommandOneofCase.Settings:
+                        {
+                            // Move 0 -> 1 exactly once so that only the first settings wake up the waiter.
+                            if (0 == Interlocked.CompareExchange(ref _established, 1, 0))
+                            {
+                                await _channel.Writer.WriteAsync(true);
+                            }
+
+                            Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+                            _client.OnSettingsReceived(cmd.Settings);
+                            break;
+                        }
+                    case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
+                        {
+                            break;
+                        }
+                    case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
+                        {
+                            break;
+                        }
+                    case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand:
+                        {
+                            break;
+                        }
+                }
+            }
+            Logger.Info("Telemetry loop stopped");
+            await writer.CompleteAsync();
+        }
+
+        private string _target;
+
+        public string Target
+        {
+            get { return _target; }
+        }
+
+        /// <summary>
+        /// Returns at once when the negotiation outcome is already known; otherwise waits for it.
+        /// The outcome is written to the channel once, so only a single pending waiter is released.
+        /// </summary>
+        public async Task AwaitSettingNegotiationCompletion()
+        {
+            if (0 != Interlocked.Read(ref _established))
+            {
+                return;
+            }
+
+            Logger.Debug("Await setting negotiation");
+            await _channel.Reader.ReadAsync();
+        }
+
+        private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
+        private IClient _client;
+
+        // 0: not negotiated yet, 1: settings received, 2: telemetry failed.
+        private long _established = 0;
+
+        // Carries the negotiation outcome (true on settings, false on failure) to the waiter.
+        private Channel<bool> _channel;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Signature.cs b/rocketmq-client-csharp/Signature.cs
index 70e038a..2331b53 100644
--- a/rocketmq-client-csharp/Signature.cs
+++ b/rocketmq-client-csharp/Signature.cs
@@ -19,29 +19,38 @@
 using grpc = global::Grpc.Core;
 using System.Security.Cryptography;
 
-namespace org.apache.rocketmq {
-    public class Signature {
-        public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) {
+namespace Org.Apache.Rocketmq
+{
+    public class Signature
+    {
+        public static void sign(IClientConfig clientConfig, grpc::Metadata metadata)
+        {
             metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
             metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
-            if (!String.IsNullOrEmpty(clientConfig.tenantId())) {
+            metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId());
+            if (!String.IsNullOrEmpty(clientConfig.tenantId()))
+            {
                 metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
             }
 
-            if (!String.IsNullOrEmpty(clientConfig.resourceNamespace())) {
+            if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
+            {
                 metadata.Add(MetadataConstants.NAMESPACE_KEY, clientConfig.resourceNamespace());
             }
 
             string time = DateTime.Now.ToString(MetadataConstants.DATE_TIME_FORMAT);
             metadata.Add(MetadataConstants.DATE_TIME_KEY, time);
 
-            if (null != clientConfig.credentialsProvider()) {
+            if (null != clientConfig.credentialsProvider())
+            {
                 var credentials = clientConfig.credentialsProvider().getCredentials();
-                if (null == credentials || credentials.expired()) {
+                if (null == credentials || credentials.expired())
+                {
                     return;
                 }
 
-                if (!String.IsNullOrEmpty(credentials.SessionToken)) {
+                if (!String.IsNullOrEmpty(credentials.SessionToken))
+                {
                     metadata.Add(MetadataConstants.STS_SESSION_TOKEN, credentials.SessionToken);
                 }
 
@@ -50,7 +59,7 @@
                 HMACSHA1 signer = new HMACSHA1(secretData);
                 byte[] digest = signer.ComputeHash(data);
                 string hmac = BitConverter.ToString(digest).Replace("-", "");
-                string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}", 
+                string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}",
                     MetadataConstants.ALGORITHM_KEY,
                     MetadataConstants.CREDENTIAL_KEY,
                     credentials.AccessKey,
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
new file mode 100644
index 0000000..154efa0
--- /dev/null
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+using System.Threading;
+using Grpc.Core;
+using System.Collections.Generic;
+using System.Linq;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Org.Apache.Rocketmq
+{
+    public class SimpleConsumer : Client
+    {
+
+        // Creates a simple consumer that consumes on behalf of `group`.
+        public SimpleConsumer(AccessPoint accessPoint, string resourceNamespace, string group)
+            : base(accessPoint, resourceNamespace)
+        {
+            _group = group;
+            _fifo = false;
+            _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
+            _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
+        }
+
+        // Lets the base class fill its part, then declares this client a SimpleConsumer and
+        // attaches the consumer group together with every current subscription entry.
+        public override void BuildClientSetting(rmq::Settings settings)
+        {
+            base.BuildClientSetting(settings);
+            settings.ClientType = rmq::ClientType.SimpleConsumer;
+            settings.Subscription = new rmq::Subscription
+            {
+                Group = new rmq::Resource { Name = _group, ResourceNamespace = ResourceNamespace }
+            };
+            foreach (var pair in _subscriptions)
+            {
+                settings.Subscription.Subscriptions.Add(pair.Value);
+            }
+        }
+
+        // Starts the base client, schedules the recurring assignment scan and runs one scan now.
+        public override async Task Start()
+        {
+            await base.Start();
+
+            // NOTE(review): assumes schedule() re-invokes the callback every 30 seconds until the
+            // token is cancelled - confirm in Client. The callback therefore performs exactly one
+            // scan; looping here without a delay would issue scans back-to-back.
+            schedule(async () =>
+            {
+                await ScanLoadAssignments();
+            }, 30, _scanAssignmentCts.Token);
+
+            await ScanLoadAssignments();
+        }
+
+        // Cancels the assignment-scan token (nothing else in this class does, so the scan would
+        // otherwise never be stopped), then shuts the base client down and notifies termination.
+        public override async Task Shutdown()
+        {
+            _scanAssignmentCts.Cancel();
+            await base.Shutdown();
+            if (!await NotifyClientTermination()) { Logger.Warn("Failed to NotifyClientTermination"); }
+        }
+
+        // Queries the load assignments of every subscribed topic concurrently and stores each
+        // non-empty answer in _topicAssignments; a null or empty answer leaves the cache untouched.
+        private async Task ScanLoadAssignments()
+        {
+            var topics = new List<string>();
+            var queries = new List<Task<List<rmq::Assignment>>>();
+
+            foreach (var sub in _subscriptions)
+            {
+                var request = new rmq::QueryAssignmentRequest
+                {
+                    Topic = new rmq::Resource { ResourceNamespace = ResourceNamespace, Name = sub.Key },
+                    Group = new rmq::Resource { Name = _group, ResourceNamespace = ResourceNamespace },
+                    Endpoints = new rmq::Endpoints { Scheme = rmq.AddressScheme.Ipv4 }
+                };
+                request.Endpoints.Addresses.Add(new rmq::Address
+                {
+                    Host = _accessPoint.Host,
+                    Port = _accessPoint.Port
+                });
+
+                var metadata = new Metadata();
+                Signature.sign(this, metadata);
+                topics.Add(sub.Key);
+                queries.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request,
+                    TimeSpan.FromSeconds(3)));
+            }
+
+            // Task.WhenAll keeps the order of its input, so results[i] belongs to topics[i].
+            // If any single query faults, the await rethrows and nothing is updated this round.
+            List<rmq.Assignment>[] results = await Task.WhenAll(queries);
+            for (var i = 0; i < results.Length; i++)
+            {
+                var topic = topics[i];
+                var assignments = results[i];
+                if (null == assignments || 0 == assignments.Count)
+                {
+                    Logger.Warn($"Failed to acquire assignments. Topic={topic}, Group={_group}");
+                    continue;
+                }
+
+                Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
+                _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
+            }
+        }
+
+        // Stamps the heartbeat with the SimpleConsumer client type and this consumer's group.
+        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+        {
+            var group = new rmq::Resource { Name = _group, ResourceNamespace = ResourceNamespace };
+            request.Group = group;
+            request.ClientType = rmq::ClientType.SimpleConsumer;
+        }
+
+        // Registers (or replaces) the filter for `topic` and adds the topic to those of interest.
+        public void Subscribe(string topic, rmq::FilterType filterType, string expression)
+        {
+            var subscription = new rmq::SubscriptionEntry
+            {
+                Topic = new rmq::Resource { Name = topic, ResourceNamespace = ResourceNamespace },
+                Expression = new rmq::FilterExpression { Type = filterType, Expression = expression }
+            };
+
+            _subscriptions.AddOrUpdate(topic, subscription, (key, existing) => subscription);
+            AddTopicOfInterest(topic);
+        }
+
+        // Defers to the base handler, then switches this consumer to FIFO mode when the
+        // received settings say so. The flag is never reset to false here.
+        public override void OnSettingsReceived(rmq.Settings settings)
+        {
+            base.OnSettingsReceived(settings);
+            if (!settings.Subscription.Fifo) { return; }
+
+            _fifo = true;
+            Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
+        }
+
+        // Requests a batch of `batchSize` messages from the queue chosen by NextQueue(); returns
+        // an empty list when NextQueue() has nothing to offer.
+        public async Task<List<Message>> Receive(int batchSize, TimeSpan timeout)
+        {
+            var queue = NextQueue();
+            if (queue is null)
+            {
+                Logger.Debug("NextQueue returned null");
+                return new List<Message>();
+            }
+
+            var copy = new rmq.MessageQueue();
+            copy.MergeFrom(queue);
+            var request = new rmq.ReceiveMessageRequest
+            {
+                Group = new rmq.Resource { ResourceNamespace = ResourceNamespace, Name = _group },
+                MessageQueue = copy,
+                BatchSize = batchSize,
+                // Client is responsible of extending message invisibility duration
+                AutoRenew = false
+            };
+
+            var metadata = new Metadata();
+            Signature.sign(this, metadata);
+            var received = await Manager.ReceiveMessage(Utilities.TargetUrl(queue), metadata, request, timeout);
+            return received;
+        }
+
+
+        // Acknowledges `message` on behalf of this group with a single-entry AckMessageRequest.
+        public async Task Ack(Message message)
+        {
+            var request = new rmq.AckMessageRequest
+            {
+                Group = new rmq.Resource { ResourceNamespace = ResourceNamespace, Name = _group },
+                Topic = new rmq.Resource { ResourceNamespace = ResourceNamespace, Name = message.Topic }
+            };
+            request.Entries.Add(new rmq.AckMessageEntry
+            {
+                MessageId = message.MessageId,
+                ReceiptHandle = message._receiptHandle
+            });
+
+            var metadata = new Metadata();
+            Signature.sign(this, metadata);
+
+            // The request is sent to the host recorded on the message itself.
+            var host = message._sourceHost;
+            await Manager.Ack(host, metadata, request, RequestTimeout);
+        }
+
+        // Builds a ChangeInvisibleDurationRequest for `message` on behalf of this group and sends
+        // it to the host recorded in the message's `_sourceHost`.
+        public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
+        {
+            var request = new rmq.ChangeInvisibleDurationRequest
+            {
+                Group = new rmq.Resource { ResourceNamespace = ResourceNamespace, Name = _group },
+                Topic = new rmq.Resource { ResourceNamespace = ResourceNamespace, Name = message.Topic },
+                ReceiptHandle = message._receiptHandle,
+                MessageId = message.MessageId,
+                InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
+            };
+
+            var metadata = new Metadata();
+            Signature.sign(this, metadata);
+            await Manager.ChangeInvisibleDuration(
+                message._sourceHost,
+                metadata,
+                request,
+                RequestTimeout);
+        }
+        
+        // Picks the next message queue. Topics, and the assignments within the chosen topic, are
+        // each cycled with a per-thread sequence number. Returns null when nothing is assigned.
+        private rmq.MessageQueue NextQueue()
+        {
+            // Keys is a point-in-time copy; taking both the size and the element from it keeps
+            // the index in range even if assignments are updated concurrently.
+            var topics = _topicAssignments.Keys;
+            if (0 == topics.Count)
+            {
+                return null;
+            }
+
+            UInt32 topicSeq = CurrentTopicSequence.Value;
+            CurrentTopicSequence.Value = topicSeq + 1;
+            UInt32 queueSeq = CurrentQueueSequence.Value;
+            CurrentQueueSequence.Value = queueSeq + 1;
+
+            var topic = topics.ElementAt((int)(topicSeq % (UInt32)topics.Count));
+
+            // An empty list is treated like a missing one: the modulo below would otherwise
+            // throw DivideByZeroException.
+            if (!_topicAssignments.TryGetValue(topic, out var assignments)
+                || null == assignments || 0 == assignments.Count)
+            {
+                return null;
+            }
+
+            var idx = (int)(queueSeq % (UInt32)assignments.Count);
+            return assignments[idx].MessageQueue;
+        }
+
+        private ThreadLocal<UInt32> CurrentTopicSequence = new ThreadLocal<UInt32>(true); // per-thread counter NextQueue() uses to pick a topic
+        private ThreadLocal<UInt32> CurrentQueueSequence = new ThreadLocal<UInt32>(true); // per-thread counter NextQueue() uses to pick an assignment
+
+        private readonly string _group; // consumer group name, fixed at construction
+        private bool _fifo; // set to true by OnSettingsReceived(); not read within this class
+        private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions; // topic name -> entry registered via Subscribe()
+        private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments; // topic name -> assignments stored by ScanLoadAssignments()
+        private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource(); // its token is handed to schedule() in Start()
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticCredentialsProvider.cs b/rocketmq-client-csharp/StaticCredentialsProvider.cs
index 301613b..edd810d 100644
--- a/rocketmq-client-csharp/StaticCredentialsProvider.cs
+++ b/rocketmq-client-csharp/StaticCredentialsProvider.cs
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace org.apache.rocketmq {
-    public class StaticCredentialsProvider : ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+    public class StaticCredentialsProvider : ICredentialsProvider
+    {
 
-        public StaticCredentialsProvider(string accessKey, string accessSecret) {
+        public StaticCredentialsProvider(string accessKey, string accessSecret)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
         }
 
-        public Credentials getCredentials() {
+        public Credentials getCredentials()
+        {
             return new Credentials(accessKey, accessSecret);
         }
 
diff --git a/rocketmq-client-csharp/Topic.cs b/rocketmq-client-csharp/Topic.cs
index dcc7100..f1ae453 100644
--- a/rocketmq-client-csharp/Topic.cs
+++ b/rocketmq-client-csharp/Topic.cs
@@ -17,50 +17,68 @@
 
 using System;
 
-namespace org.apache.rocketmq {
-    public class Topic : IComparable<Topic>, IEquatable<Topic> {
-        public Topic(string resource_namespace, string name) {
-            resourceNamespace = resource_namespace;
-            this.name = name;
+namespace Org.Apache.Rocketmq
+{
+    public class Topic : IComparable<Topic>, IEquatable<Topic>
+    {
+        public Topic(string resourceNamespace, string name)
+        {
+            ResourceNamespace = resourceNamespace;
+            Name = name;
         }
 
-        private string resourceNamespace;
-        public string ResourceNamespace {
-            get { return resourceNamespace; }
-        }
+        public string ResourceNamespace { get; }
+        public string Name { get; }
 
-        private string name;
-        public string Name {
-            get { return name; }
-        }
-
-        public int CompareTo(Topic other) {
-            if (0 != resourceNamespace.CompareTo(other.resourceNamespace)) {
-                return resourceNamespace.CompareTo(other.resourceNamespace);
-            }
-
-            if (0 != name.CompareTo(other.name)) {
-                return name.CompareTo(other.name);
-            }
-
-            return 0;
-        }
-
-        public bool Equals(Topic other) {
-            return resourceNamespace.Equals(other.resourceNamespace) && name.Equals(other.name);
-        }
-
-        public override bool Equals(Object other) {
-            if (!(other is Topic)) {
+        public bool Equals(Topic other)
+        {
+            if (ReferenceEquals(null, other))
+            {
                 return false;
             }
-            return Equals(other as Topic);
+
+            if (ReferenceEquals(this, other))
+            {
+                return true;
+            }
+
+            return ResourceNamespace == other.ResourceNamespace && Name == other.Name;
+        }
+
+        public override bool Equals(object obj)
+        {
+            if (ReferenceEquals(null, obj) || obj.GetType() != GetType())
+            {
+                return false;
+            }
+
+            if (ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            return Equals((Topic)obj);
         }
 
         public override int GetHashCode()
         {
-            return HashCode.Combine(resourceNamespace, name);
+            return HashCode.Combine(ResourceNamespace, Name);
         }
 
+        // Orders topics by namespace, then by name, both compared ordinally.
+        public int CompareTo(Topic other)
+        {
+            // IComparable<T> contract: every instance compares greater than null.
+            if (other is null)
+            {
+                return 1;
+            }
+            var byNamespace = String.CompareOrdinal(ResourceNamespace, other.ResourceNamespace);
+            if (0 != byNamespace)
+            {
+                return byNamespace;
+            }
+            return String.CompareOrdinal(Name, other.Name);
+        }
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/TopicRouteData.cs b/rocketmq-client-csharp/TopicRouteData.cs
index a860669..e4aa04c 100644
--- a/rocketmq-client-csharp/TopicRouteData.cs
+++ b/rocketmq-client-csharp/TopicRouteData.cs
@@ -14,43 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
 using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
-
-    public class TopicRouteData : IEquatable<TopicRouteData> {
-
-        public TopicRouteData(List<Partition> partitions) {
-            this.partitions = partitions;
-            this.partitions.Sort();
-        }
-
-        private List<Partition> partitions;
-
-        public List<Partition> Partitions {
-            get { return partitions; }
-        }
-
-        public bool Equals(TopicRouteData other) {
-            return partitions.Equals(other.partitions);
-        }
-
-        public override bool Equals(object other)
+namespace Org.Apache.Rocketmq
+{
+    public class TopicRouteData : IEquatable<TopicRouteData>
+    {
+        public TopicRouteData(List<rmq::MessageQueue> partitions)
         {
+            _messageQueues = partitions;
 
-            if (!(other is TopicRouteData)) {
-                return false;
-            }
+            _messageQueues.Sort(Utilities.CompareMessageQueue);
+        }
 
-            return Equals(other as TopicRouteData);
+        private List<rmq::MessageQueue> _messageQueues;
+        public List<rmq::MessageQueue> MessageQueues { get { return _messageQueues; } }
+
+        // Equal when both hold equal queues in the same order; the list objects may differ.
+        public bool Equals(TopicRouteData other)
+        {
+            if (other is null) { return false; }
+            if (ReferenceEquals(_messageQueues, other._messageQueues)) { return true; }
+            if (_messageQueues is null || other._messageQueues is null) { return false; }
+            return System.Linq.Enumerable.SequenceEqual(_messageQueues, other._messageQueues);
+        }
+
+        public override bool Equals(object obj)
+        {
+            return obj is TopicRouteData that && that.GetType() == GetType() && Equals(that);
         }
 
         public override int GetHashCode()
         {
-            return HashCode.Combine(partitions);
+            // Built from the elements (not the list reference) so equal instances hash alike.
+            return _messageQueues is null ? 0 : System.Linq.Enumerable.Aggregate(_messageQueues, 17, (h, q) => unchecked(h * 31 + (q is null ? 0 : q.GetHashCode())));
         }
-
     }
-
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/TopicRouteException.cs b/rocketmq-client-csharp/TopicRouteException.cs
index b520e72..75462fd 100644
--- a/rocketmq-client-csharp/TopicRouteException.cs
+++ b/rocketmq-client-csharp/TopicRouteException.cs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 using System;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public class TopicRouteException : Exception
     {
diff --git a/rocketmq-client-csharp/Utilities.cs b/rocketmq-client-csharp/Utilities.cs
index 1834a77..23ed8db 100644
--- a/rocketmq-client-csharp/Utilities.cs
+++ b/rocketmq-client-csharp/Utilities.cs
@@ -19,8 +19,10 @@
 using System.Linq;
 using System.Net.NetworkInformation;
 using System.Text;
+using System;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public static class Utilities
     {
@@ -49,5 +51,29 @@
 
             return result.ToString();
         }
+
+        // https URL of the broker's first endpoint address. TODO: assert at least one address exists.
+        public static string TargetUrl(rmq::MessageQueue messageQueue)
+        {
+            var first = messageQueue.Broker.Endpoints.Addresses[0];
+            return $"https://{first.Host}:{first.Port}";
+        }
+
+        // Orders message queues by topic namespace, topic name, broker name and finally queue id.
+        // Strings are compared ordinally and field by field: concatenating namespace and name
+        // would make ("a", "bc") and ("ab", "c") indistinguishable.
+        public static int CompareMessageQueue(rmq::MessageQueue lhs, rmq::MessageQueue rhs)
+        {
+            int result = String.CompareOrdinal(lhs.Topic.ResourceNamespace, rhs.Topic.ResourceNamespace);
+            if (0 == result)
+            {
+                result = String.CompareOrdinal(lhs.Topic.Name, rhs.Topic.Name);
+            }
+            if (0 == result)
+            {
+                result = String.CompareOrdinal(lhs.Broker.Name, rhs.Broker.Name);
+            }
+            return 0 != result ? result : lhs.Id.CompareTo(rhs.Id);
+        }
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 24cc710..baf103f 100644
--- a/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -6,25 +6,29 @@
     <Authors>Zhanhui Li</Authors>

     <Company>Apache Software Foundation</Company>

     <TargetFramework>net5.0</TargetFramework>

-    <RootNamespace>org.apache.rocketmq</RootNamespace>

+    <RootNamespace>Org.Apache.Rocketmq</RootNamespace>

     <GeneratePackageOnBuild>true</GeneratePackageOnBuild>

   </PropertyGroup>

 

   <ItemGroup>

+    <PackageReference Include="Crc32.NET" Version="1.2.0" />

     <PackageReference Include="Google.Protobuf" Version="3.19.4" />

-    <PackageReference Include="Grpc.Net.Client" Version="2.42.0" />

+    <PackageReference Include="Grpc.Net.Client" Version="2.43.0" />

     <PackageReference Include="Grpc.Tools" Version="2.43.0">

       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>

       <PrivateAssets>all</PrivateAssets>

     </PackageReference>

     <PackageReference Include="NLog" Version="4.7.13" />

+    <PackageReference Include="OpenTelemetry" Version="1.3.0" />

+    <PackageReference Include="OpenTelemetry.Api" Version="1.3.0" />

+    <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.3.0" />

 

-    <Protobuf Include="Protos\apache\rocketmq\v1\definition.proto" ProtoRoot="Protos" GrpcServices="Client" />

+    <Protobuf Include="Protos\apache\rocketmq\v2\definition.proto" ProtoRoot="Protos" GrpcServices="Client" />

     <Protobuf Include="Protos\google\rpc\code.proto" ProtoRoot="Protos" GrpcServices="Client" />

     <Protobuf Include="Protos\google\rpc\error_details.proto" ProtoRoot="Protos" GrpcServices="Client" />

     <Protobuf Include="Protos\google\rpc\status.proto" ProtoRoot="Protos" GrpcServices="Client" />

-    <Protobuf Include="Protos\apache\rocketmq\v1\service.proto" ProtoRoot="Protos" GrpcServices="Client">

-      <Link>Protos\apache\rocketmq\v1\definition.proto</Link>

+    <Protobuf Include="Protos\apache\rocketmq\v2\service.proto" ProtoRoot="Protos" GrpcServices="Client">

+      <Link>Protos\apache\rocketmq\v2\definition.proto</Link>

       <Link>Protos\google\rpc\status.proto</Link>

       <Link>Protos\google\rpc\error_details.proto</Link>

     </Protobuf>

diff --git a/tests/BrokerTest.cs b/tests/BrokerTest.cs
deleted file mode 100644
index 8de89d5..0000000
--- a/tests/BrokerTest.cs
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- using Microsoft.VisualStudio.TestTools.UnitTesting;
-
-namespace org.apache.rocketmq {
-    [TestClass]
-    public class BrokerTest {
-
-        [TestMethod]
-        public void testCompareTo() {
-            var b1 = new Broker("b1", 0, null);
-            var b2 = new Broker("b1", 1, null);
-            Assert.AreEqual(b1.CompareTo(b2), -1);
-        }
-
-        [TestMethod]
-        public void testEquals() {
-            var b1 = new Broker("b1", 0, null);
-            var b2 = new Broker("b1", 0, null);
-            Assert.AreEqual(b1, b2, "Equals method should be employed to test equality");
-        }
-
-    }
-}
\ No newline at end of file
diff --git a/tests/ClientConfigTest.cs b/tests/ClientConfigTest.cs
index c6d83cf..4d8dec1 100644
--- a/tests/ClientConfigTest.cs
+++ b/tests/ClientConfigTest.cs
@@ -17,11 +17,14 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class ClientConfigTest {
+    public class ClientConfigTest
+    {
         [TestMethod]
-        public void testClientId() {
+        public void testClientId()
+        {
             var clientConfig = new ClientConfig();
             string clientId = clientConfig.clientId();
             Assert.IsTrue(clientId.Contains("@"));
diff --git a/tests/ClientManagerTest.cs b/tests/ClientManagerTest.cs
index 0f8bff7..af5983c 100644
--- a/tests/ClientManagerTest.cs
+++ b/tests/ClientManagerTest.cs
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 using System;
+using Grpc.Core;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using rmq = apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-using System.Threading;
-using System.Threading.Tasks;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class ClientManagerTest {
-        
+    public class ClientManagerTest
+    {
+
         [TestMethod]
-        public void testResolveRoute() {
+        public void TestResolveRoute()
+        {
             string topic = "cpp_sdk_standard";
             string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
             var request = new rmq::QueryRouteRequest();
@@ -41,7 +42,7 @@
             address.Port = 80;
             request.Endpoints.Addresses.Add(address);
 
-            var metadata = new grpc::Metadata();
+            var metadata = new Metadata();
             var clientConfig = new ClientConfig();
             var credentialsProvider = new ConfigFileCredentialsProvider();
             clientConfig.CredentialsProvider = credentialsProvider;
@@ -50,7 +51,7 @@
             Signature.sign(clientConfig, metadata);
             var clientManager = new ClientManager();
             string target = "https://116.62.231.199:80";
-            var topicRouteData = clientManager.resolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+            var topicRouteData = clientManager.ResolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
             Console.WriteLine(topicRouteData);
         }
     }
diff --git a/tests/ConfigFileCredentialsProviderTest.cs b/tests/ConfigFileCredentialsProviderTest.cs
index f94d364..7741295 100644
--- a/tests/ConfigFileCredentialsProviderTest.cs
+++ b/tests/ConfigFileCredentialsProviderTest.cs
@@ -18,11 +18,14 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class ConfigFileCredentialsProviderTest {
+    public class ConfigFileCredentialsProviderTest
+    {
         [TestMethod]
-        public void testGetCredentials() {
+        public void testGetCredentials()
+        {
             var provider = new ConfigFileCredentialsProvider();
             var credentials = provider.getCredentials();
             Assert.IsNotNull(credentials);
diff --git a/tests/DateTimeTest.cs b/tests/DateTimeTest.cs
index 568d59e..fdf7d53 100644
--- a/tests/DateTimeTest.cs
+++ b/tests/DateTimeTest.cs
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace org.apache.rocketmq {
-    
+namespace Org.Apache.Rocketmq
+{
+
     [TestClass]
-    public class DateTimeTest {
-        
+    public class DateTimeTest
+    {
+
         [TestMethod]
-        public void testFormat() {
+        public void testFormat()
+        {
             DateTime instant = new DateTime(2022, 02, 15, 08, 31, 56);
-            string time =  instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
+            string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
             string expected = "20220215T083156Z";
             Assert.AreEqual(time, expected);
         }
diff --git a/tests/MessageIdGeneratorTest.cs b/tests/MessageIdGeneratorTest.cs
index 6ed34d6..c98e113 100644
--- a/tests/MessageIdGeneratorTest.cs
+++ b/tests/MessageIdGeneratorTest.cs
@@ -16,7 +16,7 @@
  */
 
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
 
 namespace tests
 {
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
index 3dd7f4b..f1c71f8 100644
--- a/tests/MessageTest.cs
+++ b/tests/MessageTest.cs
@@ -19,12 +19,15 @@
 using System.Text;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class MessageTest {
+    public class MessageTest
+    {
 
         [TestMethod]
-        public void testCtor() {
+        public void testCtor()
+        {
             var msg1 = new Message();
             Assert.IsNotNull(msg1.MessageId);
             Assert.IsTrue(msg1.MessageId.StartsWith("01"));
@@ -36,7 +39,8 @@
         }
 
         [TestMethod]
-        public void testCtor2() {
+        public void testCtor2()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -49,7 +53,8 @@
         }
 
         [TestMethod]
-        public void testCtor3() {
+        public void testCtor3()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -63,7 +68,8 @@
         }
 
         [TestMethod]
-        public void testCtor4() {
+        public void testCtor4()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -81,7 +87,8 @@
         }
 
         [TestMethod]
-        public void testCtor5() {
+        public void testCtor5()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
diff --git a/tests/MqLogManagerTest.cs b/tests/MqLogManagerTest.cs
index 71be3f5..4d163b2 100644
--- a/tests/MqLogManagerTest.cs
+++ b/tests/MqLogManagerTest.cs
@@ -1,7 +1,7 @@
 using System;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using NLog;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
 
 namespace tests
 {
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index 961b167..663980a 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -15,60 +15,173 @@
  * limitations under the License.
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
 using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Org.Apache.Rocketmq;
 
-namespace org.apache.rocketmq
+
+namespace tests
 {
-
     [TestClass]
     public class ProducerTest
     {
 
+        private static AccessPoint _accessPoint;
+
         [ClassInitialize]
         public static void SetUp(TestContext context)
         {
-            List<string> nameServerAddress = new List<string>();
-            nameServerAddress.Add(string.Format("{0}:{1}", host, port));
-            resolver = new StaticNameServerResolver(nameServerAddress);
-
-            credentialsProvider = new ConfigFileCredentialsProvider();
+            _accessPoint = new AccessPoint
+            {
+                Host = HOST,
+                Port = PORT
+            };
         }
 
         [ClassCleanup]
         public static void TearDown()
         {
-
         }
 
-
         [TestMethod]
-        public void testSendMessage()
+        public async Task TestLifecycle()
         {
-            var producer = new Producer(resolver, resourceNamespace);
-            producer.ResourceNamespace = resourceNamespace;
+            var producer = new Producer(_accessPoint, resourceNamespace);
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
-            producer.start();
+            await producer.Start();
+            await producer.Shutdown();
+        }
+
+        [TestMethod]
+        public async Task TestSendStandardMessage()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             var msg = new Message(topic, body);
-            var sendResult = producer.send(msg).GetAwaiter().GetResult();
+            
+            // Tag the message. A message has at most one tag.
+            msg.Tag = "Tag-0";
+            
+            // Associate the message with one or multiple keys
+            var keys = new List<string>();
+            keys.Add("k1");
+            keys.Add("k2");
+            msg.Keys = keys;
+            
+            var sendResult = await producer.Send(msg);
             Assert.IsNotNull(sendResult);
-            producer.shutdown();
+            await producer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestSendMultipleMessages()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            for (var i = 0; i < 128; i++)
+            {
+                var msg = new Message(topic, body);
+            
+                // Tag the message. A message has at most one tag.
+                msg.Tag = "Tag-0";
+            
+                // Associate the message with one or multiple keys
+                var keys = new List<string>();
+                keys.Add("k1");
+                keys.Add("k2");
+                msg.Keys = keys;
+                var sendResult = await producer.Send(msg);
+                Assert.IsNotNull(sendResult);                
+            }
+            await producer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestSendFifoMessage()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            
+            // Messages of the same group will get delivered one after another. 
+            msg.MessageGroup = "message-group-0";
+            
+            // Verify messages are FIFO iff their message group is not null or empty.
+            Assert.IsTrue(msg.Fifo());
+
+            var sendResult = await producer.Send(msg);
+            Assert.IsNotNull(sendResult);
+            await producer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestSendScheduledMessage()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            
+            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            Assert.IsTrue(msg.Scheduled());
+            
+            var sendResult = await producer.Send(msg);
+            Assert.IsNotNull(sendResult);
+            await producer.Shutdown();
+        }
+        
+        
+        /**
+         * Trying to send a message that is both FIFO and Scheduled should fail.
+         */
+        [TestMethod]
+        public async Task TestSendMessage_Failure()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            msg.MessageGroup = "Group-0"; // a non-empty message group marks the message as FIFO
+            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10); // a delivery timestamp marks it as scheduled
+            Assert.IsTrue(msg.Scheduled());
+            Assert.IsTrue(msg.Fifo()); // the message must be both FIFO and scheduled for this test to be meaningful
+            try
+            {
+                await producer.Send(msg);
+                Assert.Fail("Should have raised an exception");
+            }
+            catch (MessageException) // expected outcome; the exception object itself is not inspected
+            {
+            }
+            await producer.Shutdown();
         }
 
-        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+        private static string resourceNamespace = "";
 
         private static string topic = "cpp_sdk_standard";
 
-        private static string clientId = "C001";
-        private static string group = "GID_cpp_sdk_standard";
-
-        private static INameServerResolver resolver;
-        private static ICredentialsProvider credentialsProvider;
-        private static string host = "116.62.231.199";
-        private static int port = 80;
+        private static string HOST = "127.0.0.1";
+        private static int PORT = 8081;
     }
 
 }
\ No newline at end of file
diff --git a/tests/PushConsumerTest.cs b/tests/PushConsumerTest.cs
new file mode 100644
index 0000000..78f01de
--- /dev/null
+++ b/tests/PushConsumerTest.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System.Collections.Generic;
+using System;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+
+    public class TestMessageListener : IMessageListener
+    {
+        public Task Consume(List<Message> messages, List<Message> failed)
+        {
+            foreach (var message in messages)
+            {
+                Console.WriteLine("");
+            }
+
+            return Task.CompletedTask;
+        }
+    }
+
+    public class CountableMessageListener : IMessageListener // test listener that prints the ID of every message it is given
+    {
+        public Task Consume(List<Message> messages, List<Message> failed)
+        {
+            foreach (var message in messages)
+            {
+                Console.WriteLine("{0}", message.MessageId); // "{}" is not a valid composite format item and throws FormatException
+            }
+            // Nothing is added to `failed`; the work above is synchronous, so hand back an already-completed task.
+            return Task.CompletedTask;
+        }
+    }
+
+    [TestClass]
+    public class PushConsumerTest
+    {
+
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            credentialsProvider = new ConfigFileCredentialsProvider();
+
+        }
+
+        [ClassCleanup]
+        public static void TearDown()
+        {
+
+        }
+
+        [TestInitialize]
+        public void SetUp()
+        {
+            accessPoint = new AccessPoint();
+            accessPoint.Host = host;
+            accessPoint.Port = port;
+        }
+
+        [TestMethod]
+        public void testLifecycle()
+        {
+            var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
+            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            consumer.Region = "cn-hangzhou-pre";
+            consumer.Subscribe(topic, "*", ExpressionType.TAG);
+            consumer.RegisterListener(new TestMessageListener());
+            consumer.Start();
+
+            consumer.Shutdown();
+        }
+
+
+        // [Ignore]
+        [TestMethod]
+        public void testConsumeMessage()
+        {
+            var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
+            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            consumer.Region = "cn-hangzhou-pre";
+            consumer.Subscribe(topic, "*", ExpressionType.TAG);
+            consumer.RegisterListener(new CountableMessageListener());
+            consumer.Start();
+            System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
+            consumer.Shutdown();
+        }
+
+
+        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+
+        private static string topic = "cpp_sdk_standard";
+
+        private static string group = "GID_cpp_sdk_standard";
+
+        private static ICredentialsProvider credentialsProvider;
+        private static string host = "116.62.231.199";
+        private static int port = 80;
+
+        private AccessPoint accessPoint;
+
+    }
+
+}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 5425973..a1ecf82 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -15,149 +15,132 @@
  * limitations under the License.
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
-using Grpc.Net.Client;
-using rmq = global::apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
 using System;
-using pb = global::Google.Protobuf;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using grpc = Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
 
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
+
     [TestClass]
     public class RpcClientTest
     {
 
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
+        [TestMethod]
+        public async Task testTelemetry()
         {
-            string target = string.Format("https://{0}:{1}", host, port);
-            var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+            Console.WriteLine("Test Telemetry streaming");
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
+            var metadata = new grpc::Metadata();
+            Signature.sign(client_config, metadata);
+
+            var cmd = new rmq::TelemetryCommand();
+            cmd.Settings = new rmq::Settings();
+            cmd.Settings.ClientType = rmq::ClientType.Producer;
+            cmd.Settings.AccessPoint = new rmq::Endpoints();
+            cmd.Settings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            var address = new rmq::Address();
+            address.Port = 8081;
+            address.Host = "11.166.42.94";
+            cmd.Settings.AccessPoint.Addresses.Add(address);
+            cmd.Settings.RequestTimeout = new Google.Protobuf.WellKnownTypes.Duration();
+            cmd.Settings.RequestTimeout.Seconds = 3;
+            cmd.Settings.RequestTimeout.Nanos = 0;
+            cmd.Settings.Publishing = new rmq::Publishing();
+            var topic = new rmq::Resource();
+            topic.Name = "cpp_sdk_standard";
+            cmd.Settings.Publishing.Topics.Add(topic);
+            cmd.Settings.UserAgent = new rmq::UA();
+            cmd.Settings.UserAgent.Language = rmq::Language.DotNet;
+            cmd.Settings.UserAgent.Version = "1.0";
+            cmd.Settings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+            cmd.Settings.UserAgent.Platform = System.Environment.OSVersion.ToString();
+
+            var duplexStreaming = rpc_client.Telemetry(metadata);
+            var reader = duplexStreaming.ResponseStream;
+            var writer = duplexStreaming.RequestStream;
+
+            var cts = new CancellationTokenSource();
+            await writer.WriteAsync(cmd);
+            Console.WriteLine("Command written");
+            if (await reader.MoveNext(cts.Token))
             {
-                HttpHandler = ClientManager.createHttpHandler()
-            });
-            var invoker = channel.Intercept(new ClientLoggerInterceptor());
-            var client = new rmq::MessagingService.MessagingServiceClient(invoker);
-            rpcClient = new RpcClient(client);
+                var response = reader.Current;
+                switch (response.CommandCase)
+                {
+                    case rmq::TelemetryCommand.CommandOneofCase.Settings:
+                        {
+                            var responded_settings = response.Settings;
+                            Console.WriteLine($"{responded_settings.ToString()}");
+                            break;
+                        }
+                    case rmq::TelemetryCommand.CommandOneofCase.None:
+                        {
+                            Console.WriteLine($"Unknown response command type: {response.Status.ToString()}");
+                            break;
+                        }
+                }
+                Console.WriteLine("Server responded ");
+            }
+            else
+            {
+                Console.WriteLine("Server is not responding");
+                var status = duplexStreaming.GetStatus();
+                Console.WriteLine($"status={status.ToString()}");
 
-            clientConfig = new ClientConfig();
-            var credentialsProvider = new ConfigFileCredentialsProvider();
-            clientConfig.CredentialsProvider = credentialsProvider;
-            clientConfig.ResourceNamespace = resourceNamespace;
-            clientConfig.Region = "cn-hangzhou-pre";
-        }
-
-        [ClassCleanup]
-        public static void TearDown()
-        {
-
+                var trailers = duplexStreaming.GetTrailers();
+                Console.WriteLine($"trailers={trailers.ToString()}");
+            }
         }
 
         [TestMethod]
         public void testQueryRoute()
         {
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
+            var metadata = new grpc::Metadata();
+            Signature.sign(client_config, metadata);
             var request = new rmq::QueryRouteRequest();
             request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = resourceNamespace;
-            request.Topic.Name = topic;
+            request.Topic.Name = "cpp_sdk_standard";
             request.Endpoints = new rmq::Endpoints();
             request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
             var address = new rmq::Address();
-            address.Host = host;
-            address.Port = port;
+            address.Port = 8081;
+            address.Host = "11.166.42.94";
             request.Endpoints.Addresses.Add(address);
-
-            var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.queryRoute(request, callOptions).GetAwaiter().GetResult();
-        }
-
-
-        [TestMethod]
-        public void testHeartbeat()
-        {
-            var request = new rmq::HeartbeatRequest();
-            request.ClientId = clientId;
-            request.ProducerData = new rmq::ProducerData();
-            request.ProducerData.Group = new rmq::Resource();
-            request.ProducerData.Group.ResourceNamespace = resourceNamespace;
-            request.ProducerData.Group.Name = topic;
-            request.FifoFlag = false;
-
-            var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.heartbeat(request, callOptions).GetAwaiter().GetResult();
+            var response = rpc_client.QueryRoute(metadata, request, client_config.RequestTimeout);
+            var result = response.GetAwaiter().GetResult();
         }
 
         [TestMethod]
-        public void testSendMessage()
+        public async Task TestSend()
         {
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
+            var metadata = new grpc::Metadata();
+            Signature.sign(client_config, metadata);
+
             var request = new rmq::SendMessageRequest();
-            request.Message = new rmq::Message();
-            byte[] body = new byte[1024];
-            for (int i = 0; i < body.Length; i++)
-            {
-                body[i] = (byte)'x';
-            }
-            request.Message.Body = pb::ByteString.CopyFrom(body);
-            request.Message.Topic = new rmq::Resource();
-            request.Message.Topic.ResourceNamespace = resourceNamespace;
-            request.Message.Topic.Name = topic;
-            request.Message.UserAttribute.Add("k", "v");
-            request.Message.UserAttribute.Add("key", "value");
-            request.Message.SystemAttribute = new rmq::SystemAttribute();
-            request.Message.SystemAttribute.Tag = "TagA";
-            request.Message.SystemAttribute.Keys.Add("key1");
-            request.Message.SystemAttribute.MessageId = SequenceGenerator.Instance.Next();
-
-            var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.AddSeconds(3);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-
-            var response = rpcClient.sendMessage(request, callOptions).GetAwaiter().GetResult();
+            var message = new rmq::Message();
+            message.Topic = new rmq::Resource();
+            message.Topic.Name = "cpp_sdk_standard";
+            message.Body = Google.Protobuf.ByteString.CopyFromUtf8("Test Body");
+            message.SystemProperties = new rmq::SystemProperties();
+            message.SystemProperties.Tag = "TagA";
+            message.SystemProperties.MessageId = "abc";
+            request.Messages.Add(message);
+            var response = await rpc_client.SendMessage(metadata, request, TimeSpan.FromSeconds(3));
+            Assert.AreEqual(rmq::Code.Ok, response.Status.Code);
         }
-
-        // Remove the Ignore annotation if server has fixed
-        [Ignore]
-        [TestMethod]
-        public void testNotifyClientTermiantion()
-        {
-            var request = new rmq::NotifyClientTerminationRequest();
-            request.ClientId = clientId;
-            request.ProducerGroup = new rmq::Resource();
-            request.ProducerGroup.ResourceNamespace = resourceNamespace;
-            request.ProducerGroup.Name = group;
-
-            var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.AddSeconds(3);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.notifyClientTermination(request, callOptions).GetAwaiter().GetResult();
-        }
-
-        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
-
-        private static string topic = "cpp_sdk_standard";
-
-        private static string clientId = "C001";
-        private static string group = "GID_cpp_sdk_standard";
-
-        private static string host = "116.62.231.199";
-        private static int port = 80;
-
-        private static IRpcClient rpcClient;
-        private static ClientConfig clientConfig;
     }
 }
\ No newline at end of file
diff --git a/tests/SendResultTest.cs b/tests/SendResultTest.cs
index 8dd033a..4e3d9a0 100644
--- a/tests/SendResultTest.cs
+++ b/tests/SendResultTest.cs
@@ -17,28 +17,32 @@
 
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class SendResultTest {
+    public class SendResultTest
+    {
 
         [TestMethod]
-        public void testCtor() {
+        public void testCtor()
+        {
             string messageId = new string("abc");
-            var sendResult = new SendResult(messageId);
+            var sendResult = new SendReceipt(messageId);
             Assert.AreEqual(messageId, sendResult.MessageId);
             Assert.AreEqual(SendStatus.SEND_OK, sendResult.Status);
         }
 
 
         [TestMethod]
-        public void testCtor2() {
+        public void testCtor2()
+        {
             string messageId = new string("abc");
-            var sendResult = new SendResult(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
+            var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
             Assert.AreEqual(messageId, sendResult.MessageId);
             Assert.AreEqual(SendStatus.FLUSH_DISK_TIMEOUT, sendResult.Status);
         }
 
     }
-    
+
 }
\ No newline at end of file
diff --git a/tests/SequenceGeneratorTest.cs b/tests/SequenceGeneratorTest.cs
index fc0ceb0..9b55334 100644
--- a/tests/SequenceGeneratorTest.cs
+++ b/tests/SequenceGeneratorTest.cs
@@ -19,7 +19,7 @@
 using System;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     [TestClass]
     public class SequenceGeneratorTest
diff --git a/tests/SignatureTest.cs b/tests/SignatureTest.cs
index cece257..16d0f46 100644
--- a/tests/SignatureTest.cs
+++ b/tests/SignatureTest.cs
@@ -19,10 +19,12 @@
 using Moq;
 using System;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class SignatureTest {
+    public class SignatureTest
+    {
 
         [TestMethod]
         public void testSign()
@@ -33,7 +35,7 @@
             mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
             mock.Setup(x => x.serviceName()).Returns("mq");
             mock.Setup(x => x.region()).Returns("cn-hangzhou");
-            
+
             string accessKey = "key";
             string accessSecret = "secret";
             var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
new file mode 100644
index 0000000..c986614
--- /dev/null
+++ b/tests/SimpleConsumerTest.cs
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
+using Castle.Core.Logging;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+
+    [TestClass]
+    public class SimpleConsumerTest
+    {
+
+        private static AccessPoint accessPoint;
+        private static string _resourceNamespace = "";
+        private static string _group = "GID_cpp_sdk_standard";
+        private static string _topic = "cpp_sdk_standard";
+
+
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            accessPoint = new AccessPoint
+            {
+                Host = "127.0.0.1",
+                Port = 8081
+            };
+        }
+
+        [TestMethod]
+        public async Task TestLifecycle()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            await Task.Delay(1_000); // pause between Start and Shutdown without blocking the thread inside an async test
+            await simpleConsumer.Shutdown();
+        }
+
+        [TestMethod]
+        public async Task TestReceive()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            var batchSize = 32;
+            var receiveTimeout = TimeSpan.FromSeconds(10);
+            var messages  = await simpleConsumer.Receive(batchSize, receiveTimeout);
+            Assert.IsTrue(messages.Count > 0);
+            Assert.IsTrue(messages.Count <= batchSize);
+            await simpleConsumer.Shutdown();
+        }
+        
+        
+        [TestMethod]
+        public async Task TestAck()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            var batchSize = 32;
+            var receiveTimeout = TimeSpan.FromSeconds(10);
+            var messages  = await simpleConsumer.Receive(batchSize, receiveTimeout);
+            foreach (var message in messages)
+            {
+                await simpleConsumer.Ack(message);
+                Console.WriteLine($"Ack {message.MessageId} OK");
+            }
+            await simpleConsumer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestChangeInvisibleDuration()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            var batchSize = 32;
+            var receiveTimeout = TimeSpan.FromSeconds(10);
+            var messages  = await simpleConsumer.Receive(batchSize, receiveTimeout);
+            foreach (var message in messages)
+            {
+                await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(10));
+                Console.WriteLine($"ChangeInvisibleDuration for message[MsgId={message.MessageId}] OK");
+            }
+            await simpleConsumer.Shutdown();
+        }
+    }
+}
\ No newline at end of file
diff --git a/tests/StaticCredentialsProviderTest.cs b/tests/StaticCredentialsProviderTest.cs
index 20b957e..8b5f012 100644
--- a/tests/StaticCredentialsProviderTest.cs
+++ b/tests/StaticCredentialsProviderTest.cs
@@ -17,12 +17,15 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class StaticCredentialsProviderTest {
+    public class StaticCredentialsProviderTest
+    {
 
         [TestMethod]
-        public void testGetCredentials() {
+        public void testGetCredentials()
+        {
             var accessKey = "key";
             var accessSecret = "secret";
             var provider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/StaticNameServerResolverTest.cs b/tests/StaticNameServerResolverTest.cs
deleted file mode 100644
index 88955e9..0000000
--- a/tests/StaticNameServerResolverTest.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
-
-namespace org.apache.rocketmq
-{
-    [TestClass]
-    public class StaticNameServerResolverTest
-    {
-        [TestMethod]
-        public void testResolve()
-        {
-            List<string> list = new List<string>();
-            list.Add("https://localhost:80");
-            var resolver = new StaticNameServerResolver(list);
-            var result = resolver.resolveAsync().GetAwaiter().GetResult();
-            Assert.AreSame(list, result);
-        }
-    }
-}
\ No newline at end of file
diff --git a/tests/TopicTest.cs b/tests/TopicTest.cs
index fcc15e4..9f386de 100644
--- a/tests/TopicTest.cs
+++ b/tests/TopicTest.cs
@@ -17,13 +17,16 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq {
-     
-     [TestClass]
-     public class TopicTest {
+namespace Org.Apache.Rocketmq
+{
 
-         [TestMethod]
-         public void testCompareTo() {
+    [TestClass]
+    public class TopicTest
+    {
+
+        [TestMethod]
+        public void testCompareTo()
+        {
             List<Topic> topics = new List<Topic>();
             topics.Add(new Topic("ns1", "t1"));
             topics.Add(new Topic("ns0", "t1"));
@@ -36,13 +39,13 @@
 
             Assert.AreEqual(topics[1].ResourceNamespace, "ns0");
             Assert.AreEqual(topics[1].Name, "t1");
-            
+
 
             Assert.AreEqual(topics[2].ResourceNamespace, "ns1");
             Assert.AreEqual(topics[2].Name, "t1");
-            
+
         }
 
 
-     }
- }
\ No newline at end of file
+    }
+}
\ No newline at end of file
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index 4689e52..bbf537a 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -1,8 +1,12 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;

-using org.apache.rocketmq;

+using Org.Apache.Rocketmq;

 using Grpc.Net.Client;

-using apache.rocketmq.v1;

+using rmq = Apache.Rocketmq.V2;

+

 using System;

+using System.Collections.Concurrent;

+using System.Collections.Generic;

+

 namespace tests

 {

     [TestClass]

@@ -11,40 +15,61 @@
         [TestMethod]

         public void TestMethod1()

         {

-            apache.rocketmq.v1.Permission perm = apache.rocketmq.v1.Permission.None;

-            switch(perm) {

-                case apache.rocketmq.v1.Permission.None:

-                {

-                    Console.WriteLine("None");

-                    break;

-                }

+            rmq::Permission perm = rmq::Permission.None;

+            switch (perm)
+            {

+                case rmq::Permission.None:
+                    {
+                        Console.WriteLine("None");
+                        break;
+                    }

 

-                case apache.rocketmq.v1.Permission.Read:

-                {

-                    Console.WriteLine("Read");

-                    break;

-                }

+                case rmq::Permission.Read:
+                    {
+                        Console.WriteLine("Read");
+                        break;
+                    }

 

-                case apache.rocketmq.v1.Permission.Write:

-                {

-                    Console.WriteLine("Write");

-                    break;

-                }

+                case rmq::Permission.Write:
+                    {
+                        Console.WriteLine("Write");
+                        break;
+                    }

 

-                case apache.rocketmq.v1.Permission.ReadWrite:

-                {

-                    Console.WriteLine("ReadWrite");

-                    break;

-                }

+                case rmq::Permission.ReadWrite:
+                    {
+                        Console.WriteLine("ReadWrite");
+                        break;
+                    }

 

             }

         }

 

         [TestMethod]

-        public void TestRpcClientImplCtor() {

-            using var channel = GrpcChannel.ForAddress("https://localhost:5001");

-            var client = new MessagingService.MessagingServiceClient(channel);

-            RpcClient impl = new RpcClient(client);

+        public void TestRpcClientImplCtor()
+        {

+            RpcClient impl = new RpcClient("https://localhost:5001");

+        }

+

+        [TestMethod]

+        public void TestConcurrentDictionary()

+        {

+            var dict = new ConcurrentDictionary<string, List<String>>();

+            string s = "abc";

+            List<String> result;

+            var exists = dict.TryGetValue(s, out result);

+            Assert.IsFalse(exists);

+            Assert.IsNull(result);

+

+            result = new List<string>();

+            result.Add("abc");

+            Assert.IsTrue(dict.TryAdd(s, result));

+

+            List<String> list;

+            exists = dict.TryGetValue(s, out list);

+            Assert.IsTrue(exists);

+            Assert.IsNotNull(list);

+            Assert.AreEqual(1, list.Count);

         }

     }

 }