[ISSUE #776] Add push consumer for normal/fifo message, namespace support, reentrant message receiving support in C# SDK (#777)

Add push consumer for normal/fifo message, namespace support, reentrant message receiving support in C#

---------

Co-authored-by: tsaitsung-han.tht <tsaitsung-han.tht@alibaba-inc.com>
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index 6e57d7e..c5627ec 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -34,6 +34,10 @@
         private static long _successCounter;
         private static long _failureCounter;
 
+        private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+        private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+        private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
         private static readonly BlockingCollection<Task<ISendReceipt>> Tasks =
             new BlockingCollection<Task<ISendReceipt>>();
 
@@ -79,14 +83,11 @@
         {
             // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
             // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
-            const string accessKey = "yourAccessKey";
-            const string secretKey = "yourSecretKey";
 
             // Credential provider is optional for client configuration.
-            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
-            const string endpoints = "foobar.com:8080";
+            var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
             var clientConfig = new ClientConfig.Builder()
-                .SetEndpoints(endpoints)
+                .SetEndpoints(Endpoint)
                 .SetCredentialsProvider(credentialsProvider)
                 .Build();
 
@@ -108,6 +109,7 @@
                 .SetTag(tag)
                 // You could set multiple keys for the single message actually.
                 .SetKeys("yourMessageKey-7044358f98fc")
+                .SetMessageGroup("fifo-group")
                 .Build();
 
             DoStats();
diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs
index 1da8918..a97867d 100644
--- a/csharp/examples/ProducerDelayMessageExample.cs
+++ b/csharp/examples/ProducerDelayMessageExample.cs
@@ -27,18 +27,19 @@
     {
         private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerDelayMessageExample).FullName);
 
+        private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+        private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+        private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
         internal static async Task QuickStart()
         {
             // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
             // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
-            const string accessKey = "yourAccessKey";
-            const string secretKey = "yourSecretKey";
 
             // Credential provider is optional for client configuration.
-            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
-            const string endpoints = "foobar.com:8080";
+            var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
             var clientConfig = new ClientConfig.Builder()
-                .SetEndpoints(endpoints)
+                .SetEndpoints(Endpoint)
                 .SetCredentialsProvider(credentialsProvider)
                 .Build();
 
diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs
index 138cd02..9a9d4fd 100644
--- a/csharp/examples/ProducerFifoMessageExample.cs
+++ b/csharp/examples/ProducerFifoMessageExample.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using System.Text;
 using System.Threading.Tasks;
 using Microsoft.Extensions.Logging;
@@ -26,18 +27,19 @@
     {
         private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerFifoMessageExample).FullName);
 
+        private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+        private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+        private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
         internal static async Task QuickStart()
         {
             // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
             // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
-            const string accessKey = "yourAccessKey";
-            const string secretKey = "yourSecretKey";
 
             // Credential provider is optional for client configuration.
-            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
-            const string endpoints = "foobar.com:8080";
+            var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
             var clientConfig = new ClientConfig.Builder()
-                .SetEndpoints(endpoints)
+                .SetEndpoints(Endpoint)
                 .SetCredentialsProvider(credentialsProvider)
                 .Build();
 
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index 6038eb6..21fb79c 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using System.Text;
 using System.Threading.Tasks;
 using Microsoft.Extensions.Logging;
@@ -26,18 +27,19 @@
     {
         private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerNormalMessageExample).FullName);
 
+        private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+        private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+        private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
         internal static async Task QuickStart()
         {
             // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
             // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
-            const string accessKey = "yourAccessKey";
-            const string secretKey = "yourSecretKey";
 
             // Credential provider is optional for client configuration.
-            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
-            const string endpoints = "foobar.com:8080";
+            var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
             var clientConfig = new ClientConfig.Builder()
-                .SetEndpoints(endpoints)
+                .SetEndpoints(Endpoint)
                 .SetCredentialsProvider(credentialsProvider)
                 .Build();
 
diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs
index d353f74..4c5b3a7 100644
--- a/csharp/examples/ProducerTransactionMessageExample.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using System.Text;
 using System.Threading.Tasks;
 using Microsoft.Extensions.Logging;
@@ -26,6 +27,10 @@
     {
         private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerTransactionMessageExample).FullName);
 
+        private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+        private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+        private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
         private class TransactionChecker : ITransactionChecker
         {
             public TransactionResolution Check(MessageView messageView)
@@ -39,14 +44,11 @@
         {
             // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
             // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
-            const string accessKey = "yourAccessKey";
-            const string secretKey = "yourSecretKey";
 
             // Credential provider is optional for client configuration.
-            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
-            const string endpoints = "foobar.com:8080";
+            var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
             var clientConfig = new ClientConfig.Builder()
-                .SetEndpoints(endpoints)
+                .SetEndpoints(Endpoint)
                 .SetCredentialsProvider(credentialsProvider)
                 .Build();
 
@@ -76,9 +78,9 @@
             var sendReceipt = await producer.Send(message, transaction);
             Logger.LogInformation("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
             // Commit the transaction.
-            transaction.Commit();
+            await transaction.Commit();
             // Or rollback the transaction.
-            // transaction.Rollback();
+            // await transaction.Rollback();
 
             // Close the producer if you don't need it anymore.
             await producer.DisposeAsync();
diff --git a/csharp/examples/PushConsumerExample.cs b/csharp/examples/PushConsumerExample.cs
new file mode 100644
index 0000000..00fbbf7
--- /dev/null
+++ b/csharp/examples/PushConsumerExample.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq;
+
+namespace examples
+{
+    public class PushConsumerExample
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(PushConsumerExample).FullName);
+
+        private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+        private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+        private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
+        internal static async Task QuickStart()
+        {
+            // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
+            // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+
+            // Credential provider is optional for client configuration.
+            var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(Endpoint)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            // Add your subscriptions.
+            const string consumerGroup = "yourConsumerGroup";
+            const string topic = "yourTopic";
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression("*") } };
+
+            var pushConsumer = await new PushConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetSubscriptionExpression(subscription)
+                .SetMessageListener(new CustomMessageListener())
+                .Build();
+
+            Thread.Sleep(Timeout.Infinite);
+
+            // Close the push consumer if you don't need it anymore.
+            // await pushConsumer.DisposeAsync();
+        }
+
+        private class CustomMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView)
+            {
+                // Handle the received message and return consume result.
+                Logger.LogInformation($"Consume message={messageView}");
+                return ConsumeResult.SUCCESS;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs
index 63d57e8..ec5992d 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/examples/QuickStart.cs
@@ -34,6 +34,7 @@
             // ProducerFifoMessageExample.QuickStart().Wait();

             // ProducerDelayMessageExample.QuickStart().Wait();

             // ProducerTransactionMessageExample.QuickStart().Wait();

+            // PushConsumerExample.QuickStart().Wait();

             // SimpleConsumerExample.QuickStart().Wait();

             // ProducerBenchmark.QuickStart().Wait();

         }

diff --git a/csharp/rocketmq-client-csharp/Assignment.cs b/csharp/rocketmq-client-csharp/Assignment.cs
new file mode 100644
index 0000000..1005676
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Assignment.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Rocketmq
+{
+    public class Assignment
+    {
+        public Assignment(MessageQueue messageQueue)
+        {
+            MessageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue));
+        }
+
+        public MessageQueue MessageQueue { get; }
+
+        public override bool Equals(object obj)
+        {
+            if (this == obj) return true;
+            if (obj == null || GetType() != obj.GetType()) return false;
+
+            var other = (Assignment)obj;
+            return EqualityComparer<MessageQueue>.Default.Equals(MessageQueue, other.MessageQueue);
+        }
+
+        public override int GetHashCode()
+        {
+            return EqualityComparer<MessageQueue>.Default.GetHashCode(MessageQueue);
+        }
+
+        public override string ToString()
+        {
+            return $"Assignment{{messageQueue={MessageQueue}}}";
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Assignments.cs b/csharp/rocketmq-client-csharp/Assignments.cs
new file mode 100644
index 0000000..a25f5ea
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Assignments.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    public class Assignments
+    {
+        private readonly List<Assignment> _assignmentList;
+
+        public Assignments(List<Assignment> assignmentList)
+        {
+            _assignmentList = assignmentList;
+        }
+
+        public override bool Equals(object obj)
+        {
+            if (this == obj)
+            {
+                return true;
+            }
+
+            if (obj == null || GetType() != obj.GetType())
+            {
+                return false;
+            }
+
+            var other = (Assignments)obj;
+            return _assignmentList.SequenceEqual(other._assignmentList);
+        }
+
+        public override int GetHashCode()
+        {
+            return HashCode.Combine(_assignmentList);
+        }
+
+        public override string ToString()
+        {
+            return $"{nameof(Assignments)} {{ {nameof(_assignmentList)} = {_assignmentList} }}";
+        }
+
+        public List<Assignment> GetAssignmentList()
+        {
+            return _assignmentList;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 1c81b41..491ef4c 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -21,10 +21,13 @@
 using System.Threading;
 using System;
 using System.Linq;
+using System.Runtime.CompilerServices;
 using Microsoft.Extensions.Logging;
 using Proto = Apache.Rocketmq.V2;
 using grpcLib = Grpc.Core;
 
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
 namespace Org.Apache.Rocketmq
 {
     public abstract class Client
@@ -49,7 +52,7 @@
 
         protected readonly ClientConfig ClientConfig;
         protected readonly Endpoints Endpoints;
-        protected readonly IClientManager ClientManager;
+        protected IClientManager ClientManager;
         protected readonly string ClientId;
         protected readonly ClientMeterManager ClientMeterManager;
 
@@ -113,7 +116,7 @@
             Logger.LogDebug($"Shutdown the rocketmq client successfully, clientId={ClientId}");
         }
 
-        private (bool, Session) GetSession(Endpoints endpoints)
+        private protected (bool, Session) GetSession(Endpoints endpoints)
         {
             _sessionLock.EnterReadLock();
             try
@@ -151,11 +154,11 @@
 
         protected abstract IEnumerable<string> GetTopics();
 
-        protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
+        internal abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
 
         protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData);
 
-        private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
+        internal async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
         {
             var routeEndpoints = new HashSet<Endpoints>();
             foreach (var mq in topicRouteData.MessageQueues)
@@ -261,7 +264,7 @@
                 $"AvailableCompletionPortThreads={availableIo}");
         }
 
-        private void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
+        private protected void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
         {
             Task.Run(async () =>
             {
@@ -313,6 +316,7 @@
                 {
                     Topic = new Proto::Resource
                     {
+                        ResourceNamespace = ClientConfig.Namespace,
                         Name = topic
                     },
                     Endpoints = Endpoints.ToProtobuf()
@@ -397,7 +401,7 @@
             return metadata;
         }
 
-        protected abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest();
+        internal abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest();
 
         private async void NotifyClientTermination()
         {
@@ -432,6 +436,17 @@
             return ClientConfig;
         }
 
+        internal IClientManager GetClientManager()
+        {
+            return ClientManager;
+        }
+
+        // Only for testing
+        internal void SetClientManager(IClientManager clientManager)
+        {
+            ClientManager = clientManager;
+        }
+
         internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
             Proto.RecoverOrphanedTransactionCommand command)
         {
@@ -439,7 +454,7 @@
                               $"clientId={ClientId}, endpoints={endpoints}");
         }
 
-        internal async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
+        internal virtual async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
         {
             // Only push consumer support message consumption verification.
             Logger.LogWarning($"Ignore verify message command from remote, which is not expected, clientId={ClientId}, " +
@@ -489,7 +504,7 @@
 
         internal void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
         {
-            var metric = new Metric(settings.Metric);
+            var metric = new Metric(settings.Metric ?? new Proto.Metric());
             ClientMeterManager.Reset(metric);
             GetSettings().Sync(settings);
         }
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs
index cf02a19..ed17f7d 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -22,12 +22,13 @@
     public class ClientConfig
     {
         private ClientConfig(ISessionCredentialsProvider sessionCredentialsProvider, TimeSpan requestTimeout,
-            string endpoints, bool sslEnabled)
+            string endpoints, bool sslEnabled, string namespaceName)
         {
             SessionCredentialsProvider = sessionCredentialsProvider;
             RequestTimeout = requestTimeout;
             Endpoints = endpoints;
             SslEnabled = sslEnabled;
+            Namespace = namespaceName;
         }
 
         public ISessionCredentialsProvider SessionCredentialsProvider { get; }
@@ -38,12 +39,15 @@
 
         public bool SslEnabled { get; }
 
+        public string Namespace { get; }
+
         public class Builder
         {
             private ISessionCredentialsProvider _sessionCredentialsProvider;
             private TimeSpan _requestTimeout = TimeSpan.FromSeconds(3);
             private string _endpoints;
             private bool _sslEnabled = true;
+            private string _namespace = "";
 
             public Builder SetCredentialsProvider(ISessionCredentialsProvider sessionCredentialsProvider)
             {
@@ -69,9 +73,15 @@
                 return this;
             }
 
+            public Builder SetNamespace(string namespaceName)
+            {
+                _namespace = namespaceName;
+                return this;
+            }
+
             public ClientConfig Build()
             {
-                return new ClientConfig(_sessionCredentialsProvider, _requestTimeout, _endpoints, _sslEnabled);
+                return new ClientConfig(_sessionCredentialsProvider, _requestTimeout, _endpoints, _sslEnabled, _namespace);
             }
         }
     }
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index fed6f38..e42a29d 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -143,6 +143,7 @@
         {
             var metadata = _client.Sign();
             var response = await GetRpcClient(endpoints).ReceiveMessage(metadata, request, timeout);
+
             return new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(
                 request, response, metadata);
         }
@@ -166,6 +167,16 @@
                 request, response, metadata);
         }
 
+        public async Task<RpcInvocation<Proto.ForwardMessageToDeadLetterQueueRequest, Proto.ForwardMessageToDeadLetterQueueResponse>>
+            ForwardMessageToDeadLetterQueue(Endpoints endpoints,
+                Proto.ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
+        {
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).ForwardMessageToDeadLetterQueue(metadata, request, timeout);
+            return new RpcInvocation<Proto.ForwardMessageToDeadLetterQueueRequest, Proto.ForwardMessageToDeadLetterQueueResponse>(
+                    request, response, metadata);
+        }
+
         public async Task<RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>> EndTransaction(
             Endpoints endpoints, Proto.EndTransactionRequest request, TimeSpan timeout)
         {
diff --git a/csharp/rocketmq-client-csharp/ConsumeResult.cs b/csharp/rocketmq-client-csharp/ConsumeResult.cs
new file mode 100644
index 0000000..6cd2124
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ConsumeResult.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Designed for push consumer specifically.
+    /// </summary>
+    public enum ConsumeResult
+    {
+        /// <summary>
+        /// Consume message successfully.
+        /// </summary>
+        SUCCESS,
+        /// <summary>
+        /// Failed to consume message.
+        /// </summary>
+        FAILURE
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ConsumeService.cs b/csharp/rocketmq-client-csharp/ConsumeService.cs
new file mode 100644
index 0000000..7dcc667
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ConsumeService.cs
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+    public abstract class ConsumeService
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger<ConsumeService>();
+
+        protected readonly string ClientId;
+        private readonly IMessageListener _messageListener;
+        private readonly TaskScheduler _consumptionTaskScheduler;
+        private readonly CancellationToken _consumptionCtsToken;
+
+        public ConsumeService(string clientId, IMessageListener messageListener, TaskScheduler consumptionTaskScheduler,
+            CancellationToken consumptionCtsToken)
+        {
+            ClientId = clientId;
+            _messageListener = messageListener;
+            _consumptionTaskScheduler = consumptionTaskScheduler;
+            _consumptionCtsToken = consumptionCtsToken;
+        }
+
+        public abstract void Consume(ProcessQueue pq, List<MessageView> messageViews);
+
+        public Task<ConsumeResult> Consume(MessageView messageView)
+        {
+            return Consume(messageView, TimeSpan.Zero);
+        }
+
+        public Task<ConsumeResult> Consume(MessageView messageView, TimeSpan delay)
+        {
+            var task = new ConsumeTask(ClientId, _messageListener, messageView);
+            var delayMilliseconds = (int)delay.TotalMilliseconds;
+
+            if (delayMilliseconds <= 0)
+            {
+                return Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken, TaskCreationOptions.None,
+                    _consumptionTaskScheduler);
+            }
+
+            var tcs = new TaskCompletionSource<ConsumeResult>();
+
+            Task.Run(async () =>
+            {
+                try
+                {
+                    await Task.Delay(delay, _consumptionCtsToken);
+                    var result = await Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken,
+                        TaskCreationOptions.None, _consumptionTaskScheduler);
+                    tcs.SetResult(result);
+                }
+                catch (Exception e)
+                {
+                    Logger.LogError(e, $"Error while consuming message, clientId={ClientId}");
+                    tcs.SetException(e);
+                }
+            }, _consumptionCtsToken);
+
+            return tcs.Task;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ConsumeTask.cs b/csharp/rocketmq-client-csharp/ConsumeTask.cs
new file mode 100644
index 0000000..9214b68
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ConsumeTask.cs
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+    public class ConsumeTask
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger<ConsumeTask>();
+
+        private readonly string _clientId;
+        private readonly IMessageListener _messageListener;
+        private readonly MessageView _messageView;
+
+        public ConsumeTask(string clientId, IMessageListener messageListener, MessageView messageView)
+        {
+            _clientId = clientId;
+            _messageListener = messageListener;
+            _messageView = messageView;
+        }
+
+        /// <summary>
+        /// Invoke IMessageListener to consume the message.
+        /// </summary>
+        /// <returns>Message(s) which are consumed successfully.</returns>
+        public ConsumeResult Call()
+        {
+            try
+            {
+                var consumeResult = _messageListener.Consume(_messageView);
+                return consumeResult;
+            }
+            catch (Exception e)
+            {
+                Logger.LogError(e, $"Message listener raised an exception while consuming messages, clientId={_clientId}," +
+                                   $" mq={_messageView.MessageQueue}, messageId={_messageView.MessageId}");
+                return ConsumeResult.FAILURE;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
index 0bf7a45..2ad135b 100644
--- a/csharp/rocketmq-client-csharp/Consumer.cs
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -18,11 +18,14 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Runtime.CompilerServices;
 using System.Text.RegularExpressions;
 using System.Threading.Tasks;
 using Google.Protobuf.WellKnownTypes;
 using Proto = Apache.Rocketmq.V2;
 
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
 namespace Org.Apache.Rocketmq
 {
     public abstract class Consumer : Client
@@ -36,7 +39,7 @@
             ConsumerGroup = consumerGroup;
         }
 
-        protected async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
+        internal async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
             TimeSpan awaitDuration)
         {
             var tolerance = ClientConfig.RequestTimeout;
@@ -85,11 +88,12 @@
             };
         }
 
-        protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
+        internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
             FilterExpression filterExpression, TimeSpan awaitDuration, TimeSpan invisibleDuration)
         {
             var group = new Proto.Resource
             {
+                ResourceNamespace = ClientConfig.Namespace,
                 Name = ConsumerGroup
             };
             return new Proto.ReceiveMessageRequest
@@ -103,5 +107,26 @@
                 InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
             };
         }
+
+        protected internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
+            FilterExpression filterExpression, TimeSpan awaitDuration, string attemptId)
+        {
+            attemptId ??= Guid.NewGuid().ToString();
+            var group = new Proto.Resource
+            {
+                ResourceNamespace = ClientConfig.Namespace,
+                Name = ConsumerGroup
+            };
+            return new Proto.ReceiveMessageRequest
+            {
+                Group = group,
+                MessageQueue = mq.ToProtobuf(),
+                FilterExpression = WrapFilterExpression(filterExpression),
+                LongPollingTimeout = Duration.FromTimeSpan(awaitDuration),
+                BatchSize = batchSize,
+                AutoRenew = true,
+                AttemptId = attemptId
+            };
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs
new file mode 100644
index 0000000..83f1bff
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    public class CustomizedBackoffRetryPolicy : IRetryPolicy
+    {
+        private readonly int _maxAttempts;
+        private readonly List<TimeSpan> _durations;
+
+        public CustomizedBackoffRetryPolicy(List<TimeSpan> durations, int maxAttempts)
+        {
+            if (durations == null || !durations.Any())
+            {
+                throw new ArgumentException("durations must not be empty", nameof(durations));
+            }
+            _durations = durations;
+            _maxAttempts = maxAttempts;
+        }
+
+        public int GetMaxAttempts()
+        {
+            return _maxAttempts;
+        }
+
+        public List<TimeSpan> GetDurations()
+        {
+            return _durations;
+        }
+
+        public TimeSpan GetNextAttemptDelay(int attempt)
+        {
+            if (attempt <= 0)
+            {
+                throw new ArgumentException("attempt must be positive", nameof(attempt));
+            }
+            return attempt > _durations.Count ? _durations.Last() : _durations[attempt - 1];
+        }
+
+        public static CustomizedBackoffRetryPolicy FromProtobuf(RetryPolicy retryPolicy)
+        {
+            if (!retryPolicy.StrategyCase.Equals(RetryPolicy.StrategyOneofCase.CustomizedBackoff))
+            {
+                throw new ArgumentException("Illegal retry policy");
+            }
+            var customizedBackoff = retryPolicy.CustomizedBackoff;
+            var durations = customizedBackoff.Next.Select(duration => duration.ToTimeSpan()).ToList();
+            return new CustomizedBackoffRetryPolicy(durations, retryPolicy.MaxAttempts);
+        }
+
+        public RetryPolicy ToProtobuf()
+        {
+            var customizedBackoff = new CustomizedBackoff
+            {
+                Next = { _durations.Select(Duration.FromTimeSpan) }
+            };
+            return new RetryPolicy
+            {
+                MaxAttempts = _maxAttempts,
+                CustomizedBackoff = customizedBackoff
+            };
+        }
+
+        public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy)
+        {
+            if (!retryPolicy.StrategyCase.Equals(RetryPolicy.StrategyOneofCase.CustomizedBackoff))
+            {
+                throw new InvalidOperationException("Strategy must be customized backoff");
+            }
+
+            return InheritBackoff(retryPolicy.CustomizedBackoff);
+        }
+
+        private IRetryPolicy InheritBackoff(CustomizedBackoff retryPolicy)
+        {
+            var durations = retryPolicy.Next.Select(duration => duration.ToTimeSpan()).ToList();
+            return new CustomizedBackoffRetryPolicy(durations, _maxAttempts);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
index d4826d8..1ee7a28 100644
--- a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
+++ b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
@@ -25,7 +25,7 @@
     {
         private readonly int _maxAttempts;
 
-        private ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff,
+        public ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff,
             double backoffMultiplier)
         {
             _maxAttempts = maxAttempts;
@@ -39,11 +39,11 @@
             return _maxAttempts;
         }
 
-        private TimeSpan InitialBackoff { get; }
+        public TimeSpan InitialBackoff { get; }
 
-        private TimeSpan MaxBackoff { get; }
+        public TimeSpan MaxBackoff { get; }
 
-        private double BackoffMultiplier { get; }
+        public double BackoffMultiplier { get; }
 
         public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy)
         {
@@ -63,6 +63,10 @@
 
         public TimeSpan GetNextAttemptDelay(int attempt)
         {
+            if (attempt <= 0)
+            {
+                throw new ArgumentException("attempt must be positive", nameof(attempt));
+            }
             var delayMillis = Math.Min(
                 InitialBackoff.TotalMilliseconds * Math.Pow(BackoffMultiplier, 1.0 * (attempt - 1)),
                 MaxBackoff.TotalMilliseconds);
@@ -88,5 +92,18 @@
                 ExponentialBackoff = exponentialBackoff
             };
         }
+
+        public static ExponentialBackoffRetryPolicy FromProtobuf(Proto.RetryPolicy retryPolicy)
+        {
+            if (!retryPolicy.StrategyCase.Equals(Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff))
+            {
+                throw new ArgumentException("Illegal retry policy");
+            }
+            var exponentialBackoff = retryPolicy.ExponentialBackoff;
+            return new ExponentialBackoffRetryPolicy(retryPolicy.MaxAttempts,
+                exponentialBackoff.Initial.ToTimeSpan(),
+                exponentialBackoff.Max.ToTimeSpan(),
+                exponentialBackoff.Multiplier);
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/FifoConsumeService.cs b/csharp/rocketmq-client-csharp/FifoConsumeService.cs
new file mode 100644
index 0000000..b293c41
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/FifoConsumeService.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+    public class FifoConsumeService : ConsumeService
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger<FifoConsumeService>();
+
+        public FifoConsumeService(string clientId, IMessageListener messageListener,
+            TaskScheduler consumptionExecutor, CancellationToken consumptionCtsToken) :
+            base(clientId, messageListener, consumptionExecutor, consumptionCtsToken)
+        {
+        }
+
+        public override void Consume(ProcessQueue pq, List<MessageView> messageViews)
+        {
+            ConsumeIteratively(pq, messageViews.GetEnumerator());
+        }
+
+        public void ConsumeIteratively(ProcessQueue pq, IEnumerator<MessageView> iterator)
+        {
+            if (!iterator.MoveNext())
+            {
+                return;
+            }
+
+            var messageView = iterator.Current;
+
+            if (messageView != null && messageView.IsCorrupted())
+            {
+                // Discard corrupted message.
+                Logger.LogError($"Message is corrupted for FIFO consumption, prepare to discard it," +
+                                $" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}");
+                pq.DiscardFifoMessage(messageView);
+                ConsumeIteratively(pq, iterator); // Recursively consume the next message
+                return;
+            }
+
+            var consumeTask = Consume(messageView);
+            consumeTask.ContinueWith(async t =>
+            {
+                var result = await t;
+                await pq.EraseFifoMessage(messageView, result);
+            }, TaskContinuationOptions.ExecuteSynchronously).ContinueWith(_ => ConsumeIteratively(pq, iterator),
+                TaskContinuationOptions.ExecuteSynchronously);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index 19f9459..743df9f 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -113,6 +113,16 @@
             Endpoints endpoints, ChangeInvisibleDurationRequest request, TimeSpan timeout);
 
         /// <summary>
+        /// Send a message to the dead letter queue asynchronously, the method ensures no throwable.
+        /// </summary>
+        /// <param name="endpoints">Requested endpoints.</param>
+        /// <param name="request">Request of sending a message to DLQ.</param>
+        /// <param name="timeout">Request max duration.</param>
+        /// <returns></returns>
+        Task<RpcInvocation<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse>> ForwardMessageToDeadLetterQueue(
+            Endpoints endpoints, ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout);
+
+        /// <summary>
         /// Transaction ending request.
         /// </summary>
         /// <param name="endpoints">The target endpoints.</param>
diff --git a/csharp/rocketmq-client-csharp/IMessageListener.cs b/csharp/rocketmq-client-csharp/IMessageListener.cs
new file mode 100644
index 0000000..d011fc8
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/IMessageListener.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Org.Apache.Rocketmq
+{
+    public interface IMessageListener
+    {
+        /// <summary>
+        /// The callback interface to consume the message.
+        /// </summary>
+        /// <remarks>
+        /// You should process the <see cref="MessageView"/> and return the corresponding <see cref="ConsumeResult"/>.
+        /// The consumption is successful only when <see cref="ConsumeResult.SUCCESS"/> is returned, null pointer is returned
+        /// or exception is thrown would cause message consumption failure too.
+        /// </remarks>
+        ConsumeResult Consume(MessageView messageView);
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ITransaction.cs b/csharp/rocketmq-client-csharp/ITransaction.cs
index 27c770b..7abd5da 100644
--- a/csharp/rocketmq-client-csharp/ITransaction.cs
+++ b/csharp/rocketmq-client-csharp/ITransaction.cs
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
+using System.Threading.Tasks;
+
 namespace Org.Apache.Rocketmq
 {
     public interface ITransaction
     {
-        void Commit();
+        Task Commit();
 
-        void Rollback();
+        Task Rollback();
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index 52b821a..aaa5ebb 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -77,7 +77,17 @@
 
         public DateTime BornTime { get; }
 
-        public int DeliveryAttempt { get; }
+        public int DeliveryAttempt { get; set; }
+
+        public int IncrementAndGetDeliveryAttempt()
+        {
+            return ++DeliveryAttempt;
+        }
+
+        public bool IsCorrupted()
+        {
+            return _corrupted;
+        }
 
         public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue = null)
         {
diff --git a/csharp/rocketmq-client-csharp/ProcessQueue.cs b/csharp/rocketmq-client-csharp/ProcessQueue.cs
new file mode 100644
index 0000000..0c182d7
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ProcessQueue.cs
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq.Error;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Process queue is a cache to store fetched messages from remote for <c>PushConsumer</c>.
+    /// 
+    /// <c>PushConsumer</c> queries assignments periodically and converts them into message queues, each message queue is
+    /// mapped into one process queue to fetch message from remote. If the message queue is removed from the newest 
+    /// assignment, the corresponding process queue is marked as expired soon, which means its lifecycle is over.
+    /// </summary>
+    public class ProcessQueue
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger<ProcessQueue>();
+
+        internal static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1);
+        internal static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1);
+        internal static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1);
+
+        private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20);
+        private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan ReceivingBackoffDelayWhenCacheIsFull = TimeSpan.FromSeconds(1);
+
+        private readonly PushConsumer _consumer;
+
+        /// <summary>
+        /// Dropped means ProcessQueue is deprecated, which means no message would be fetched from remote anymore.
+        /// </summary>
+        private volatile bool _dropped;
+        private readonly MessageQueue _mq;
+        private readonly FilterExpression _filterExpression;
+
+        /// <summary>
+        /// Messages which is pending means have been cached, but are not taken by consumer dispatcher yet.
+        /// </summary>
+        private readonly List<MessageView> _cachedMessages;
+        private readonly ReaderWriterLockSlim _cachedMessageLock;
+        private long _cachedMessagesBytes;
+
+        private long _activityTime = DateTime.UtcNow.Ticks;
+        private long _cacheFullTime = long.MinValue;
+
+        private readonly CancellationTokenSource _receiveMsgCts;
+        private readonly CancellationTokenSource _ackMsgCts;
+        private readonly CancellationTokenSource _changeInvisibleDurationCts;
+        private readonly CancellationTokenSource _forwardMessageToDeadLetterQueueCts;
+
+        public ProcessQueue(PushConsumer consumer, MessageQueue mq, FilterExpression filterExpression,
+            CancellationTokenSource receiveMsgCts, CancellationTokenSource ackMsgCts,
+            CancellationTokenSource changeInvisibleDurationCts, CancellationTokenSource forwardMessageToDeadLetterQueueCts)
+        {
+            _consumer = consumer;
+            _dropped = false;
+            _mq = mq;
+            _filterExpression = filterExpression;
+            _cachedMessages = new List<MessageView>();
+            _cachedMessageLock = new ReaderWriterLockSlim();
+            _cachedMessagesBytes = 0;
+            _receiveMsgCts = receiveMsgCts;
+            _ackMsgCts = ackMsgCts;
+            _changeInvisibleDurationCts = changeInvisibleDurationCts;
+            _forwardMessageToDeadLetterQueueCts = forwardMessageToDeadLetterQueueCts;
+        }
+
+        /// <summary>
+        /// Get the mapped message queue.
+        /// </summary>
+        /// <returns>mapped message queue.</returns>
+        public MessageQueue GetMessageQueue()
+        {
+            return _mq;
+        }
+
+        /// <summary>
+        /// Drop the current process queue, which means the process queue's lifecycle is over,
+        /// thus it would not fetch messages from the remote anymore if dropped.
+        /// </summary>
+        public void Drop()
+        {
+            _dropped = true;
+        }
+
+        /// <summary>
+        /// ProcessQueue would be regarded as expired if no fetch message for a long time.
+        /// </summary>
+        /// <returns>if it is expired.</returns>
+        public bool Expired()
+        {
+            var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
+            var requestTimeout = _consumer.GetClientConfig().RequestTimeout;
+            var maxIdleDuration = longPollingTimeout.Add(requestTimeout).Multiply(3);
+            var idleDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _activityTime);
+            if (idleDuration < maxIdleDuration.Ticks)
+            {
+                return false;
+            }
+            var afterCacheFullDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _cacheFullTime);
+            if (afterCacheFullDuration < maxIdleDuration.Ticks)
+            {
+                return false;
+            }
+            Logger.LogWarning(
+                $"Process queue is idle, idleDuration={idleDuration}, maxIdleDuration={maxIdleDuration}," +
+                $" afterCacheFullDuration={afterCacheFullDuration}, mq={_mq}, clientId={_consumer.GetClientId()}");
+            return true;
+        }
+
+        internal void CacheMessages(List<MessageView> messageList)
+        {
+            _cachedMessageLock.EnterWriteLock();
+            try
+            {
+                foreach (var messageView in messageList)
+                {
+                    _cachedMessages.Add(messageView);
+                    Interlocked.Add(ref _cachedMessagesBytes, messageView.Body.Length);
+                }
+            }
+            finally
+            {
+                _cachedMessageLock.ExitWriteLock();
+            }
+        }
+
+        private int GetReceptionBatchSize()
+        {
+            var bufferSize = _consumer.CacheMessageCountThresholdPerQueue() - CachedMessagesCount();
+            bufferSize = Math.Max(bufferSize, 1);
+            return Math.Min(bufferSize, _consumer.GetPushConsumerSettings().GetReceiveBatchSize());
+        }
+
+        /// <summary>
+        /// Start to fetch messages from remote immediately.
+        /// </summary>
+        public void FetchMessageImmediately()
+        {
+            ReceiveMessageImmediately();
+        }
+
+        /// <summary>
+        /// Receive message later by message queue.
+        /// </summary>
+        /// <remarks>
+        /// Make sure that no exception will be thrown.
+        /// </remarks>
+        public void OnReceiveMessageException(Exception t, string attemptId)
+        {
+            var delay = t is TooManyRequestsException ? ReceivingFlowControlBackoffDelay : ReceivingFailureBackoffDelay;
+            ReceiveMessageLater(delay, attemptId);
+        }
+
+        private void ReceiveMessageLater(TimeSpan delay, string attemptId)
+        {
+            var clientId = _consumer.GetClientId();
+            Logger.LogInformation($"Try to receive message later, mq={_mq}, delay={delay}, clientId={clientId}");
+            Task.Run(async () =>
+            {
+                try
+                {
+                    await Task.Delay(delay, _receiveMsgCts.Token);
+                    ReceiveMessage(attemptId);
+                }
+                catch (Exception ex)
+                {
+                    if (_receiveMsgCts.IsCancellationRequested)
+                    {
+                        return;
+                    }
+                    Logger.LogError(ex, $"[Bug] Failed to schedule message receiving request, mq={_mq}, clientId={clientId}");
+                    OnReceiveMessageException(ex, attemptId);
+                }
+            });
+        }
+
+        private string GenerateAttemptId()
+        {
+            return Guid.NewGuid().ToString();
+        }
+
+        public void ReceiveMessage()
+        {
+            ReceiveMessage(GenerateAttemptId());
+        }
+
+        public void ReceiveMessage(string attemptId)
+        {
+            var clientId = _consumer.GetClientId();
+            if (_dropped)
+            {
+                Logger.LogInformation($"Process queue has been dropped, no longer receive message, mq={_mq}, clientId={clientId}");
+                return;
+            }
+            if (IsCacheFull())
+            {
+                Logger.LogWarning($"Process queue cache is full, would receive message later, mq={_mq}, clientId={clientId}");
+                ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull, attemptId);
+                return;
+            }
+            ReceiveMessageImmediately(attemptId);
+        }
+
+        private void ReceiveMessageImmediately()
+        {
+            ReceiveMessageImmediately(GenerateAttemptId());
+        }
+
+        private void ReceiveMessageImmediately(string attemptId)
+        {
+            var clientId = _consumer.GetClientId();
+            if (_consumer.State != State.Running)
+            {
+                Logger.LogInformation($"Stop to receive message because consumer is not running, mq={_mq}, clientId={clientId}");
+                return;
+            }
+
+            try
+            {
+                var endpoints = _mq.Broker.Endpoints;
+                var batchSize = GetReceptionBatchSize();
+                var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
+                var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout,
+                    attemptId);
+
+                Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks);
+
+                var task = _consumer.ReceiveMessage(request, _mq, longPollingTimeout);
+                task.ContinueWith(t =>
+                {
+                    if (t.IsFaulted)
+                    {
+                        string nextAttemptId = null;
+                        if (t.Exception is { InnerException: RpcException { StatusCode: StatusCode.DeadlineExceeded } })
+                        {
+                            nextAttemptId = request.AttemptId;
+                        }
+
+                        Logger.LogError(t.Exception, $"Exception raised during message reception, mq={_mq}," +
+                                                     $" attemptId={request.AttemptId}, nextAttemptId={nextAttemptId}," +
+                                                     $" clientId={clientId}");
+                        OnReceiveMessageException(t.Exception, nextAttemptId);
+                    }
+                    else
+                    {
+                        try
+                        {
+                            var result = t.Result;
+                            OnReceiveMessageResult(result);
+                        }
+                        catch (Exception ex)
+                        {
+                            // Should never reach here.
+                            Logger.LogError($"[Bug] Exception raised while handling receive result, mq={_mq}," +
+                                            $" endpoints={endpoints}, clientId={clientId}, exception={ex}");
+                            OnReceiveMessageException(ex, attemptId);
+                        }
+                    }
+                }, TaskContinuationOptions.ExecuteSynchronously);
+            }
+            catch (Exception ex)
+            {
+                Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}, clientId={clientId}");
+                OnReceiveMessageException(ex, attemptId);
+            }
+        }
+
+        private void OnReceiveMessageResult(ReceiveMessageResult result)
+        {
+            var messages = result.Messages;
+            if (messages.Count > 0)
+            {
+                CacheMessages(messages);
+                _consumer.GetConsumeService().Consume(this, messages);
+            }
+            ReceiveMessage();
+        }
+
+        private bool IsCacheFull()
+        {
+            var cacheMessageCountThresholdPerQueue = _consumer.CacheMessageCountThresholdPerQueue();
+            var actualMessagesQuantity = CachedMessagesCount();
+            var clientId = _consumer.GetClientId();
+            if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity)
+            {
+                Logger.LogWarning($"Process queue total cached messages quantity exceeds the threshold," +
+                                  $" threshold={cacheMessageCountThresholdPerQueue}, actual={actualMessagesQuantity}," +
+                                  $" mq={_mq}, clientId={clientId}");
+                Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks);
+                return true;
+            }
+
+            var cacheMessageBytesThresholdPerQueue = _consumer.CacheMessageBytesThresholdPerQueue();
+            var actualCachedMessagesBytes = CachedMessageBytes();
+            if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes)
+            {
+                Logger.LogWarning($"Process queue total cached messages memory exceeds the threshold," +
+                                  $" threshold={cacheMessageBytesThresholdPerQueue} bytes," +
+                                  $" actual={actualCachedMessagesBytes} bytes, mq={_mq}, clientId={clientId}");
+                Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks);
+                return true;
+            }
+
+            return false;
+        }
+
+        /// <summary>
+        /// Erase messages(Non-FIFO-consume-mode) which have been consumed properly.
+        /// </summary>
+        /// <param name="messageView">the message to erase.</param>
+        /// <param name="consumeResult">consume result.</param>
+        public void EraseMessage(MessageView messageView, ConsumeResult consumeResult)
+        {
+            var task = ConsumeResult.SUCCESS.Equals(consumeResult) ? AckMessage(messageView) : NackMessage(messageView);
+            _ = task.ContinueWith(_ =>
+            {
+                EvictCache(messageView);
+            }, TaskContinuationOptions.ExecuteSynchronously);
+        }
+
+        private Task AckMessage(MessageView messageView)
+        {
+            var tcs = new TaskCompletionSource<bool>();
+            AckMessage(messageView, 1, tcs);
+            return tcs.Task;
+        }
+
+        private void AckMessage(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+        {
+            var clientId = _consumer.GetClientId();
+            var consumerGroup = _consumer.GetConsumerGroup();
+            var messageId = messageView.MessageId;
+            var endpoints = messageView.MessageQueue.Broker.Endpoints;
+
+            var request = _consumer.WrapAckMessageRequest(messageView);
+            var task = _consumer.GetClientManager().AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+                _consumer.GetClientConfig().RequestTimeout);
+
+            task.ContinueWith(responseTask =>
+            {
+                if (responseTask.IsFaulted)
+                {
+                    Logger.LogError(responseTask.Exception, $"Exception raised while acknowledging message," +
+                                                            $" would retry later, clientId={clientId}," +
+                                                            $" consumerGroup={consumerGroup}," +
+                                                            $" messageId={messageId}," +
+                                                            $" mq={_mq}, endpoints={endpoints}");
+                    AckMessageLater(messageView, attempt + 1, tcs);
+                }
+                else
+                {
+                    var invocation = responseTask.Result;
+                    var requestId = invocation.RequestId;
+                    var status = invocation.Response.Status;
+                    var statusCode = status.Code;
+
+                    if (statusCode == Code.InvalidReceiptHandle)
+                    {
+                        Logger.LogError($"Failed to ack message due to the invalid receipt handle, forgive to retry," +
+                                        $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
+                                        $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
+                                        $" status message={status.Message}");
+                        tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
+                    }
+
+                    if (statusCode != Code.Ok)
+                    {
+                        Logger.LogError(
+                            $"Failed to change invisible duration, would retry later, clientId={clientId}," +
+                            $" consumerGroup={consumerGroup}, messageId={messageId}, attempt={attempt}, mq={_mq}," +
+                            $" endpoints={endpoints}, requestId={requestId}, status message={status.Message}");
+                        AckMessageLater(messageView, attempt + 1, tcs);
+                        return;
+                    }
+
+                    tcs.SetResult(true);
+
+                    if (attempt > 1)
+                    {
+                        Logger.LogInformation($"Successfully acked message finally, clientId={clientId}," +
+                                              $" consumerGroup={consumerGroup}, messageId={messageId}," +
+                                              $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
+                                              $" requestId={requestId}");
+                    }
+                    else
+                    {
+                        Logger.LogDebug($"Successfully acked message, clientId={clientId}," +
+                                        $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
+                                        $" endpoints={endpoints}, requestId={requestId}");
+                    }
+                }
+            }, TaskContinuationOptions.ExecuteSynchronously);
+        }
+
+        private void AckMessageLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+        {
+            Task.Run(async () =>
+            {
+                try
+                {
+                    await Task.Delay(AckMessageFailureBackoffDelay, _ackMsgCts.Token);
+                    AckMessage(messageView, attempt + 1, tcs);
+                }
+                catch (Exception ex)
+                {
+                    if (_ackMsgCts.IsCancellationRequested)
+                    {
+                        return;
+                    }
+                    Logger.LogError(ex, $"[Bug] Failed to schedule message ack request, mq={_mq}," +
+                                        $" messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}");
+                    AckMessageLater(messageView, attempt + 1, tcs);
+                }
+            });
+        }
+
+        private Task NackMessage(MessageView messageView)
+        {
+            var deliveryAttempt = messageView.DeliveryAttempt;
+            var duration = _consumer.GetRetryPolicy().GetNextAttemptDelay(deliveryAttempt);
+            var tcs = new TaskCompletionSource<bool>();
+            ChangeInvisibleDuration(messageView, duration, 1, tcs);
+            return tcs.Task;
+        }
+
+        private void ChangeInvisibleDuration(MessageView messageView, TimeSpan duration, int attempt,
+            TaskCompletionSource<bool> tcs)
+        {
+            var clientId = _consumer.GetClientId();
+            var consumerGroup = _consumer.GetConsumerGroup();
+            var messageId = messageView.MessageId;
+            var endpoints = messageView.MessageQueue.Broker.Endpoints;
+
+            var request = _consumer.WrapChangeInvisibleDuration(messageView, duration);
+            var task = _consumer.GetClientManager().ChangeInvisibleDuration(endpoints,
+                request, _consumer.GetClientConfig().RequestTimeout);
+            task.ContinueWith(responseTask =>
+            {
+                if (responseTask.IsFaulted)
+                {
+                    Logger.LogError(responseTask.Exception, $"Exception raised while changing invisible" +
+                                                            $" duration, would retry later, clientId={clientId}," +
+                                                            $" consumerGroup={consumerGroup}," +
+                                                            $" messageId={messageId}, mq={_mq}," +
+                                                            $" endpoints={endpoints}");
+                    ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
+                }
+                else
+                {
+                    var invocation = responseTask.Result;
+                    var requestId = invocation.RequestId;
+                    var status = invocation.Response.Status;
+                    var statusCode = status.Code;
+
+                    if (statusCode == Code.InvalidReceiptHandle)
+                    {
+                        Logger.LogError($"Failed to change invisible duration due to the invalid receipt handle," +
+                                        $" forgive to retry, clientId={clientId}, consumerGroup={consumerGroup}," +
+                                        $" messageId={messageId}, attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
+                                        $" requestId={requestId}, status message={status.Message}");
+                        tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
+                    }
+
+                    if (statusCode != Code.Ok)
+                    {
+                        Logger.LogError($"Failed to change invisible duration, would retry later," +
+                                        $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
+                                        $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
+                                        $" status message={status.Message}");
+                        ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
+                        return;
+                    }
+
+                    tcs.SetResult(true);
+
+                    if (attempt > 1)
+                    {
+                        Logger.LogInformation($"Finally, changed invisible duration successfully," +
+                                              $" clientId={clientId}, consumerGroup={consumerGroup}," +
+                                              $" messageId={messageId}, attempt={attempt}, mq={_mq}," +
+                                              $" endpoints={endpoints}, requestId={requestId}");
+                    }
+                    else
+                    {
+                        Logger.LogDebug($"Changed invisible duration successfully, clientId={clientId}," +
+                                        $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
+                                        $" endpoints={endpoints}, requestId={requestId}");
+                    }
+                }
+            });
+        }
+
+        private void ChangeInvisibleDurationLater(MessageView messageView, TimeSpan duration, int attempt,
+            TaskCompletionSource<bool> tcs)
+        {
+            Task.Run(async () =>
+            {
+                try
+                {
+                    await Task.Delay(ChangeInvisibleDurationFailureBackoffDelay, _changeInvisibleDurationCts.Token);
+                    ChangeInvisibleDuration(messageView, duration, attempt, tcs);
+                }
+                catch (Exception ex)
+                {
+                    if (_changeInvisibleDurationCts.IsCancellationRequested)
+                    {
+                        return;
+                    }
+                    Logger.LogError(ex, $"[Bug] Failed to schedule message change invisible duration request," +
+                                        $" mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}");
+                    ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
+                }
+            });
+        }
+
+        public Task EraseFifoMessage(MessageView messageView, ConsumeResult consumeResult)
+        {
+            var retryPolicy = _consumer.GetRetryPolicy();
+            var maxAttempts = retryPolicy.GetMaxAttempts();
+            var attempt = messageView.DeliveryAttempt;
+            var messageId = messageView.MessageId;
+            var service = _consumer.GetConsumeService();
+            var clientId = _consumer.GetClientId();
+
+            if (consumeResult == ConsumeResult.FAILURE && attempt < maxAttempts)
+            {
+                var nextAttemptDelay = retryPolicy.GetNextAttemptDelay(attempt);
+                attempt = messageView.IncrementAndGetDeliveryAttempt();
+                Logger.LogDebug($"Prepare to redeliver the fifo message because of the consumption failure," +
+                                $" maxAttempt={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," +
+                                $" messageId={messageId}, nextAttemptDelay={nextAttemptDelay}, clientId={clientId}");
+                var redeliverTask = service.Consume(messageView, nextAttemptDelay);
+                _ = redeliverTask.ContinueWith(async t =>
+                {
+                    var result = await t;
+                    await EraseFifoMessage(messageView, result);
+                }, TaskContinuationOptions.ExecuteSynchronously);
+            }
+            else
+            {
+                var success = consumeResult == ConsumeResult.SUCCESS;
+                if (!success)
+                {
+                    Logger.LogInformation($"Failed to consume fifo message finally, run out of attempt times," +
+                                          $" maxAttempts={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," +
+                                          $" messageId={messageId}, clientId={clientId}");
+                }
+
+                var task = ConsumeResult.SUCCESS.Equals(consumeResult)
+                    ? AckMessage(messageView)
+                    : ForwardToDeadLetterQueue(messageView);
+
+                _ = task.ContinueWith(_ => { EvictCache(messageView); },
+                    TaskContinuationOptions.ExecuteSynchronously);
+            }
+
+            return Task.CompletedTask;
+        }
+
+        private Task ForwardToDeadLetterQueue(MessageView messageView)
+        {
+            var tcs = new TaskCompletionSource<bool>();
+            ForwardToDeadLetterQueue(messageView, 1, tcs);
+            return tcs.Task;
+        }
+
+        private void ForwardToDeadLetterQueue(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+        {
+            var clientId = _consumer.GetClientId();
+            var consumerGroup = _consumer.GetConsumerGroup();
+            var messageId = messageView.MessageId;
+            var endpoints = messageView.MessageQueue.Broker.Endpoints;
+
+            var request = _consumer.WrapForwardMessageToDeadLetterQueueRequest(messageView);
+            var task = _consumer.GetClientManager().ForwardMessageToDeadLetterQueue(endpoints, request,
+                _consumer.GetClientConfig().RequestTimeout);
+
+            task.ContinueWith(responseTask =>
+            {
+                if (responseTask.IsFaulted)
+                {
+                    // Log failure and retry later.
+                    Logger.LogError($"Exception raised while forward message to DLQ, would attempt to re-forward later, " +
+                                $"clientId={_consumer.GetClientId()}," +
+                                $" consumerGroup={_consumer.GetConsumerGroup()}," +
+                                $" messageId={messageView.MessageId}, mq={_mq}", responseTask.Exception);
+
+                    ForwardToDeadLetterQueueLater(messageView, attempt, tcs);
+                }
+                else
+                {
+                    var invocation = responseTask.Result;
+                    var requestId = invocation.RequestId;
+                    var status = invocation.Response.Status;
+                    var statusCode = status.Code;
+
+                    // Log failure and retry later.
+                    if (statusCode != Code.Ok)
+                    {
+                        Logger.LogError($"Failed to forward message to dead letter queue," +
+                                        $" would attempt to re-forward later, clientId={clientId}," +
+                                        $" consumerGroup={consumerGroup}, messageId={messageId}," +
+                                        $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
+                                        $" requestId={requestId}, code={statusCode}," +
+                                        $" status message={status.Message}");
+                        ForwardToDeadLetterQueueLater(messageView, attempt, tcs);
+                        return;
+                    }
+
+                    tcs.SetResult(true);
+
+                    // Log success.
+                    if (attempt > 1)
+                    {
+                        Logger.LogInformation($"Re-forward message to dead letter queue successfully, " +
+                                              $"clientId={clientId}, consumerGroup={consumerGroup}," +
+                                              $" attempt={attempt}, messageId={messageId}, mq={_mq}," +
+                                              $" endpoints={endpoints}, requestId={requestId}");
+                    }
+                    else
+                    {
+                        Logger.LogInformation($"Forward message to dead letter queue successfully, " +
+                                              $"clientId={clientId}, consumerGroup={consumerGroup}," +
+                                              $" messageId={messageId}, mq={_mq}, endpoints={endpoints}," +
+                                              $" requestId={requestId}");
+                    }
+                }
+            });
+        }
+
+        private void ForwardToDeadLetterQueueLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+        {
+            Task.Run(async () =>
+            {
+                try
+                {
+                    await Task.Delay(ForwardMessageToDeadLetterQueueFailureBackoffDelay,
+                        _forwardMessageToDeadLetterQueueCts.Token);
+                    ForwardToDeadLetterQueue(messageView, attempt, tcs);
+                }
+                catch (Exception ex)
+                {
+                    // Should never reach here.
+                    Logger.LogError($"[Bug] Failed to schedule DLQ message request, " +
+                                    $"mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}", ex);
+
+                    ForwardToDeadLetterQueueLater(messageView, attempt + 1, tcs);
+                }
+            });
+        }
+
+        /// <summary>
+        /// Discard the message(Non-FIFO-consume-mode) which could not be consumed properly.
+        /// </summary>
+        /// <param name="messageView">the message to discard.</param>
+        public void DiscardMessage(MessageView messageView)
+        {
+            Logger.LogInformation($"Discard message, mq={_mq}, messageId={messageView.MessageId}," +
+                                  $" clientId={_consumer.GetClientId()}");
+            var task = NackMessage(messageView);
+            _ = task.ContinueWith(_ =>
+            {
+                EvictCache(messageView);
+            }, TaskContinuationOptions.ExecuteSynchronously);
+        }
+
+        /// <summary>
+        /// Discard the message(FIFO-consume-mode) which could not consumed properly.
+        /// </summary>
+        /// <param name="messageView">the FIFO message to discard.</param>
+        public void DiscardFifoMessage(MessageView messageView)
+        {
+            Logger.LogInformation($"Discard fifo message, mq={_mq}, messageId={messageView.MessageId}," +
+                                  $" clientId={_consumer.GetClientId()}");
+            var task = ForwardToDeadLetterQueue(messageView);
+            _ = task.ContinueWith(_ =>
+            {
+                EvictCache(messageView);
+            }, TaskContinuationOptions.ExecuteSynchronously);
+        }
+
+        private void EvictCache(MessageView messageView)
+        {
+            _cachedMessageLock.EnterWriteLock();
+            try
+            {
+                if (_cachedMessages.Remove(messageView))
+                {
+                    Interlocked.Add(ref _cachedMessagesBytes, -messageView.Body.Length);
+                }
+            }
+            finally
+            {
+                _cachedMessageLock.ExitWriteLock();
+            }
+        }
+
+        public int CachedMessagesCount()
+        {
+            _cachedMessageLock.EnterReadLock();
+            try
+            {
+                return _cachedMessages.Count;
+            }
+            finally
+            {
+                _cachedMessageLock.ExitReadLock();
+            }
+        }
+
+        public long CachedMessageBytes()
+        {
+            return Interlocked.Read(ref _cachedMessagesBytes);
+        }
+
+        /// <summary>
+        /// Get the count of cached messages.
+        /// </summary>
+        /// <returns>count of pending messages.</returns>
+        public long GetCachedMessageCount()
+        {
+            _cachedMessageLock.EnterReadLock();
+            try
+            {
+                return _cachedMessages.Count;
+            }
+            finally
+            {
+                _cachedMessageLock.ExitReadLock();
+            }
+        }
+
+        /// <summary>
+        /// Get the bytes of cached message memory footprint.
+        /// </summary>
+        /// <returns>bytes of cached message memory footprint.</returns>
+        public long GetCachedMessageBytes()
+        {
+            return _cachedMessagesBytes;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 136bdad..24f1a0a 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -21,28 +21,31 @@
 using System.Diagnostics;
 using System.Diagnostics.Metrics;
 using System.Linq;
+using System.Runtime.CompilerServices;
 using System.Threading.Tasks;
 using Microsoft.Extensions.Logging;
 using Proto = Apache.Rocketmq.V2;
 using Org.Apache.Rocketmq.Error;
 
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
 namespace Org.Apache.Rocketmq
 {
     public class Producer : Client, IAsyncDisposable, IDisposable
     {
         private static readonly ILogger Logger = MqLogManager.CreateLogger<Producer>();
-        private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
+        internal readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
         internal readonly PublishingSettings PublishingSettings;
         private readonly ConcurrentDictionary<string, bool> _publishingTopics;
         private readonly ITransactionChecker _checker;
 
         private readonly Histogram<double> _sendCostTimeHistogram;
 
-        private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics,
+        internal Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics,
             int maxAttempts, ITransactionChecker checker) : base(clientConfig)
         {
             var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
-            PublishingSettings = new PublishingSettings(ClientId, Endpoints, retryPolicy,
+            PublishingSettings = new PublishingSettings(ClientConfig.Namespace, ClientId, Endpoints, retryPolicy,
                 clientConfig.RequestTimeout, publishingTopics);
             _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
             _publishingTopics = publishingTopics;
@@ -102,7 +105,7 @@
             }
         }
 
-        protected override Proto::HeartbeatRequest WrapHeartbeatRequest()
+        internal override Proto::HeartbeatRequest WrapHeartbeatRequest()
         {
             return new Proto::HeartbeatRequest
             {
@@ -110,7 +113,7 @@
             };
         }
 
-        protected override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
+        internal override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
         {
             return new Proto::NotifyClientTerminationRequest();
         }
@@ -192,11 +195,11 @@
             return sendReceipt;
         }
 
-        private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
+        private Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
         {
             return new Proto.SendMessageRequest
             {
-                Messages = { message.ToProtobuf(mq.QueueId) }
+                Messages = { message.ToProtobuf(ClientConfig.Namespace, mq.QueueId) }
             };
         }
 
@@ -331,6 +334,7 @@
         {
             var topicResource = new Proto.Resource
             {
+                ResourceNamespace = ClientConfig.Namespace,
                 Name = topic
             };
             var request = new Proto.EndTransactionRequest
diff --git a/csharp/rocketmq-client-csharp/PublishingMessage.cs b/csharp/rocketmq-client-csharp/PublishingMessage.cs
index 7937d93..d214598 100644
--- a/csharp/rocketmq-client-csharp/PublishingMessage.cs
+++ b/csharp/rocketmq-client-csharp/PublishingMessage.cs
@@ -76,7 +76,7 @@
             MessageType = MessageType.Transaction;
         }
 
-        public Proto::Message ToProtobuf(int queueId)
+        public Proto::Message ToProtobuf(string namespaceName, int queueId)
         {
             var systemProperties = new Proto.SystemProperties
             {
@@ -105,6 +105,7 @@
 
             var topicResource = new Proto.Resource
             {
+                ResourceNamespace = namespaceName,
                 Name = Topic
             };
             return new Proto.Message
diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs
index a5ab74d..cdd8418 100644
--- a/csharp/rocketmq-client-csharp/PublishingSettings.cs
+++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs
@@ -31,8 +31,8 @@
         private volatile int _maxBodySizeBytes = 4 * 1024 * 1024;
         private volatile bool _validateMessageType = true;
 
-        public PublishingSettings(string clientId, Endpoints endpoints, IRetryPolicy retryPolicy,
-            TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer,
+        public PublishingSettings(string namespaceName, string clientId, Endpoints endpoints, IRetryPolicy retryPolicy,
+            TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(namespaceName, clientId, ClientType.Producer,
             endpoints, retryPolicy, requestTimeout)
         {
             Topics = topics;
@@ -66,7 +66,8 @@
 
         public override Proto.Settings ToProtobuf()
         {
-            var topics = Topics.Select(topic => new Proto.Resource { Name = topic.Key }).ToList();
+            var topics = Topics.Select(topic =>
+                new Proto.Resource { ResourceNamespace = Namespace, Name = topic.Key }).ToList();
 
             var publishing = new Proto.Publishing();
             publishing.Topics.Add(topics);
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs
new file mode 100644
index 0000000..c46523f
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/PushConsumer.cs
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Schedulers;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+using Microsoft.Extensions.Logging;
+
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
+namespace Org.Apache.Rocketmq
+{
+    public class PushConsumer : Consumer, IAsyncDisposable, IDisposable
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger<PushConsumer>();
+
+        private static readonly TimeSpan AssignmentScanScheduleDelay = TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan AssignmentScanSchedulePeriod = TimeSpan.FromSeconds(5);
+
+        private readonly ClientConfig _clientConfig;
+        private readonly PushSubscriptionSettings _pushSubscriptionSettings;
+        private readonly string _consumerGroup;
+        private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+        private readonly ConcurrentDictionary<string, Assignments> _cacheAssignments;
+        private readonly IMessageListener _messageListener;
+        private readonly int _maxCacheMessageCount;
+        private readonly int _maxCacheMessageSizeInBytes;
+
+        private readonly ConcurrentDictionary<MessageQueue, ProcessQueue> _processQueueTable;
+        private ConsumeService _consumeService;
+        private readonly TaskScheduler _consumptionTaskScheduler;
+        private readonly CancellationTokenSource _consumptionCts;
+
+        private readonly CancellationTokenSource _scanAssignmentCts;
+
+        private readonly CancellationTokenSource _receiveMsgCts;
+        private readonly CancellationTokenSource _ackMsgCts;
+        private readonly CancellationTokenSource _changeInvisibleDurationCts;
+        private readonly CancellationTokenSource _forwardMsgToDeadLetterQueueCts;
+
+        /// <summary>
+        /// The caller is supposed to have validated the arguments and handled throwing exception or
+        /// logging warnings already, so we avoid repeating args check here.
+        /// </summary>
+        public PushConsumer(ClientConfig clientConfig, string consumerGroup,
+            ConcurrentDictionary<string, FilterExpression> subscriptionExpressions, IMessageListener messageListener,
+            int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount)
+            : base(clientConfig, consumerGroup)
+        {
+            _clientConfig = clientConfig;
+            _consumerGroup = consumerGroup;
+            _subscriptionExpressions = subscriptionExpressions;
+            _pushSubscriptionSettings = new PushSubscriptionSettings(_clientConfig.Namespace, ClientId, Endpoints, consumerGroup,
+                clientConfig.RequestTimeout, subscriptionExpressions);
+            _cacheAssignments = new ConcurrentDictionary<string, Assignments>();
+            _messageListener = messageListener;
+            _maxCacheMessageCount = maxCacheMessageCount;
+            _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
+
+            _scanAssignmentCts = new CancellationTokenSource();
+
+            _processQueueTable = new ConcurrentDictionary<MessageQueue, ProcessQueue>();
+            _consumptionTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(consumptionThreadCount);
+            _consumptionCts = new CancellationTokenSource();
+
+            _receiveMsgCts = new CancellationTokenSource();
+            _ackMsgCts = new CancellationTokenSource();
+            _changeInvisibleDurationCts = new CancellationTokenSource();
+            _forwardMsgToDeadLetterQueueCts = new CancellationTokenSource();
+        }
+
+        protected override async Task Start()
+        {
+            try
+            {
+                State = State.Starting;
+                Logger.LogInformation($"Begin to start the rocketmq push consumer, clientId={ClientId}");
+                await base.Start();
+                _consumeService = CreateConsumerService();
+                ScheduleWithFixedDelay(ScanAssignments, AssignmentScanScheduleDelay, AssignmentScanSchedulePeriod,
+                    _scanAssignmentCts.Token);
+                Logger.LogInformation($"The rocketmq push consumer starts successfully, clientId={ClientId}");
+                State = State.Running;
+            }
+            catch (Exception)
+            {
+                State = State.Failed;
+                throw;
+            }
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            await Shutdown().ConfigureAwait(false);
+            GC.SuppressFinalize(this);
+        }
+
+        public void Dispose()
+        {
+            Shutdown().Wait();
+            GC.SuppressFinalize(this);
+        }
+
+        protected override async Task Shutdown()
+        {
+            try
+            {
+                State = State.Stopping;
+                Logger.LogInformation($"Begin to shutdown the rocketmq push consumer, clientId={ClientId}");
+                _receiveMsgCts.Cancel();
+                _ackMsgCts.Cancel();
+                _changeInvisibleDurationCts.Cancel();
+                _forwardMsgToDeadLetterQueueCts.Cancel();
+                _scanAssignmentCts.Cancel();
+                await base.Shutdown();
+                _consumptionCts.Cancel();
+                Logger.LogInformation($"Shutdown the rocketmq push consumer successfully, clientId={ClientId}");
+                State = State.Terminated;
+            }
+            catch (Exception)
+            {
+                State = State.Failed;
+                throw;
+            }
+        }
+
+        private ConsumeService CreateConsumerService()
+        {
+            if (_pushSubscriptionSettings.IsFifo())
+            {
+                Logger.LogInformation(
+                    $"Create FIFO consume service, consumerGroup={_consumerGroup}, clientId={ClientId}");
+                return new FifoConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token);
+            }
+            Logger.LogInformation(
+                $"Create standard consume service, consumerGroup={_consumerGroup}, clientId={ClientId}");
+            return new StandardConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token);
+        }
+
+        /// <summary>
+        /// Adds a subscription expression dynamically.
+        /// </summary>
+        /// <param name="filterExpression">The new filter expression to add.</param>
+        /// <returns>The push consumer instance.</returns>
+        public async Task Subscribe(string topic, FilterExpression filterExpression)
+        {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Push consumer is not running");
+            }
+
+            await GetRouteData(topic);
+            _subscriptionExpressions[topic] = filterExpression;
+        }
+
+        /// <summary>
+        /// Removes a subscription expression dynamically by topic.
+        /// </summary>
+        /// <remarks>
+        /// It stops the backend task to fetch messages from the server.
+        /// The locally cached messages whose topic was removed before would not be delivered 
+        /// to the <see cref="IMessageListener"/> anymore.
+        /// 
+        /// Nothing occurs if the specified topic does not exist in subscription expressions 
+        /// of the push consumer.
+        /// </remarks>
+        /// <param name="topic">The topic to remove the subscription.</param>
+        /// <returns>The push consumer instance.</returns>
+        public void Unsubscribe(string topic)
+        {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Push consumer is not running");
+            }
+
+            _subscriptionExpressions.TryRemove(topic, out _);
+        }
+
+        internal void ScanAssignments()
+        {
+            try
+            {
+                Logger.LogDebug($"Start to scan assignments periodically, clientId={ClientId}");
+                foreach (var (topic, filterExpression) in _subscriptionExpressions)
+                {
+                    var existed = _cacheAssignments.GetValueOrDefault(topic);
+
+                    var queryAssignmentTask = QueryAssignment(topic);
+                    queryAssignmentTask.ContinueWith(task =>
+                    {
+                        if (task.IsFaulted)
+                        {
+                            Logger.LogError(task.Exception, "Exception raised while scanning the assignments," +
+                                                            $" topic={topic}, clientId={ClientId}");
+                            return;
+                        }
+
+                        var latest = task.Result;
+                        if (latest.GetAssignmentList().Count == 0)
+                        {
+                            if (existed == null || existed.GetAssignmentList().Count == 0)
+                            {
+                                Logger.LogInformation("Acquired empty assignments from remote, would scan later," +
+                                                      $" topic={topic}, clientId={ClientId}");
+                                return;
+                            }
+
+                            Logger.LogInformation("Attention!!! acquired empty assignments from remote, but" +
+                                                  $" existed assignments are not empty, topic={topic}," +
+                                                  $" clientId={ClientId}");
+                        }
+
+                        if (!latest.Equals(existed))
+                        {
+                            Logger.LogInformation($"Assignments of topic={topic} has changed, {existed} =>" +
+                                                  $" {latest}, clientId={ClientId}");
+                            SyncProcessQueue(topic, latest, filterExpression);
+                            _cacheAssignments[topic] = latest;
+                            return;
+                        }
+
+                        Logger.LogDebug($"Assignments of topic={topic} remain the same," +
+                                        $" assignments={existed}, clientId={ClientId}");
+                        // Process queue may be dropped, need to be synchronized anyway.
+                        SyncProcessQueue(topic, latest, filterExpression);
+                    }, TaskContinuationOptions.ExecuteSynchronously);
+                }
+            }
+            catch (Exception ex)
+            {
+                Logger.LogError(ex, $"Exception raised while scanning the assignments for all topics, clientId={ClientId}");
+            }
+        }
+
+        private void SyncProcessQueue(string topic, Assignments assignments, FilterExpression filterExpression)
+        {
+            var latest = new HashSet<MessageQueue>();
+            var assignmentList = assignments.GetAssignmentList();
+            foreach (var assignment in assignmentList)
+            {
+                latest.Add(assignment.MessageQueue);
+            }
+
+            var activeMqs = new HashSet<MessageQueue>();
+            foreach (var (mq, pq) in _processQueueTable)
+            {
+                if (!topic.Equals(mq.Topic))
+                {
+                    continue;
+                }
+
+                if (!latest.Contains(mq))
+                {
+                    Logger.LogInformation($"Drop message queue according to the latest assignmentList," +
+                                          $" mq={mq}, clientId={ClientId}");
+                    DropProcessQueue(mq);
+                    continue;
+                }
+
+                if (pq.Expired())
+                {
+                    Logger.LogWarning($"Drop message queue because it is expired," +
+                                      $" mq={mq}, clientId={ClientId}");
+                    DropProcessQueue(mq);
+                    continue;
+                }
+                activeMqs.Add(mq);
+            }
+
+            foreach (var mq in latest)
+            {
+                if (activeMqs.Contains(mq))
+                {
+                    continue;
+                }
+                var processQueue = CreateProcessQueue(mq, filterExpression);
+                if (processQueue != null)
+                {
+                    Logger.LogInformation($"Start to fetch message from remote, mq={mq}, clientId={ClientId}");
+                    processQueue.FetchMessageImmediately();
+                }
+            }
+        }
+
+        internal Task<Assignments> QueryAssignment(string topic)
+        {
+            var pickEndpointsTask = PickEndpointsToQueryAssignments(topic);
+            return pickEndpointsTask.ContinueWith(task0 =>
+            {
+                if (task0 is { IsFaulted: true, Exception: { } })
+                {
+                    throw task0.Exception;
+                }
+
+                var endpoints = task0.Result;
+                var request = WrapQueryAssignmentRequest(topic);
+                var requestTimeout = _clientConfig.RequestTimeout;
+                var queryAssignmentTask = ClientManager.QueryAssignment(endpoints, request, requestTimeout);
+
+                return queryAssignmentTask.ContinueWith(task1 =>
+                {
+                    if (task1 is { IsFaulted: true, Exception: { } })
+                    {
+                        throw task1.Exception;
+                    }
+
+                    var response = task1.Result.Response;
+                    var status = response.Status;
+                    StatusChecker.Check(status, request, task1.Result.RequestId);
+                    var assignmentList = response.Assignments
+                        .Select(assignment => new Assignment(new MessageQueue(assignment.MessageQueue)))
+                        .ToList();
+                    return Task.FromResult(new Assignments(assignmentList));
+                }, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
+            }, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
+        }
+
+        private Task<Endpoints> PickEndpointsToQueryAssignments(string topic)
+        {
+            var getRouteDataTask = GetRouteData(topic);
+            return getRouteDataTask.ContinueWith(task =>
+            {
+                if (task is { IsFaulted: true, Exception: { } })
+                {
+                    throw task.Exception;
+                }
+
+                var topicRouteData = task.Result;
+                return topicRouteData.PickEndpointsToQueryAssignments();
+            }, TaskContinuationOptions.ExecuteSynchronously);
+        }
+
+        private QueryAssignmentRequest WrapQueryAssignmentRequest(string topic)
+        {
+            var topicResource = new Proto.Resource
+            {
+                ResourceNamespace = _clientConfig.Namespace,
+                Name = topic
+            };
+            return new QueryAssignmentRequest
+            {
+                Topic = topicResource,
+                Group = GetProtobufGroup(),
+                Endpoints = Endpoints.ToProtobuf()
+            };
+        }
+
+        /// <summary>
+        /// Drops the <see cref="ProcessQueue"/> by <see cref="MessageQueue"/>.
+        /// <see cref="ProcessQueue"/> must be removed before it is dropped.
+        /// </summary>
+        /// <param name="mq">The message queue.</param>
+        internal void DropProcessQueue(MessageQueue mq)
+        {
+            if (_processQueueTable.TryRemove(mq, out var pq))
+            {
+                pq.Drop();
+            }
+        }
+
+        /// <summary>
+        /// Creates a process queue and adds it into the <see cref="_processQueueTable"/>.
+        /// Returns <see cref="ProcessQueue"/> if the mapped process queue already exists.
+        /// </summary>
+        /// <remarks>
+        /// This function and <see cref="DropProcessQueue"/> ensure that a process queue is not dropped if
+        /// it is contained in <see cref="_processQueueTable"/>. Once a process queue is dropped, it must have been
+        /// removed from <see cref="_processQueueTable"/>.
+        /// </remarks>
+        /// <param name="mq">The message queue.</param>
+        /// <param name="filterExpression">The filter expression of the topic.</param>
+        /// <returns>A process queue.</returns>
+        protected ProcessQueue CreateProcessQueue(MessageQueue mq, FilterExpression filterExpression)
+        {
+            var processQueue = new ProcessQueue(this, mq, filterExpression, _receiveMsgCts, _ackMsgCts,
+                _changeInvisibleDurationCts, _forwardMsgToDeadLetterQueueCts);
+            if (_processQueueTable.TryGetValue(mq, out var previous))
+            {
+                return null;
+            }
+            _processQueueTable.TryAdd(mq, processQueue);
+            return processQueue;
+        }
+
+        public async Task AckMessage(MessageView messageView)
+        {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Push consumer is not running");
+            }
+
+            var request = WrapAckMessageRequest(messageView);
+            var invocation = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+                ClientConfig.RequestTimeout);
+            StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
+        }
+
+        protected override IEnumerable<string> GetTopics()
+        {
+            return _subscriptionExpressions.Keys;
+        }
+
+        internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
+        {
+            return new Proto::HeartbeatRequest
+            {
+                ClientType = Proto.ClientType.PushConsumer,
+                Group = GetProtobufGroup()
+            };
+        }
+
+        protected internal ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView,
+            TimeSpan invisibleDuration)
+        {
+            var topicResource = new Proto.Resource
+            {
+                ResourceNamespace = _clientConfig.Namespace,
+                Name = messageView.Topic
+            };
+            return new Proto.ChangeInvisibleDurationRequest
+            {
+                Topic = topicResource,
+                Group = GetProtobufGroup(),
+                ReceiptHandle = messageView.ReceiptHandle,
+                InvisibleDuration = Duration.FromTimeSpan(invisibleDuration),
+                MessageId = messageView.MessageId
+            };
+        }
+
+        protected internal AckMessageRequest WrapAckMessageRequest(MessageView messageView)
+        {
+            var topicResource = new Proto.Resource
+            {
+                ResourceNamespace = _clientConfig.Namespace,
+                Name = messageView.Topic
+            };
+            var entry = new Proto.AckMessageEntry
+            {
+                MessageId = messageView.MessageId,
+                ReceiptHandle = messageView.ReceiptHandle,
+            };
+            return new Proto.AckMessageRequest
+            {
+                Group = GetProtobufGroup(),
+                Topic = topicResource,
+                Entries = { entry }
+            };
+        }
+
+        protected internal ForwardMessageToDeadLetterQueueRequest WrapForwardMessageToDeadLetterQueueRequest(MessageView messageView)
+        {
+            var topicResource = new Proto.Resource
+            {
+                ResourceNamespace = _clientConfig.Namespace,
+                Name = messageView.Topic
+            };
+
+            return new ForwardMessageToDeadLetterQueueRequest
+            {
+                Group = GetProtobufGroup(),
+                Topic = topicResource,
+                ReceiptHandle = messageView.ReceiptHandle,
+                MessageId = messageView.MessageId,
+                DeliveryAttempt = messageView.DeliveryAttempt,
+                MaxDeliveryAttempts = GetRetryPolicy().GetMaxAttempts()
+            };
+        }
+
+        protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
+        {
+        }
+
+        internal override async void OnVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command)
+        {
+            var nonce = command.Nonce;
+            var messageView = MessageView.FromProtobuf(command.Message);
+            var messageId = messageView.MessageId;
+            Proto.TelemetryCommand telemetryCommand = null;
+
+            try
+            {
+                var consumeResult = await _consumeService.Consume(messageView);
+                var code = consumeResult == ConsumeResult.SUCCESS ? Code.Ok : Code.FailedToConsumeMessage;
+                var status = new Status
+                {
+                    Code = code
+                };
+                var verifyMessageResult = new VerifyMessageResult
+                {
+                    Nonce = nonce
+                };
+                telemetryCommand = new TelemetryCommand
+                {
+                    VerifyMessageResult = verifyMessageResult,
+                    Status = status
+                };
+                var (_, session) = GetSession(endpoints);
+                await session.WriteAsync(telemetryCommand);
+            }
+            catch (Exception e)
+            {
+                Logger.LogError(e,
+                    $"Failed to send message verification result command, endpoints={Endpoints}, command={telemetryCommand}, messageId={messageId}, clientId={ClientId}");
+            }
+        }
+
+        internal override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
+        {
+            return new NotifyClientTerminationRequest()
+            {
+                Group = GetProtobufGroup()
+            };
+        }
+
+        internal int GetQueueSize()
+        {
+            return _processQueueTable.Count;
+        }
+
+        internal int CacheMessageBytesThresholdPerQueue()
+        {
+            var size = this.GetQueueSize();
+            // All process queues are removed, no need to cache messages.
+            return size <= 0 ? 0 : Math.Max(1, _maxCacheMessageSizeInBytes / size);
+        }
+
+        internal int CacheMessageCountThresholdPerQueue()
+        {
+            var size = this.GetQueueSize();
+            // All process queues are removed, no need to cache messages.
+            if (size <= 0)
+            {
+                return 0;
+            }
+
+            return Math.Max(1, _maxCacheMessageCount / size);
+        }
+
+        internal override Settings GetSettings()
+        {
+            return _pushSubscriptionSettings;
+        }
+
+        /// <summary>
+        /// Gets the load balancing group for the consumer.
+        /// </summary>
+        /// <returns>The consumer load balancing group.</returns>
+        public string GetConsumerGroup()
+        {
+            return _consumerGroup;
+        }
+
+        public PushSubscriptionSettings GetPushConsumerSettings()
+        {
+            return _pushSubscriptionSettings;
+        }
+
+        /// <summary>
+        /// Lists the existing subscription expressions in the push consumer.
+        /// </summary>
+        /// <returns>Collections of the subscription expressions.</returns>
+        public ConcurrentDictionary<string, FilterExpression> GetSubscriptionExpressions()
+        {
+            return _subscriptionExpressions;
+        }
+
+        public IRetryPolicy GetRetryPolicy()
+        {
+            return _pushSubscriptionSettings.GetRetryPolicy();
+        }
+
+        public ConsumeService GetConsumeService()
+        {
+            return _consumeService;
+        }
+
+        private Proto.Resource GetProtobufGroup()
+        {
+            return new Proto.Resource()
+            {
+                ResourceNamespace = _clientConfig.Namespace,
+                Name = ConsumerGroup
+            };
+        }
+
+        public class Builder
+        {
+            private ClientConfig _clientConfig;
+            private string _consumerGroup;
+            private ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+            private IMessageListener _messageListener;
+            private int _maxCacheMessageCount = 1024;
+            private int _maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
+            private int _consumptionThreadCount = 20;
+
+            public Builder SetClientConfig(ClientConfig clientConfig)
+            {
+                Preconditions.CheckArgument(null != clientConfig, "clientConfig should not be null");
+                _clientConfig = clientConfig;
+                return this;
+            }
+
+            public Builder SetConsumerGroup(string consumerGroup)
+            {
+                Preconditions.CheckArgument(null != consumerGroup, "consumerGroup should not be null");
+                Preconditions.CheckArgument(consumerGroup != null && ConsumerGroupRegex.Match(consumerGroup).Success,
+                    $"topic does not match the regex {ConsumerGroupRegex}");
+                _consumerGroup = consumerGroup;
+                return this;
+            }
+
+            public Builder SetSubscriptionExpression(Dictionary<string, FilterExpression> subscriptionExpressions)
+            {
+                Preconditions.CheckArgument(null != subscriptionExpressions,
+                    "subscriptionExpressions should not be null");
+                Preconditions.CheckArgument(subscriptionExpressions!.Count != 0,
+                    "subscriptionExpressions should not be empty");
+                _subscriptionExpressions = new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions!);
+                return this;
+            }
+
+            public Builder SetMessageListener(IMessageListener messageListener)
+            {
+                Preconditions.CheckArgument(null != messageListener,
+                    "messageListener should not be null");
+                _messageListener = messageListener;
+                return this;
+            }
+
+            public Builder SetMaxCacheMessageCount(int maxCacheMessageCount)
+            {
+                Preconditions.CheckArgument(maxCacheMessageCount > 0,
+                    "maxCacheMessageCount should be positive");
+                _maxCacheMessageCount = maxCacheMessageCount;
+                return this;
+            }
+
+            public Builder SetMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes)
+            {
+                Preconditions.CheckArgument(maxCacheMessageSizeInBytes > 0,
+                    "maxCacheMessageSizeInBytes should be positive");
+                _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
+                return this;
+            }
+
+            public Builder SetConsumptionThreadCount(int consumptionThreadCount)
+            {
+                Preconditions.CheckArgument(consumptionThreadCount > 0,
+                    "consumptionThreadCount should be positive");
+                _consumptionThreadCount = consumptionThreadCount;
+                return this;
+            }
+
+            public async Task<PushConsumer> Build()
+            {
+                Preconditions.CheckArgument(null != _clientConfig, "clientConfig has not been set yet");
+                Preconditions.CheckArgument(null != _consumerGroup, "consumerGroup has not been set yet");
+                Preconditions.CheckArgument(!_subscriptionExpressions!.IsEmpty,
+                    "subscriptionExpressions has not been set yet");
+                Preconditions.CheckArgument(null != _messageListener, "messageListener has not been set yet");
+                var pushConsumer = new PushConsumer(_clientConfig, _consumerGroup, _subscriptionExpressions,
+                    _messageListener, _maxCacheMessageCount,
+                    _maxCacheMessageSizeInBytes, _consumptionThreadCount);
+                await pushConsumer.Start();
+                return pushConsumer;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
new file mode 100644
index 0000000..b2ff519
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.Extensions.Logging;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    public class PushSubscriptionSettings : Settings
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger<PushSubscriptionSettings>();
+
+        private readonly Resource _group;
+        private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+        private volatile bool _fifo = false;
+        private volatile int _receiveBatchSize = 32;
+        private TimeSpan _longPollingTimeout = TimeSpan.FromSeconds(30);
+
+        public PushSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup,
+            TimeSpan requestTimeout, ConcurrentDictionary<string, FilterExpression> subscriptionExpressions)
+            : base(namespaceName, clientId, ClientType.PushConsumer, endpoints, requestTimeout)
+        {
+            _group = new Resource(namespaceName, consumerGroup);
+            _subscriptionExpressions = subscriptionExpressions;
+        }
+
+        public bool IsFifo()
+        {
+            return _fifo;
+        }
+
+        public int GetReceiveBatchSize()
+        {
+            return _receiveBatchSize;
+        }
+
+        public TimeSpan GetLongPollingTimeout()
+        {
+            return _longPollingTimeout;
+        }
+
+        public override Proto.Settings ToProtobuf()
+        {
+            var subscriptionEntries = new List<Proto.SubscriptionEntry>();
+            foreach (var (key, value) in _subscriptionExpressions)
+            {
+                var topic = new Proto.Resource()
+                {
+                    ResourceNamespace = Namespace,
+                    Name = key
+                };
+                var filterExpression = new Proto.FilterExpression()
+                {
+                    Expression = value.Expression
+                };
+                switch (value.Type)
+                {
+                    case ExpressionType.Tag:
+                        filterExpression.Type = Proto.FilterType.Tag;
+                        break;
+                    case ExpressionType.Sql92:
+                        filterExpression.Type = Proto.FilterType.Sql;
+                        break;
+                    default:
+                        Logger.LogWarning($"[Bug] Unrecognized filter type={value.Type} for push consumer");
+                        break;
+                }
+
+                var subscriptionEntry = new Proto.SubscriptionEntry
+                {
+                    Topic = topic,
+                    Expression = filterExpression
+                };
+
+                subscriptionEntries.Add(subscriptionEntry);
+            }
+
+            var subscription = new Proto.Subscription
+            {
+                Group = _group.ToProtobuf(),
+                Subscriptions = { subscriptionEntries }
+            };
+
+            return new Proto.Settings
+            {
+                AccessPoint = Endpoints.ToProtobuf(),
+                ClientType = ClientTypeHelper.ToProtobuf(ClientType),
+                RequestTimeout = Duration.FromTimeSpan(RequestTimeout),
+                Subscription = subscription,
+                UserAgent = UserAgent.Instance.ToProtobuf()
+            };
+        }
+
+        public override void Sync(Proto.Settings settings)
+        {
+            if (Proto.Settings.PubSubOneofCase.Subscription != settings.PubSubCase)
+            {
+                Logger.LogError($"[Bug] Issued settings doesn't match with the client type, clientId={ClientId}, " +
+                                $"pubSubCase={settings.PubSubCase}, clientType={ClientType}");
+            }
+
+            var subscription = settings.Subscription ?? new Proto.Subscription();
+            _fifo = subscription.Fifo;
+            _receiveBatchSize = subscription.ReceiveBatchSize;
+            _longPollingTimeout = subscription.LongPollingTimeout?.ToTimeSpan() ?? TimeSpan.Zero;
+            var backoffPolicy = settings.BackoffPolicy ?? new Proto.RetryPolicy();
+            switch (backoffPolicy.StrategyCase)
+            {
+                case Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff:
+                    RetryPolicy = ExponentialBackoffRetryPolicy.FromProtobuf(backoffPolicy);
+                    break;
+                case Proto.RetryPolicy.StrategyOneofCase.CustomizedBackoff:
+                    RetryPolicy = CustomizedBackoffRetryPolicy.FromProtobuf(backoffPolicy);
+                    break;
+                default:
+                    throw new ArgumentException("Unrecognized backoff policy strategy.");
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Resource.cs b/csharp/rocketmq-client-csharp/Resource.cs
index a0f27df..e2847e7 100644
--- a/csharp/rocketmq-client-csharp/Resource.cs
+++ b/csharp/rocketmq-client-csharp/Resource.cs
@@ -15,12 +15,19 @@
  * limitations under the License.
  */
 
+using System;
 using Proto = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
-    public class Resource
+    public class Resource : IEquatable<Resource>
     {
+        public Resource(string namespaceName, string name)
+        {
+            Namespace = namespaceName;
+            Name = name;
+        }
+
         public Resource(Proto.Resource resource)
         {
             Namespace = resource.ResourceNamespace;
@@ -33,7 +40,7 @@
             Name = name;
         }
 
-        private string Namespace { get; }
+        public string Namespace { get; }
         public string Name { get; }
 
         public Proto.Resource ToProtobuf()
@@ -45,9 +52,46 @@
             };
         }
 
+        public bool Equals(Resource other)
+        {
+            if (ReferenceEquals(null, other))
+            {
+                return false;
+            }
+
+            if (ReferenceEquals(this, other))
+            {
+                return true;
+            }
+
+            return Name == other.Name && Namespace == other.Namespace;
+        }
+
+        public override bool Equals(object obj)
+        {
+            if (ReferenceEquals(null, obj))
+            {
+                return false;
+            }
+
+            if (ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            return obj.GetType() == GetType() && Equals((Resource)obj);
+        }
+
+        public override int GetHashCode()
+        {
+            return HashCode.Combine(Namespace, Name);
+        }
+
         public override string ToString()
         {
             return string.IsNullOrEmpty(Namespace) ? Name : $"{Namespace}.{Name}";
         }
+
+
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Settings.cs b/csharp/rocketmq-client-csharp/Settings.cs
index 0ee95fb..d504a7a 100644
--- a/csharp/rocketmq-client-csharp/Settings.cs
+++ b/csharp/rocketmq-client-csharp/Settings.cs
@@ -22,15 +22,17 @@
 {
     public abstract class Settings
     {
+        protected readonly string Namespace;
         protected readonly string ClientId;
         protected readonly ClientType ClientType;
         protected readonly Endpoints Endpoints;
         protected volatile IRetryPolicy RetryPolicy;
         protected readonly TimeSpan RequestTimeout;
 
-        protected Settings(string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
+        protected Settings(string namespaceName, string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
             TimeSpan requestTimeout)
         {
+            Namespace = namespaceName;
             ClientId = clientId;
             ClientType = clientType;
             Endpoints = endpoints;
@@ -38,8 +40,9 @@
             RequestTimeout = requestTimeout;
         }
 
-        protected Settings(string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
+        protected Settings(string namespaceName, string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
         {
+            Namespace = namespaceName;
             ClientId = clientId;
             ClientType = clientType;
             Endpoints = endpoints;
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index 949ac49..dfd1cc0 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -42,6 +42,7 @@
             dictionary.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
             dictionary.Add(MetadataConstants.RequestIdKey, Guid.NewGuid().ToString());
             dictionary.Add(MetadataConstants.ClientIdKey, client.GetClientId());
+            dictionary.Add(MetadataConstants.NamespaceKey, client.GetClientConfig().Namespace);
 
             var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
             dictionary.Add(MetadataConstants.DateTimeKey, time);
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index d16a8c5..1ede707 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -36,6 +36,8 @@
         private readonly SimpleSubscriptionSettings _simpleSubscriptionSettings;
         private int _topicRoundRobinIndex;
 
+        private readonly ClientConfig _clientConfig;
+
         public SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
             Dictionary<string, FilterExpression> subscriptionExpressions) : this(clientConfig, consumerGroup,
             awaitDuration, new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions))
@@ -48,9 +50,10 @@
             _awaitDuration = awaitDuration;
             _subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>();
             _subscriptionExpressions = subscriptionExpressions;
-            _simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, Endpoints,
+            _simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfig.Namespace, ClientId, Endpoints,
                 ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions);
             _topicRoundRobinIndex = 0;
+            _clientConfig = clientConfig;
         }
 
         public async Task Subscribe(string topic, FilterExpression filterExpression)
@@ -125,7 +128,7 @@
             return _subscriptionExpressions.Keys;
         }
 
-        protected override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
+        internal override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
         {
             return new Proto.NotifyClientTerminationRequest()
             {
@@ -133,7 +136,7 @@
             };
         }
 
-        protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
+        internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
         {
             return new Proto::HeartbeatRequest
             {
@@ -209,7 +212,7 @@
             return receiveMessageResult.Messages;
         }
 
-        public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
+        public async Task ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
         {
             if (State.Running != State)
             {
@@ -240,6 +243,7 @@
         {
             var topicResource = new Proto.Resource
             {
+                ResourceNamespace = _clientConfig.Namespace,
                 Name = messageView.Topic
             };
             var entry = new Proto.AckMessageEntry
@@ -260,6 +264,7 @@
         {
             var topicResource = new Proto.Resource
             {
+                ResourceNamespace = _clientConfig.Namespace,
                 Name = messageView.Topic
             };
             return new Proto.ChangeInvisibleDurationRequest
@@ -276,6 +281,7 @@
         {
             return new Proto.Resource()
             {
+                ResourceNamespace = _clientConfig.Namespace,
                 Name = ConsumerGroup
             };
         }
diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
index 2b214fa..c5a0dfa 100644
--- a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
+++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
@@ -32,12 +32,12 @@
         private readonly TimeSpan _longPollingTimeout;
         private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;
 
-        public SimpleSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup,
+        public SimpleSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup,
             TimeSpan requestTimeout, TimeSpan longPollingTimeout,
             ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(
-            clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
+            namespaceName, clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
         {
-            _group = new Resource(consumerGroup);
+            _group = new Resource(namespaceName, consumerGroup);
             _longPollingTimeout = longPollingTimeout;
             _subscriptionExpressions = subscriptionExpressions;
         }
@@ -58,6 +58,7 @@
             {
                 var topic = new Proto.Resource()
                 {
+                    ResourceNamespace = Namespace,
                     Name = key,
                 };
                 var subscriptionEntry = new Proto.SubscriptionEntry();
@@ -77,6 +78,7 @@
 
                 filterExpression.Expression = value.Expression;
                 subscriptionEntry.Topic = topic;
+                subscriptionEntry.Expression = filterExpression;
                 subscriptionEntries.Add(subscriptionEntry);
             }
 
diff --git a/csharp/rocketmq-client-csharp/StandardConsumeService.cs b/csharp/rocketmq-client-csharp/StandardConsumeService.cs
new file mode 100644
index 0000000..da753e0
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/StandardConsumeService.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+    public class StandardConsumeService : ConsumeService
+    {
+        private static readonly ILogger Logger = MqLogManager.CreateLogger<StandardConsumeService>();
+
+        public StandardConsumeService(string clientId, IMessageListener messageListener,
+            TaskScheduler consumptionTaskScheduler, CancellationToken consumptionCtsToken) :
+            base(clientId, messageListener, consumptionTaskScheduler, consumptionCtsToken)
+        {
+        }
+
+        public override void Consume(ProcessQueue pq, List<MessageView> messageViews)
+        {
+            foreach (var messageView in messageViews)
+            {
+                if (messageView.IsCorrupted())
+                {
+                    Logger.LogError("Message is corrupted for standard consumption, prepare to discard it," +
+                                    $" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}");
+                    pq.DiscardMessage(messageView);
+                    continue;
+                }
+
+                var consumeTask = Consume(messageView);
+
+                consumeTask.ContinueWith(task =>
+                {
+                    if (task.IsFaulted)
+                    {
+                        // Should never reach here.
+                        Logger.LogError(task.Exception,
+                            $"[Bug] Exception raised in consumption callback, clientId={ClientId}");
+                    }
+                    else
+                    {
+                        pq.EraseMessage(messageView, task.Result);
+                    }
+                }, TaskContinuationOptions.ExecuteSynchronously);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs
index 885db5f..950aa09 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteData.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs
@@ -18,12 +18,16 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
+using Org.Apache.Rocketmq.Error;
 using Proto = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
     public class TopicRouteData : IEquatable<TopicRouteData>
     {
+        private int _index = 0;
+
         public TopicRouteData(IEnumerable<Proto.MessageQueue> messageQueues)
         {
             var messageQueuesList = messageQueues.Select(mq => new MessageQueue(mq)).ToList();
@@ -33,6 +37,36 @@
 
         public List<MessageQueue> MessageQueues { get; }
 
+        public Endpoints PickEndpointsToQueryAssignments()
+        {
+            var nextIndex = Interlocked.Increment(ref _index) - 1;
+            foreach (var mq in MessageQueues)
+            {
+                var modIndex = Mod(nextIndex++, MessageQueues.Count);
+                var curMessageQueue = MessageQueues[modIndex];
+
+                if (Utilities.MasterBrokerId != curMessageQueue.Broker.Id)
+                {
+                    continue;
+                }
+                if (Permission.None.Equals(curMessageQueue.Permission))
+                {
+                    continue;
+                }
+                return curMessageQueue.Broker.Endpoints;
+            }
+            throw new NotFoundException("Failed to pick endpoints to query assignment");
+        }
+
+        private int Mod(int x, int m)
+        {
+            if (m <= 0)
+            {
+                throw new ArgumentException("Modulus must be positive", nameof(m));
+            }
+            var result = x % m;
+            return result >= 0 ? result : result + m;
+        }
 
         public bool Equals(TopicRouteData other)
         {
diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs
index 71d5b74..8a4df4a 100644
--- a/csharp/rocketmq-client-csharp/Transaction.cs
+++ b/csharp/rocketmq-client-csharp/Transaction.cs
@@ -19,6 +19,7 @@
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Threading;
+using System.Threading.Tasks;
 
 namespace Org.Apache.Rocketmq
 {
@@ -44,7 +45,7 @@
             _messagesLock.EnterReadLock();
             try
             {
-                if (_messages.Count > MaxMessageNum)
+                if (_messages.Count >= MaxMessageNum)
                 {
                     throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
                 }
@@ -57,7 +58,7 @@
             _messagesLock.EnterWriteLock();
             try
             {
-                if (_messages.Count > MaxMessageNum)
+                if (_messages.Count >= MaxMessageNum)
                 {
                     throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
                 }
@@ -90,7 +91,7 @@
             }
         }
 
-        public async void Commit()
+        public async Task Commit()
         {
             if (State.Running != _producer.State)
             {
@@ -109,7 +110,7 @@
             }
         }
 
-        public async void Rollback()
+        public async Task Rollback()
         {
             if (State.Running != _producer.State)
             {
diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 5fe87fc..597bf9f 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -34,9 +34,10 @@
         <PackageReference Include="OpenTelemetry" Version="1.3.1" />

         <PackageReference Include="OpenTelemetry.Api" Version="1.3.1" />

         <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.3.1" />

+        <PackageReference Include="ParallelExtensionsExtras" Version="1.2.0" />

 

         <Protobuf Include="..\..\protos\apache\rocketmq\v2\definition.proto" ProtoRoot="..\..\protos" GrpcServices="Client" />

-        <Protobuf Include="..\..\protos\apache\rocketmq\v2\service.proto" ProtoRoot="..\..\protos" GrpcServices="Client" />

+        <Protobuf Include="..\..\protos\apache\rocketmq\v2\service.proto" ProtoRoot="..\..\protos" GrpcServices="Both" />

         <None Include="logo.png" Pack="true" PackagePath="" />

     </ItemGroup>

 </Project>

diff --git a/csharp/tests/AttemptIdIntegrationTest.cs b/csharp/tests/AttemptIdIntegrationTest.cs
new file mode 100644
index 0000000..99bfbbd
--- /dev/null
+++ b/csharp/tests/AttemptIdIntegrationTest.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using FilterExpression = Org.Apache.Rocketmq.FilterExpression;
+
+namespace tests
+{
+    [TestClass]
+    public class AttemptIdIntegrationTest : GrpcServerIntegrationTest
+    {
+        private const string Topic = "topic";
+        private const string Broker = "broker";
+
+        private Server _server;
+        private readonly List<string> _attemptIdList = new ConcurrentBag<string>().ToList();
+
+        [TestInitialize]
+        public void SetUp()
+        {
+            var mockServer = new MockServer(Topic, Broker, _attemptIdList);
+            _server = SetUpServer(mockServer);
+            mockServer.Port = Port;
+        }
+
+        [TestCleanup]
+        public void TearDown()
+        {
+            _server.ShutdownAsync();
+        }
+
+        [TestMethod]
+        public async Task Test()
+        {
+            var endpoint = "127.0.0.1" + ":" + Port;
+            var credentialsProvider = new StaticSessionCredentialsProvider("yourAccessKey", "yourSecretKey");
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoint)
+                .SetCredentialsProvider(credentialsProvider)
+                .EnableSsl(false)
+                .SetRequestTimeout(TimeSpan.FromMilliseconds(1000))
+                .Build();
+
+            const string consumerGroup = "yourConsumerGroup";
+            const string topic = "yourTopic";
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression("*") } };
+
+            var pushConsumer = await new PushConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetSubscriptionExpression(subscription)
+                .SetMessageListener(new CustomMessageListener())
+                .Build();
+
+            await Task.Run(async () =>
+            {
+                await WaitForConditionAsync(() =>
+                {
+                    Assert.IsTrue(_attemptIdList.Count >= 3);
+                    Assert.AreEqual(_attemptIdList[0], _attemptIdList[1]);
+                    Assert.AreNotEqual(_attemptIdList[0], _attemptIdList[2]);
+                }, TimeSpan.FromSeconds(5));
+            });
+        }
+
+        private async Task WaitForConditionAsync(Action assertCondition, TimeSpan timeout)
+        {
+            var startTime = DateTime.UtcNow;
+            while (DateTime.UtcNow - startTime < timeout)
+            {
+                try
+                {
+                    assertCondition();
+                    return; // Condition met, exit the method
+                }
+                catch
+                {
+                    // Condition not met, ignore exception and try again after a delay
+                }
+
+                await Task.Delay(100); // Small delay to avoid tight loop
+            }
+
+            // Perform last check to throw the exception
+            assertCondition();
+        }
+
+        private class CustomMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView)
+            {
+                return ConsumeResult.SUCCESS;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs
new file mode 100644
index 0000000..5e4e7ee
--- /dev/null
+++ b/csharp/tests/ClientManagerTest.cs
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using Apache.Rocketmq.V2;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+
+namespace tests
+{
+    [TestClass]
+    public class ClientManagerTest
+    {
+        private static readonly Endpoints FakeEndpoints = new Endpoints("127.0.0.1:8080");
+        private static IClientManager _clientManager;
+
+        private readonly ClientConfig _clientConfig = new ClientConfig.Builder()
+            .SetEndpoints("127.0.0.1:8080")
+            .Build();
+
+        [TestInitialize]
+        public void Initialize()
+        {
+            _clientManager = new ClientManager(CreateTestClient());
+        }
+
+        [TestMethod]
+        public void TestHeartbeat()
+        {
+            var request = new HeartbeatRequest();
+            _clientManager.Heartbeat(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.Heartbeat(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestSendMessage()
+        {
+            var request = new SendMessageRequest();
+            _clientManager.SendMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.SendMessage(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestQueryAssignment()
+        {
+            var request = new QueryAssignmentRequest();
+            _clientManager.QueryAssignment(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.QueryAssignment(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestReceiveMessage()
+        {
+            var request = new ReceiveMessageRequest();
+            _clientManager.ReceiveMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.ReceiveMessage(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestAckMessage()
+        {
+            var request = new AckMessageRequest();
+            _clientManager.AckMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.AckMessage(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestChangeInvisibleDuration()
+        {
+            var request = new ChangeInvisibleDurationRequest();
+            _clientManager.ChangeInvisibleDuration(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.ChangeInvisibleDuration(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestForwardMessageToDeadLetterQueue()
+        {
+            var request = new ForwardMessageToDeadLetterQueueRequest();
+            _clientManager.ForwardMessageToDeadLetterQueue(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.ForwardMessageToDeadLetterQueue(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestEndTransaction()
+        {
+            var request = new EndTransactionRequest();
+            _clientManager.EndTransaction(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.EndTransaction(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        [TestMethod]
+        public void TestNotifyClientTermination()
+        {
+            var request = new NotifyClientTerminationRequest();
+            _clientManager.NotifyClientTermination(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+            _clientManager.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
+        private Client CreateTestClient()
+        {
+            return new Producer(_clientConfig, new ConcurrentDictionary<string, bool>(), 1, null);
+        }
+    }
+}
diff --git a/csharp/tests/ClientMeterManagerTest.cs b/csharp/tests/ClientMeterManagerTest.cs
new file mode 100644
index 0000000..9713e43
--- /dev/null
+++ b/csharp/tests/ClientMeterManagerTest.cs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Metric = Org.Apache.Rocketmq.Metric;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class ClientMeterManagerTest
+    {
+        [TestMethod]
+        public void TestResetWithMetricOn()
+        {
+            var meterManager = CreateClientMeterManager();
+            var metric = CreateMetric(true);
+            meterManager.Reset(metric);
+            Assert.IsTrue(meterManager.IsEnabled());
+        }
+
+        [TestMethod]
+        public void TestResetWithMetricOff()
+        {
+            var meterManager = CreateClientMeterManager();
+            var metric = CreateMetric(false);
+            meterManager.Reset(metric);
+            Assert.IsFalse(meterManager.IsEnabled());
+        }
+
+        private ClientMeterManager CreateClientMeterManager()
+        {
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints("127.0.0.1:8080")
+                .Build();
+
+            return new ClientMeterManager(CreateTestClient(clientConfig));
+        }
+
+        private Client CreateTestClient(ClientConfig clientConfig)
+        {
+            return new PushConsumer(clientConfig, "testGroup",
+                new ConcurrentDictionary<string, FilterExpression>(), new TestMessageListener(),
+                0, 0, 1);
+        }
+
+        private Metric CreateMetric(bool isOn)
+        {
+            var endpoints = new Proto.Endpoints
+            {
+                Scheme = Proto.AddressScheme.Ipv4,
+                Addresses =
+                {
+                    new Proto.Address { Host = "127.0.0.1", Port = 8080 }
+                }
+            };
+
+            return new Metric(new Proto.Metric { On = isOn, Endpoints = endpoints });
+        }
+
+        private class TestMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView)
+            {
+                return ConsumeResult.SUCCESS;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ClientMeterTest.cs b/csharp/tests/ClientMeterTest.cs
new file mode 100644
index 0000000..aa3aecc
--- /dev/null
+++ b/csharp/tests/ClientMeterTest.cs
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Resources;
+using Org.Apache.Rocketmq;
+using Metric = Org.Apache.Rocketmq.Metric;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class ClientMeterTest
+    {
+        private MeterProvider CreateMeterProvider()
+        {
+            return Sdk.CreateMeterProviderBuilder()
+                      .SetResourceBuilder(ResourceBuilder.CreateEmpty())
+                      .Build();
+        }
+
+        [TestMethod]
+        public void TestShutdownWithEnabledMeter()
+        {
+            var endpoints = new Endpoints(new Proto.Endpoints
+            {
+                Scheme = Proto.AddressScheme.Ipv4,
+                Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+            });
+            var provider = CreateMeterProvider();
+            var clientId = "testClientId";
+            var clientMeter = new ClientMeter(endpoints, provider, clientId);
+            Assert.IsTrue(clientMeter.Enabled);
+            clientMeter.Shutdown();
+        }
+
+        [TestMethod]
+        public void TestShutdownWithDisabledMeter()
+        {
+            var clientId = "testClientId";
+            var clientMeter = ClientMeter.DisabledInstance(clientId);
+            Assert.IsFalse(clientMeter.Enabled);
+            clientMeter.Shutdown();
+        }
+
+        [TestMethod]
+        public void TestSatisfy()
+        {
+            var clientId = "testClientId";
+            var clientMeter = ClientMeter.DisabledInstance(clientId);
+
+            var metric = new Metric(new Proto.Metric { On = false });
+            Assert.IsTrue(clientMeter.Satisfy(metric));
+
+            metric = new Metric(new Proto.Metric { On = true });
+            Assert.IsTrue(clientMeter.Satisfy(metric));
+
+            var endpoints0 = new Proto.Endpoints
+            {
+                Scheme = Proto.AddressScheme.Ipv4,
+                Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+            };
+
+            metric = new Metric(new Proto.Metric { On = false, Endpoints = endpoints0 });
+            Assert.IsTrue(clientMeter.Satisfy(metric));
+
+            metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints0 });
+            Assert.IsFalse(clientMeter.Satisfy(metric));
+
+            var endpoints = new Endpoints(endpoints0);
+            var provider = CreateMeterProvider();
+            clientMeter = new ClientMeter(endpoints, provider, clientId);
+
+            metric = new Metric(new Proto.Metric { On = false });
+            Assert.IsFalse(clientMeter.Satisfy(metric));
+
+            metric = new Metric(new Proto.Metric { On = true });
+            Assert.IsFalse(clientMeter.Satisfy(metric));
+
+            metric = new Metric(new Proto.Metric { On = false, Endpoints = endpoints0 });
+            Assert.IsFalse(clientMeter.Satisfy(metric));
+
+            metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints0 });
+            Assert.IsTrue(clientMeter.Satisfy(metric));
+
+            var endpoints1 = new Proto.Endpoints
+            {
+                Scheme = Proto.AddressScheme.Ipv4,
+                Addresses = { new Proto.Address { Host = "127.0.0.2", Port = 8081 } }
+            };
+            metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints1 });
+            Assert.IsFalse(clientMeter.Satisfy(metric));
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ClientTest.cs b/csharp/tests/ClientTest.cs
new file mode 100644
index 0000000..8fdfa86
--- /dev/null
+++ b/csharp/tests/ClientTest.cs
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class ClientTest
+    {
+        [TestMethod]
+        public async Task TestOnVerifyMessageCommand()
+        {
+            var testClient = CreateTestClient();
+            var endpoints = new Endpoints("testEndpoints");
+            var command = new VerifyMessageCommand { Nonce = "testNonce" };
+
+            var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
+                new MockClientStreamWriter<TelemetryCommand>(),
+                new MockAsyncStreamReader<TelemetryCommand>(),
+                null,
+                null,
+                null,
+                null);
+            var mockClientManager = new Mock<IClientManager>();
+            mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
+
+            testClient.SetClientManager(mockClientManager.Object);
+
+            testClient.OnVerifyMessageCommand(endpoints, command);
+
+            mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once);
+        }
+
+        [TestMethod]
+        public async Task TestOnTopicRouteDataFetchedFailure()
+        {
+            var testClient = CreateTestClient();
+            var endpoints = new Endpoints("testEndpoints");
+            var mq = new Proto.MessageQueue
+            {
+                Topic = new Proto::Resource
+                {
+                    ResourceNamespace = "testNamespace",
+                    Name = "testTopic"
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Broker = new Proto::Broker
+                {
+                    Name = "testBroker",
+                    Id = 0,
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+                    }
+                }
+            };
+            var topicRouteData = new TopicRouteData(new[] { mq });
+
+            var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
+                new MockClientStreamWriter<TelemetryCommand>(),
+                new MockAsyncStreamReader<TelemetryCommand>(),
+                null,
+                null,
+                null,
+                null);
+            var mockClientManager = new Mock<IClientManager>();
+            mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
+
+            testClient.SetClientManager(mockClientManager.Object);
+
+            try
+            {
+                await testClient.OnTopicRouteDataFetched("testTopic", topicRouteData);
+                Assert.Fail();
+            }
+            catch (Exception e)
+            {
+                mockClientManager.Verify(cm => cm.Telemetry(It.IsAny<Endpoints>()), Times.Once);
+            }
+        }
+
+        [TestMethod]
+        public async Task TestOnPrintThreadStackTraceCommand()
+        {
+            var testClient = CreateTestClient();
+            var endpoints = new Endpoints("testEndpoints");
+            var command = new PrintThreadStackTraceCommand { Nonce = "testNonce" };
+            var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
+                new MockClientStreamWriter<TelemetryCommand>(),
+                new MockAsyncStreamReader<TelemetryCommand>(),
+                null,
+                null,
+                null,
+                null);
+
+            var mockClientManager = new Mock<IClientManager>();
+            mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
+
+            testClient.SetClientManager(mockClientManager.Object);
+
+            // Act
+            testClient.OnPrintThreadStackTraceCommand(endpoints, command);
+
+            // Assert
+            mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once);
+        }
+
+        private Client CreateTestClient()
+        {
+            return new Producer(new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(),
+                new ConcurrentDictionary<string, bool>(), 1, null);
+        }
+
+        private class MockClientStreamWriter<T> : IClientStreamWriter<T>
+        {
+            public Task WriteAsync(T message)
+            {
+                // Simulate async operation
+                return Task.CompletedTask;
+            }
+
+            public WriteOptions WriteOptions { get; set; }
+
+            public Task CompleteAsync()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private class MockAsyncStreamReader<T> : IAsyncStreamReader<T>
+        {
+            public Task<bool> MoveNext(CancellationToken cancellationToken)
+            {
+                throw new System.NotImplementedException();
+            }
+
+            public T Current => throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ConsumeServiceTest.cs b/csharp/tests/ConsumeServiceTest.cs
new file mode 100644
index 0000000..8ef757e
--- /dev/null
+++ b/csharp/tests/ConsumeServiceTest.cs
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Schedulers;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class ConsumeServiceTest
+    {
+        private Proto.Digest _digest;
+        private Proto.SystemProperties _systemProperties;
+        private ByteString _body;
+        private Proto.Message _message;
+        private MessageView _messageView;
+
+        [TestInitialize]
+        public void SetUp()
+        {
+            _digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+            _systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = "127.0.0.1:8080",
+                BodyDigest = _digest,
+                BornTimestamp = new Timestamp()
+            };
+            _body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            _message = new Proto.Message
+            {
+                SystemProperties = _systemProperties,
+                Topic = new Proto.Resource { Name = "testTopic" },
+                Body = _body
+            };
+            _messageView = MessageView.FromProtobuf(_message);
+        }
+
+        [TestMethod]
+        public void TestConsumeSuccess()
+        {
+            var messageListener = new TestSuccessMessageListener();
+            var consumeService = CreateService(messageListener);
+            Assert.AreEqual(ConsumeResult.SUCCESS, consumeService.Consume(_messageView).Result);
+        }
+
+        [TestMethod]
+        public void TestConsumeFailure()
+        {
+            var messageListener = new TestFailureMessageListener();
+            var consumeService = CreateService(messageListener);
+            Assert.AreEqual(ConsumeResult.FAILURE, consumeService.Consume(_messageView).Result);
+        }
+
+        [TestMethod]
+        public void TestConsumeWithException()
+        {
+            var messageListener = new TestExceptionMessageListener();
+            var consumeService = CreateService(messageListener);
+            Assert.AreEqual(ConsumeResult.FAILURE, consumeService.Consume(_messageView).Result);
+        }
+
+        [TestMethod]
+        public void TestConsumeWithDelay()
+        {
+            var messageListener = new TestSuccessMessageListener();
+            var consumeService = CreateService(messageListener);
+            Assert.AreEqual(ConsumeResult.SUCCESS,
+                consumeService.Consume(_messageView, TimeSpan.FromMilliseconds(500)).Result);
+        }
+
+        private TestConsumeService CreateService(IMessageListener messageListener)
+        {
+            return new TestConsumeService("testClientId", messageListener,
+                new CurrentThreadTaskScheduler(), new CancellationToken());
+        }
+
+        private class TestSuccessMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView) => ConsumeResult.SUCCESS;
+        }
+
+        private class TestFailureMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView) => ConsumeResult.FAILURE;
+        }
+
+        private class TestExceptionMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView) => throw new Exception();
+        }
+
+        private class TestConsumeService : ConsumeService
+        {
+            public TestConsumeService(string clientId, IMessageListener messageListener,
+                TaskScheduler consumptionTaskScheduler, CancellationToken consumptionCtsToken)
+                : base(clientId, messageListener, consumptionTaskScheduler, consumptionCtsToken) { }
+
+            public override void Consume(ProcessQueue pq, List<MessageView> messageViews) => Task.FromResult(0);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/csharp/tests/ConsumerTest.cs b/csharp/tests/ConsumerTest.cs
new file mode 100644
index 0000000..82ef09c
--- /dev/null
+++ b/csharp/tests/ConsumerTest.cs
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Text;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
+namespace tests
+{
+    [TestClass]
+    public class ConsumerTest
+    {
+        [TestMethod]
+        public async Task TestReceiveMessage()
+        {
+            var maxCacheMessageCount = 8;
+            var maxCacheMessageSizeInBytes = 1024;
+            var consumptionThreadCount = 4;
+
+            var consumer =
+                CreateTestClient(maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+            var mockClientManager = new Mock<IClientManager>();
+            consumer.SetClientManager(mockClientManager.Object);
+
+            var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "00000000" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = "127.0.0.1",
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource
+                {
+                    ResourceNamespace = "testNamespace",
+                    Name = "testTopic"
+                },
+                Body = body
+            };
+            var receiveMessageResponse0 = new Proto.ReceiveMessageResponse
+            {
+                Status = new Proto.Status
+                {
+                    Code = Proto.Code.Ok
+                }
+            };
+            var receiveMessageResponse1 = new Proto.ReceiveMessageResponse
+            {
+                Message = message
+            };
+            var metadata = consumer.Sign();
+            var receiveMessageResponseList = new List<Proto.ReceiveMessageResponse>
+                { receiveMessageResponse0, receiveMessageResponse1 };
+            var receiveMessageInvocation =
+                new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(null,
+                    receiveMessageResponseList, metadata);
+            mockClientManager.Setup(cm => cm.ReceiveMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.ReceiveMessageRequest>(), It.IsAny<TimeSpan>()))
+                .Returns(Task.FromResult(receiveMessageInvocation));
+
+            var receivedMessageCount = 1;
+            var mq = new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = "broker0",
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses =
+                        {
+                            new Proto.Address
+                            {
+                                Host = "127.0.0.1",
+                                Port = 8080
+                            }
+                        }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource
+                {
+                    ResourceNamespace = "testNamespace",
+                    Name = "testTopic",
+                },
+                AcceptMessageTypes = { Proto.MessageType.Normal }
+            };
+            var request = consumer.WrapReceiveMessageRequest(1, new MessageQueue(mq), new FilterExpression("*"),
+                TimeSpan.FromSeconds(15), Guid.NewGuid().ToString());
+            var receiveMessageResult = await consumer.ReceiveMessage(request, new MessageQueue(mq),
+                TimeSpan.FromSeconds(15));
+            Assert.AreEqual(receiveMessageResult.Messages.Count, receivedMessageCount);
+        }
+
+        private PushConsumer CreateTestClient(int maxCacheMessageCount, int maxCacheMessageSizeInBytes,
+            int consumptionThreadCount)
+        {
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints("127.0.0.1:9876")
+                .Build();
+            return new PushConsumer(clientConfig, "testGroup",
+                new ConcurrentDictionary<string, FilterExpression>(), new TestMessageListener(),
+                maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        }
+
+        private class TestMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView)
+            {
+                return ConsumeResult.SUCCESS;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/CustomizedBackoffRetryPolicyTest.cs b/csharp/tests/CustomizedBackoffRetryPolicyTest.cs
new file mode 100644
index 0000000..dcbd4e3
--- /dev/null
+++ b/csharp/tests/CustomizedBackoffRetryPolicyTest.cs
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+    [TestClass]
+    public class CustomizedBackoffRetryPolicyTest
+    {
+        [TestMethod]
+        public void TestConstructWithValidDurationsAndMaxAttempts()
+        {
+            var durations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) };
+            var maxAttempts = 3;
+            var policy = new CustomizedBackoffRetryPolicy(durations, maxAttempts);
+
+            Assert.AreEqual(maxAttempts, policy.GetMaxAttempts());
+            CollectionAssert.AreEqual(durations, policy.GetDurations());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestConstructWithEmptyDurations()
+        {
+            new CustomizedBackoffRetryPolicy(new List<TimeSpan>(), 3);
+        }
+
+        [TestMethod]
+        public void TestGetNextAttemptDelayWithValidAttempts()
+        {
+            var durations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(5) };
+            var policy = new CustomizedBackoffRetryPolicy(durations, 5);
+
+            Assert.AreEqual(TimeSpan.FromSeconds(1), policy.GetNextAttemptDelay(1));
+            Assert.AreEqual(TimeSpan.FromSeconds(3), policy.GetNextAttemptDelay(2));
+            Assert.AreEqual(TimeSpan.FromSeconds(5), policy.GetNextAttemptDelay(3));
+            Assert.AreEqual(TimeSpan.FromSeconds(5), policy.GetNextAttemptDelay(4)); // Should inherit the last duration
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestGetNextAttemptDelayWithInvalidAttempt()
+        {
+            var policy = new CustomizedBackoffRetryPolicy(new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }, 3);
+            policy.GetNextAttemptDelay(0);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestGetNextAttemptDelayWithNegativeAttempt()
+        {
+            var policy = new CustomizedBackoffRetryPolicy(new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }, 3);
+            policy.GetNextAttemptDelay(-1);
+        }
+
+        [TestMethod]
+        public void TestFromProtobufWithValidRetryPolicy()
+        {
+            var protoDurations = new List<Duration>
+            {
+                Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+                Duration.FromTimeSpan(TimeSpan.FromSeconds(2))
+            };
+            var protoRetryPolicy = new RetryPolicy
+            {
+                MaxAttempts = 3,
+                CustomizedBackoff = new CustomizedBackoff { Next = { protoDurations } },
+            };
+            var policy = CustomizedBackoffRetryPolicy.FromProtobuf(protoRetryPolicy);
+
+            Assert.AreEqual(3, policy.GetMaxAttempts());
+            Assert.AreEqual(protoDurations.Count, policy.GetDurations().Count);
+            CollectionAssert.AreEqual(protoDurations.Select(d => d.ToTimeSpan()).ToList(), policy.GetDurations());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestFromProtobufWithInvalidRetryPolicy()
+        {
+            var retryPolicy = new RetryPolicy
+            {
+                MaxAttempts = 3,
+                ExponentialBackoff = new ExponentialBackoff
+                {
+                    Initial = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+                    Max = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+                    Multiplier = 1.0f
+                }
+            };
+            CustomizedBackoffRetryPolicy.FromProtobuf(retryPolicy);
+        }
+
+        [TestMethod]
+        public void ToProtobuf_ShouldReturnCorrectProtobuf()
+        {
+            var durations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) };
+            var maxAttempts = 3;
+            var policy = new CustomizedBackoffRetryPolicy(durations, maxAttempts);
+            var proto = policy.ToProtobuf();
+
+            Assert.AreEqual(maxAttempts, proto.MaxAttempts);
+            CollectionAssert.AreEqual(durations, proto.CustomizedBackoff.Next.Select(d => d.ToTimeSpan()).ToList());
+        }
+
+        [TestMethod]
+        public void TestInheritBackoffWithValidCustomizedBackoffPolicy()
+        {
+            var originalDurations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3) };
+            var newDurations = new List<Duration>
+            {
+                Duration.FromTimeSpan(TimeSpan.FromSeconds(2)),
+                Duration.FromTimeSpan(TimeSpan.FromSeconds(4))
+            };
+            var backoff = new CustomizedBackoff { Next = { newDurations } };
+            var retryPolicy = new RetryPolicy
+            {
+                MaxAttempts = 5,
+                CustomizedBackoff = backoff,
+            };
+            var policy = new CustomizedBackoffRetryPolicy(originalDurations, 5);
+            var inheritedPolicy = policy.InheritBackoff(retryPolicy);
+            Assert.IsTrue(inheritedPolicy is CustomizedBackoffRetryPolicy);
+            var customizedBackoffRetryPolicy = (CustomizedBackoffRetryPolicy)inheritedPolicy;
+            Assert.AreEqual(policy.GetMaxAttempts(), inheritedPolicy.GetMaxAttempts());
+            var inheritedDurations = customizedBackoffRetryPolicy.GetDurations();
+            Assert.AreEqual(newDurations.Count, inheritedDurations.Count);
+            for (var i = 0; i < newDurations.Count; i++)
+            {
+                Assert.AreEqual(newDurations[i].ToTimeSpan(), inheritedDurations[i]);
+            }
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public void TestInheritBackoffWithInvalidPolicy()
+        {
+            var policy = new CustomizedBackoffRetryPolicy(new List<TimeSpan>
+            {
+                TimeSpan.FromSeconds(3),
+                TimeSpan.FromSeconds(2),
+                TimeSpan.FromSeconds(1)
+            }, 3);
+            var retryPolicy = new RetryPolicy
+            {
+                ExponentialBackoff = new ExponentialBackoff()
+            };
+            policy.InheritBackoff(retryPolicy);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/EncodingTest.cs b/csharp/tests/EncodingTest.cs
new file mode 100644
index 0000000..198fec9
--- /dev/null
+++ b/csharp/tests/EncodingTest.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+    [TestClass]
+    public class EncodingTest
+    {
+        [TestMethod]
+        public void TestToProtobuf()
+        {
+            Assert.AreEqual(EncodingHelper.ToProtobuf(MqEncoding.Identity), Apache.Rocketmq.V2.Encoding.Identity);
+            Assert.AreEqual(EncodingHelper.ToProtobuf(MqEncoding.Gzip), Apache.Rocketmq.V2.Encoding.Gzip);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ExponentialBackoffRetryPolicyTest.cs b/csharp/tests/ExponentialBackoffRetryPolicyTest.cs
new file mode 100644
index 0000000..70b3254
--- /dev/null
+++ b/csharp/tests/ExponentialBackoffRetryPolicyTest.cs
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+    [TestClass]
+    public class ExponentialBackoffRetryPolicyTest
+    {
+        private TimeSpan initialBackoff = TimeSpan.FromMilliseconds(5);
+        private TimeSpan maxBackoff = TimeSpan.FromSeconds(1);
+        private float backoffMultiplier = 5;
+        private int maxAttempts = 3;
+
+        [TestMethod]
+        public void TestNextAttemptDelayForImmediatelyRetryPolicy()
+        {
+            var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3);
+            for (int i = 1; i <= 4; i++)
+            {
+                Assert.AreEqual(TimeSpan.Zero, retryPolicy.GetNextAttemptDelay(i));
+            }
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestGetNextAttemptDelayWithIllegalAttempt()
+        {
+            var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+            retryPolicy.GetNextAttemptDelay(0);
+        }
+
+        [TestMethod]
+        public void TestGetNextAttemptDelay()
+        {
+            var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+            Assert.AreEqual(TimeSpan.FromMilliseconds(5), retryPolicy.GetNextAttemptDelay(1));
+            Assert.AreEqual(TimeSpan.FromMilliseconds(25), retryPolicy.GetNextAttemptDelay(2));
+            Assert.AreEqual(TimeSpan.FromMilliseconds(125), retryPolicy.GetNextAttemptDelay(3));
+            Assert.AreEqual(TimeSpan.FromMilliseconds(625), retryPolicy.GetNextAttemptDelay(4));
+            Assert.AreEqual(TimeSpan.FromSeconds(1), retryPolicy.GetNextAttemptDelay(5));
+        }
+
+        [TestMethod]
+        public void TestFromProtobuf()
+        {
+            var initialBackoffProto = Duration.FromTimeSpan(initialBackoff);
+            var maxBackoffProto = Duration.FromTimeSpan(maxBackoff);
+
+            var exponentialBackoff = new ExponentialBackoff
+            {
+                Initial = initialBackoffProto,
+                Max = maxBackoffProto,
+                Multiplier = backoffMultiplier
+            };
+            var retryPolicyProto = new RetryPolicy
+            {
+                MaxAttempts = maxAttempts,
+                ExponentialBackoff = exponentialBackoff
+            };
+
+            var policy = ExponentialBackoffRetryPolicy.FromProtobuf(retryPolicyProto);
+
+            Assert.AreEqual(maxAttempts, policy.GetMaxAttempts());
+            Assert.AreEqual(initialBackoff, policy.InitialBackoff);
+            Assert.AreEqual(maxBackoff, policy.MaxBackoff);
+            Assert.AreEqual(backoffMultiplier, policy.BackoffMultiplier);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestFromProtobufWithoutExponentialBackoff()
+        {
+            var customizedBackoff = new CustomizedBackoff();
+            var retryPolicyProto = new RetryPolicy
+            {
+                MaxAttempts = maxAttempts,
+                CustomizedBackoff = customizedBackoff
+            };
+            ExponentialBackoffRetryPolicy.FromProtobuf(retryPolicyProto);
+        }
+
+        [TestMethod]
+        public void TestToProtobuf()
+        {
+            var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+            var retryPolicyProto = retryPolicy.ToProtobuf();
+
+            Assert.IsNotNull(retryPolicyProto.ExponentialBackoff);
+
+            var exponentialBackoff = retryPolicyProto.ExponentialBackoff;
+            var initialBackoffProto = Duration.FromTimeSpan(initialBackoff);
+            var maxBackoffProto = Duration.FromTimeSpan(maxBackoff);
+
+            Assert.AreEqual(exponentialBackoff.Initial, initialBackoffProto);
+            Assert.AreEqual(exponentialBackoff.Max, maxBackoffProto);
+            Assert.AreEqual(exponentialBackoff.Multiplier, backoffMultiplier);
+            Assert.AreEqual(retryPolicyProto.MaxAttempts, maxAttempts);
+        }
+
+        [TestMethod]
+        public void TestInheritBackoff()
+        {
+            var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+
+            var initialBackoffProto = TimeSpan.FromMilliseconds(10);
+            var maxBackoffProto = TimeSpan.FromSeconds(3);
+            double backoffMultiplierProto = 10;
+
+            var exponentialBackoff = new ExponentialBackoff
+            {
+                Initial = Duration.FromTimeSpan(initialBackoffProto),
+                Max = Duration.FromTimeSpan(maxBackoffProto),
+                Multiplier = (float)backoffMultiplierProto
+            };
+            var retryPolicyProto = new RetryPolicy
+            {
+                ExponentialBackoff = exponentialBackoff
+            };
+
+            var inheritedRetryPolicy = retryPolicy.InheritBackoff(retryPolicyProto);
+
+            Assert.IsInstanceOfType(inheritedRetryPolicy, typeof(ExponentialBackoffRetryPolicy));
+
+            var exponentialBackoffRetryPolicy = (ExponentialBackoffRetryPolicy)inheritedRetryPolicy;
+
+            Assert.AreEqual(initialBackoffProto, exponentialBackoffRetryPolicy.InitialBackoff);
+            Assert.AreEqual(maxBackoffProto, exponentialBackoffRetryPolicy.MaxBackoff);
+            Assert.AreEqual(backoffMultiplierProto, exponentialBackoffRetryPolicy.BackoffMultiplier);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public void TestInheritBackoffWithoutExponentialBackoff()
+        {
+            var customizedBackoff = new CustomizedBackoff();
+            var retryPolicyProto = new RetryPolicy
+            {
+                MaxAttempts = maxAttempts,
+                CustomizedBackoff = customizedBackoff
+            };
+
+            var exponentialBackoffRetryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+            exponentialBackoffRetryPolicy.InheritBackoff(retryPolicyProto);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/csharp/tests/GrpcServerIntegrationTest.cs b/csharp/tests/GrpcServerIntegrationTest.cs
new file mode 100644
index 0000000..f7f242f
--- /dev/null
+++ b/csharp/tests/GrpcServerIntegrationTest.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Linq;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+
+namespace tests
+{
+    public abstract class GrpcServerIntegrationTest
+    {
+        protected int Port;
+
+        protected Server SetUpServer(MessagingService.MessagingServiceBase mockServer)
+        {
+            var server = new Server
+            {
+                Ports = { new ServerPort("127.0.0.1", Port, ServerCredentials.Insecure) },
+                Services = { MessagingService.BindService(mockServer) }
+            };
+            server.Start();
+            Port = server.Ports.First().BoundPort;
+            return server;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/MessageViewTest.cs b/csharp/tests/MessageViewTest.cs
new file mode 100644
index 0000000..1387b48
--- /dev/null
+++ b/csharp/tests/MessageViewTest.cs
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class MessageViewTests
+    {
+        private const string FakeHost = "127.0.0.1";
+        private const string FakeTopic = "test-topic";
+
+        [TestMethod]
+        public void TestFromProtobufWithCrc32()
+        {
+            var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = FakeHost,
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = FakeTopic },
+                Body = body
+            };
+
+            var messageView = MessageView.FromProtobuf(message);
+
+            CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+            Assert.AreEqual(FakeTopic, messageView.Topic);
+            Assert.AreEqual(FakeHost, messageView.BornHost);
+            Assert.IsFalse(messageView.IsCorrupted());
+        }
+
+        [TestMethod]
+        public void TestFromProtobufWithWrongCrc32()
+        {
+            var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "00000000" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = FakeHost,
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = FakeTopic },
+                Body = body
+            };
+
+            var messageView = MessageView.FromProtobuf(message);
+
+            CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+            Assert.AreEqual(FakeTopic, messageView.Topic);
+            Assert.IsTrue(messageView.IsCorrupted());
+        }
+
+        [TestMethod]
+        public void TestFromProtobufWithMd5()
+        {
+            var digest = new Proto.Digest
+            { Type = Proto.DigestType.Md5, Checksum = "3858F62230AC3C915F300C664312C63F" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = FakeHost,
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = FakeTopic },
+                Body = body
+            };
+
+            var messageView = MessageView.FromProtobuf(message);
+
+            CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+            Assert.AreEqual(FakeTopic, messageView.Topic);
+            Assert.IsFalse(messageView.IsCorrupted());
+        }
+
+        [TestMethod]
+        public void TestFromProtobufWithWrongMd5()
+        {
+            var digest = new Proto.Digest
+            { Type = Proto.DigestType.Md5, Checksum = "00000000000000000000000000000000" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = FakeHost,
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = FakeTopic },
+                Body = body
+            };
+
+            var messageView = MessageView.FromProtobuf(message);
+
+            CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+            Assert.AreEqual(FakeTopic, messageView.Topic);
+            Assert.IsTrue(messageView.IsCorrupted());
+        }
+
+        [TestMethod]
+        public void TestFromProtobufWithSha1()
+        {
+            var digest = new Proto.Digest
+            { Type = Proto.DigestType.Sha1, Checksum = "8843D7F92416211DE9EBB963FF4CE28125932878" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = FakeHost,
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = FakeTopic },
+                Body = body
+            };
+
+            var messageView = MessageView.FromProtobuf(message);
+
+            CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+            Assert.AreEqual(FakeTopic, messageView.Topic);
+            Assert.IsFalse(messageView.IsCorrupted());
+        }
+
+        [TestMethod]
+        public void TestFromProtobufWithWrongSha1()
+        {
+            var digest = new Proto.Digest
+            { Type = Proto.DigestType.Sha1, Checksum = "0000000000000000000000000000000000000000" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = FakeHost,
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = FakeTopic },
+                Body = body
+            };
+
+            var messageView = MessageView.FromProtobuf(message);
+
+            CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+            Assert.AreEqual(FakeTopic, messageView.Topic);
+            Assert.IsTrue(messageView.IsCorrupted());
+        }
+    }
+}
diff --git a/csharp/tests/MockServer.cs b/csharp/tests/MockServer.cs
new file mode 100644
index 0000000..6f65842
--- /dev/null
+++ b/csharp/tests/MockServer.cs
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    public class MockServer : Proto.MessagingService.MessagingServiceBase
+    {
+        private readonly List<string> _attemptIdList;
+        private int _serverDeadlineFlag = 1;
+
+        private readonly Proto.Status _mockStatus = new Proto.Status
+        {
+            Code = Proto.Code.Ok,
+            Message = "mock test"
+        };
+
+        private readonly string _topic;
+        private readonly string _broker;
+
+        public MockServer(string topic, string broker, List<string> attemptIdList)
+        {
+            _topic = topic;
+            _broker = broker;
+            _attemptIdList = attemptIdList;
+        }
+
+        public int Port { get; set; }
+
+        public override Task<Proto.QueryRouteResponse> QueryRoute(Proto.QueryRouteRequest request,
+            ServerCallContext context)
+        {
+            var response = new Proto.QueryRouteResponse
+            {
+                Status = _mockStatus,
+                MessageQueues =
+                {
+                    new Proto.MessageQueue
+                    {
+                        Topic = new Proto.Resource { Name = _topic },
+                        Id = 0,
+                        Permission = Proto.Permission.ReadWrite,
+                        Broker = new Proto.Broker
+                        {
+                            Name = _broker,
+                            Id = 0,
+                            Endpoints = new Proto.Endpoints
+                            {
+                                Addresses =
+                                {
+                                    new Proto.Address { Host = "127.0.0.1", Port = Port }
+                                }
+                            }
+                        },
+                        AcceptMessageTypes = { Proto.MessageType.Normal }
+                    }
+                }
+            };
+            return Task.FromResult(response);
+        }
+
+        public override Task<Proto.HeartbeatResponse> Heartbeat(Proto.HeartbeatRequest request,
+            ServerCallContext context)
+        {
+            var response = new Proto.HeartbeatResponse { Status = _mockStatus };
+            return Task.FromResult(response);
+        }
+
+        public override Task<Proto.QueryAssignmentResponse> QueryAssignment(Proto.QueryAssignmentRequest request,
+            ServerCallContext context)
+        {
+            var response = new Proto.QueryAssignmentResponse
+            {
+                Status = _mockStatus,
+                Assignments =
+                {
+                    new Proto.Assignment
+                    {
+                        MessageQueue = new Proto.MessageQueue
+                        {
+                            Topic = new Proto.Resource { Name = _topic },
+                            Id = 0,
+                            Permission = Proto.Permission.ReadWrite,
+                            Broker = new Proto.Broker
+                            {
+                                Name = _broker,
+                                Id = 0,
+                                Endpoints = new Proto.Endpoints
+                                {
+                                    Addresses =
+                                    {
+                                        new Proto.Address { Host = "127.0.0.1", Port = Port }
+                                    }
+                                }
+                            },
+                            AcceptMessageTypes = { Proto.MessageType.Normal }
+                        }
+                    }
+                }
+            };
+            return Task.FromResult(response);
+        }
+
+        public override async Task ReceiveMessage(Proto.ReceiveMessageRequest request,
+            IServerStreamWriter<Proto.ReceiveMessageResponse> responseStream, ServerCallContext context)
+        {
+            if (_attemptIdList.Count >= 3)
+            {
+                await Task.Delay(100);
+            }
+
+            _attemptIdList.Add(request.AttemptId);
+
+            if (CompareAndSetServerDeadlineFlag(true, false))
+            {
+                // timeout
+                await Task.Delay(TimeSpan.FromSeconds(3));
+            }
+            else
+            {
+                var response = new Proto.ReceiveMessageResponse { Status = _mockStatus };
+                await responseStream.WriteAsync(response);
+            }
+        }
+
+        public override async Task Telemetry(IAsyncStreamReader<Proto.TelemetryCommand> requestStream,
+            IServerStreamWriter<Proto.TelemetryCommand> responseStream, ServerCallContext context)
+        {
+            await foreach (var command in requestStream.ReadAllAsync())
+            {
+                var response = command.Clone();
+                response.Status = _mockStatus;
+                response.Settings = new Proto.Settings
+                {
+                    BackoffPolicy = new Proto.RetryPolicy
+                    {
+                        MaxAttempts = 16,
+                        ExponentialBackoff = new Proto.ExponentialBackoff
+                        {
+                            Initial = new Duration { Seconds = 1 },
+                            Max = new Duration { Seconds = 10 },
+                            Multiplier = 1.5f
+                        }
+                    }
+                };
+
+                await responseStream.WriteAsync(response);
+            }
+        }
+
+        private bool CompareAndSetServerDeadlineFlag(bool expectedValue, bool newValue)
+        {
+            var expected = expectedValue ? 1 : 0;
+            var newVal = newValue ? 1 : 0;
+            return Interlocked.CompareExchange(ref _serverDeadlineFlag, newVal, expected) == expected;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ProcessQueueTest.cs b/csharp/tests/ProcessQueueTest.cs
new file mode 100644
index 0000000..d18188d
--- /dev/null
+++ b/csharp/tests/ProcessQueueTest.cs
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class ProcessQueueTest
+    {
+        private PushConsumer CreateAndSetupPushConsumer()
+        {
+            var pushConsumer = CreatePushConsumer("testTopic", 8, 1024, 4);
+            pushConsumer.State = State.Running;
+            return pushConsumer;
+        }
+
+        private Mock<IClientManager> SetupMockClientManager(PushConsumer pushConsumer)
+        {
+            var mockClientManager = new Mock<IClientManager>();
+            pushConsumer.SetClientManager(mockClientManager.Object);
+            return mockClientManager;
+        }
+
+        private static Proto.MessageQueue CreateMessageQueue()
+        {
+            return new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = "broker0",
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses =
+                        {
+                            new Proto.Address
+                            {
+                                Host = "127.0.0.1",
+                                Port = 8080
+                            }
+                        }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource
+                {
+                    ResourceNamespace = "testNamespace",
+                    Name = "testTopic",
+                },
+                AcceptMessageTypes = { Proto.MessageType.Normal }
+            };
+        }
+
+        [TestMethod]
+        public void TestExpired()
+        {
+            var pushConsumer = CreateAndSetupPushConsumer();
+            var processQueue = CreateProcessQueue(pushConsumer);
+            Assert.IsFalse(processQueue.Expired());
+        }
+
+        [TestMethod]
+        public async Task TestReceiveMessageImmediately()
+        {
+            var pushConsumer = CreateAndSetupPushConsumer();
+            var processQueue = CreateProcessQueue(pushConsumer);
+            var mockClientManager = SetupMockClientManager(pushConsumer);
+
+            var message = CreateMessage();
+            var receiveMessageResponses = new List<Proto.ReceiveMessageResponse>
+            {
+                new Proto.ReceiveMessageResponse { Status = new Proto.Status { Code = Proto.Code.Ok } },
+                new Proto.ReceiveMessageResponse { Message = message }
+            };
+
+            MockReceiveMessage(mockClientManager, pushConsumer, receiveMessageResponses);
+
+            await Task.Delay(3000);
+            processQueue.FetchMessageImmediately();
+
+            Assert.AreEqual(processQueue.GetCachedMessageCount(), 1);
+        }
+
+        [TestMethod]
+        public async Task TestEraseMessageWithConsumeOk()
+        {
+            var pushConsumer = CreateAndSetupPushConsumer();
+            var messageView = CreateMessageView();
+            var processQueue = CreateProcessQueue(pushConsumer);
+            var mockClientManager = SetupMockClientManager(pushConsumer);
+
+            MockAckMessage(mockClientManager, pushConsumer, Proto.Code.Ok);
+
+            processQueue.CacheMessages(new List<MessageView> { messageView });
+
+            processQueue.EraseMessage(messageView, ConsumeResult.SUCCESS);
+
+            mockClientManager.Verify(cm => cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
+        }
+
+        [TestMethod]
+        public async Task TestEraseMessageWithAckFailure()
+        {
+            var pushConsumer = CreateAndSetupPushConsumer();
+            var messageView = CreateMessageView();
+            var processQueue = CreateProcessQueue(pushConsumer);
+            var mockClientManager = SetupMockClientManager(pushConsumer);
+
+            MockAckMessage(mockClientManager, pushConsumer, Proto.Code.InternalServerError);
+
+            processQueue.CacheMessages(new List<MessageView> { messageView });
+
+            var ackTimes = 3;
+
+            processQueue.EraseMessage(messageView, ConsumeResult.SUCCESS);
+            await Task.Delay(ProcessQueue.AckMessageFailureBackoffDelay * ackTimes);
+
+            mockClientManager.Verify(cm => cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()), Times.AtLeast(ackTimes));
+        }
+
+        private void MockReceiveMessage(Mock<IClientManager> mockClientManager, PushConsumer pushConsumer, List<Proto.ReceiveMessageResponse> responses)
+        {
+            var metadata = pushConsumer.Sign();
+            var invocation = new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(null, responses, metadata);
+
+            mockClientManager.Setup(cm => cm.ReceiveMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.ReceiveMessageRequest>(), It.IsAny<TimeSpan>()))
+                .Returns(Task.FromResult(invocation));
+        }
+
+        private void MockAckMessage(Mock<IClientManager> mockClientManager, PushConsumer pushConsumer, Proto.Code responseCode)
+        {
+            var metadata = pushConsumer.Sign();
+            var response = new Proto.AckMessageResponse { Status = new Proto.Status { Code = responseCode } };
+
+            var invocation = new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(null, response, metadata);
+
+            mockClientManager.Setup(cm => cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()))
+                .Returns(Task.FromResult(invocation));
+        }
+
+        private MessageView CreateMessageView()
+        {
+            return MessageView.FromProtobuf(CreateMessage(), new MessageQueue(CreateMessageQueue()));
+        }
+
+        private static ProcessQueue CreateProcessQueue(PushConsumer pushConsumer)
+        {
+            var processQueue = new ProcessQueue(pushConsumer, new MessageQueue(CreateMessageQueue()),
+                pushConsumer.GetSubscriptionExpressions()["testTopic"], new CancellationTokenSource(),
+                new CancellationTokenSource(), new CancellationTokenSource(),
+                new CancellationTokenSource());
+            return processQueue;
+        }
+
+        private Proto.Message CreateMessage()
+        {
+            var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = "127.0.0.1",
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = "testTopic" },
+                Body = body
+            };
+            return message;
+        }
+
+        private PushConsumer CreatePushConsumer(string topic, int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount)
+        {
+            var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:8080").Build();
+            var subscription = new Dictionary<string, FilterExpression> { { topic, new FilterExpression("*") } };
+            return new PushConsumer(clientConfig, "testGroup",
+                new ConcurrentDictionary<string, FilterExpression>(subscription), new TestMessageListener(),
+                maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        }
+
+        private class TestMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView) => ConsumeResult.SUCCESS;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ProducerBuilderTest.cs b/csharp/tests/ProducerBuilderTest.cs
new file mode 100644
index 0000000..c318aaf
--- /dev/null
+++ b/csharp/tests/ProducerBuilderTest.cs
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+    [TestClass]
+    public class ProducerBuilderTest
+    {
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetClientConfigurationWithNull()
+        {
+            var builder = new Producer.Builder();
+            builder.SetClientConfig(null);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(NullReferenceException))]
+        public void TestSetTopicWithNull()
+        {
+            var builder = new Producer.Builder();
+            builder.SetTopics(null);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetIllegalTopic()
+        {
+            var builder = new Producer.Builder();
+            builder.SetTopics("\t");
+        }
+
+        [TestMethod]
+        public void TestSetTopic()
+        {
+            var builder = new Producer.Builder();
+            builder.SetTopics("abc");
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetNegativeMaxAttempts()
+        {
+            var builder = new Producer.Builder();
+            builder.SetMaxAttempts(-1);
+        }
+
+        [TestMethod]
+        public void TestSetMaxAttempts()
+        {
+            var builder = new Producer.Builder();
+            builder.SetMaxAttempts(3);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetTransactionCheckerWithNull()
+        {
+            var builder = new Producer.Builder();
+            builder.SetTransactionChecker(null);
+        }
+
+        [TestMethod]
+        public void TestSetTransactionChecker()
+        {
+            var builder = new Producer.Builder();
+            builder.SetTransactionChecker(new TestTransactionChecker());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public async Task TestBuildWithoutClientConfiguration()
+        {
+            var builder = new Producer.Builder();
+            await builder.Build();
+        }
+
+        [TestMethod]
+        public void TestBuild()
+        {
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints("127.0.0.1:9876").Build();
+            var builder = new Producer.Builder();
+            builder.SetClientConfig(clientConfig).Build();
+        }
+
+        private class TestTransactionChecker : ITransactionChecker
+        {
+            public TransactionResolution Check(MessageView messageView)
+            {
+                return TransactionResolution.Commit;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
new file mode 100644
index 0000000..ce0cca1
--- /dev/null
+++ b/csharp/tests/ProducerTest.cs
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+using System.Collections.Concurrent;
+using System.Text;
+using System.Threading.Tasks;
+using Moq;
+
+namespace tests
+{
+    [TestClass]
+    public class ProducerTest
+    {
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public async Task TestSendBeforeStartup()
+        {
+            var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+            var publishingTopics = new ConcurrentDictionary<string, bool>();
+            publishingTopics.TryAdd("testTopic", true);
+            var producer = new Producer(clientConfig, publishingTopics, 1, null);
+            var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build();
+            await producer.Send(message);
+        }
+
+        [TestMethod]
+        public async Task TestSendWithTopic()
+        {
+            var producer = CreateTestClient();
+            producer.State = State.Running;
+            var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build();
+            var metadata = producer.Sign();
+            var sendResultEntry = new Proto.SendResultEntry
+            {
+                MessageId = "fakeMsgId",
+                Status = new Proto.Status
+                {
+                    Code = Proto.Code.Ok
+                },
+                Offset = 1
+            };
+            var sendMessageResponse = new Proto.SendMessageResponse
+            {
+                Status = new Proto.Status
+                {
+                    Code = Proto.Code.Ok
+                },
+                Entries = { sendResultEntry }
+            };
+            var sendMessageInvocation = new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(null,
+                sendMessageResponse, metadata);
+            var mockClientManager = new Mock<IClientManager>();
+            producer.SetClientManager(mockClientManager.Object);
+            mockClientManager.Setup(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(sendMessageInvocation));
+            await producer.Send(message);
+            mockClientManager.Verify(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public async Task TestSendFailureWithTopic()
+        {
+            var producer = CreateTestClient();
+            producer.State = State.Running;
+            var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build();
+            var mockClientManager = new Mock<IClientManager>();
+            producer.SetClientManager(mockClientManager.Object);
+            var exception = new ArgumentException();
+            mockClientManager.Setup(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>())).Throws(exception);
+            await producer.Send(message);
+            var maxAttempts = producer.PublishingSettings.GetRetryPolicy().GetMaxAttempts();
+            mockClientManager.Verify(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Exactly(maxAttempts));
+        }
+
+        private Producer CreateTestClient()
+        {
+            const string host0 = "127.0.0.1";
+            var mqs = new List<Proto.MessageQueue>();
+            var mq0 = new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = "broker0",
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses =
+                        {
+                            new Proto.Address
+                            {
+                                Host = host0,
+                                Port = 80
+                            }
+                        }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource
+                {
+                    ResourceNamespace = "foo-bar-namespace",
+                    Name = "testTopic",
+                },
+                AcceptMessageTypes = { Proto.MessageType.Normal }
+            };
+            mqs.Add(mq0);
+            var topicRouteData = new TopicRouteData(mqs);
+            var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
+            var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+            var producer = new Producer(clientConfig, new ConcurrentDictionary<string, bool>(),
+                1, null);
+            producer._publishingRouteDataCache.TryAdd("testTopic", publishingLoadBalancer);
+            return producer;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/PublishingLoadBalancerTest.cs b/csharp/tests/PublishingLoadBalancerTest.cs
index b86a095..6b4a74d 100644
--- a/csharp/tests/PublishingLoadBalancerTest.cs
+++ b/csharp/tests/PublishingLoadBalancerTest.cs
@@ -53,6 +53,7 @@
                 Permission = Proto.Permission.ReadWrite,
                 Topic = new Proto.Resource
                 {
+                    ResourceNamespace = "foo-bar-namespace",
                     Name = "TestTopic",
                 }
             };
@@ -78,6 +79,7 @@
                 Permission = Proto.Permission.ReadWrite,
                 Topic = new Proto.Resource
                 {
+                    ResourceNamespace = "foo-bar-namespace",
                     Name = "TestTopic",
                 }
             };
diff --git a/csharp/tests/PublishingMessageTest.cs b/csharp/tests/PublishingMessageTest.cs
index c12b908..02936a6 100644
--- a/csharp/tests/PublishingMessageTest.cs
+++ b/csharp/tests/PublishingMessageTest.cs
@@ -28,6 +28,7 @@
     {
         private const string ClientId = "fakeClientId";
         private static readonly Endpoints Endpoints = new Endpoints("127.0.0.1:8081");
+        private const string Namespace = "fakeNamespace";
 
 
         [TestMethod]
@@ -39,7 +40,7 @@
             {
                 [topic] = true
             };
-            var settings = new PublishingSettings(ClientId, Endpoints,
+            var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
                 ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3), TimeSpan.FromSeconds(3), topics);
             var publishingMessage = new PublishingMessage(message, settings, false);
             Assert.AreEqual(publishingMessage.MessageType, MessageType.Normal);
@@ -57,7 +58,7 @@
             {
                 [topic] = true
             };
-            var settings = new PublishingSettings(ClientId, Endpoints,
+            var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
                 ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3), TimeSpan.FromSeconds(3), topics);
             var publishingMessage = new PublishingMessage(message, settings, false);
             Assert.AreEqual(publishingMessage.MessageType, MessageType.Fifo);
@@ -75,7 +76,7 @@
             {
                 [topic] = true
             };
-            var settings = new PublishingSettings(ClientId, Endpoints,
+            var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
                 ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3),
                 TimeSpan.FromSeconds(3), topics);
             var publishingMessage = new PublishingMessage(message, settings, false);
@@ -93,7 +94,7 @@
             {
                 [topic] = true
             };
-            var settings = new PublishingSettings(ClientId, Endpoints,
+            var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
                 ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3),
                 TimeSpan.FromSeconds(3), topics);
             var publishingMessage = new PublishingMessage(message, settings, true);
diff --git a/csharp/tests/PushConsumerBuilderTest.cs b/csharp/tests/PushConsumerBuilderTest.cs
new file mode 100644
index 0000000..ba33a1a
--- /dev/null
+++ b/csharp/tests/PushConsumerBuilderTest.cs
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+    [TestClass]
+    public class PushConsumerBuilderTest
+    {
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetClientConfigWithNull()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetClientConfig(null);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetConsumerGroupWithNull()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetConsumerGroup(null);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetConsumerGroupWithSpecialChar()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetConsumerGroup("#.testGroup#");
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestBuildWithoutExpressions()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetSubscriptionExpression(null);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestBuildWithEmptyExpressions()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetSubscriptionExpression(new Dictionary<string, FilterExpression>());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestBuildWithNullMessageListener()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetMessageListener(null);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestNegativeMaxCacheMessageCount()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetMaxCacheMessageCount(-1);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestNegativeMaxCacheMessageSizeInBytes()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetMaxCacheMessageSizeInBytes(-1);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestNegativeConsumptionThreadCount()
+        {
+            var builder = new PushConsumer.Builder();
+            builder.SetMaxCacheMessageCount(-1);
+        }
+
+        [TestMethod]
+        public void TestBuild()
+        {
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints("127.0.0.1:9876").Build();
+            var subscription = new Dictionary<string, FilterExpression>
+                {{ "fakeTopic", new FilterExpression("*") }};
+            var builder = new PushConsumer.Builder();
+            builder.SetClientConfig(clientConfig).SetSubscriptionExpression(subscription).SetConsumerGroup("testGroup")
+                .SetMessageListener(new TestMessageListener()).SetMaxCacheMessageCount(10)
+                .SetMaxCacheMessageSizeInBytes(10).SetConsumptionThreadCount(10).Build();
+        }
+
+        private class TestMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView)
+            {
+                // Handle the received message and return consume result.
+                return ConsumeResult.SUCCESS;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs
new file mode 100644
index 0000000..c8a383c
--- /dev/null
+++ b/csharp/tests/PushConsumerTest.cs
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class PushConsumerTest
+    {
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public async Task TestSubscribeBeforeStartup()
+        {
+            var pushConsumer = CreatePushConsumer();
+            await pushConsumer.Subscribe("testTopic", new FilterExpression("*"));
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public void TestUnsubscribeBeforeStartup()
+        {
+            var pushConsumer = CreatePushConsumer();
+            pushConsumer.Unsubscribe("testTopic");
+        }
+
+        [TestMethod]
+        public async Task TestQueryAssignment()
+        {
+            var (pushConsumer, mockClientManager, queryRouteResponse, metadata) = SetupMockConsumer();
+
+            var queryAssignmentResponse = CreateQueryAssignmentResponse();
+            var queryAssignmentInvocation =
+                new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(null,
+                queryAssignmentResponse, metadata);
+
+            SetupMockClientManager(mockClientManager, queryRouteResponse, queryAssignmentInvocation, metadata);
+            await pushConsumer.QueryAssignment("testTopic");
+        }
+
+        [TestMethod]
+        public async Task TestScanAssignments()
+        {
+            var (pushConsumer, mockClientManager, queryRouteResponse, metadata) = SetupMockConsumer();
+
+            var queryAssignmentResponse = CreateQueryAssignmentResponse(new Proto.Assignment
+            {
+                MessageQueue = queryRouteResponse.MessageQueues[0]
+            });
+            var queryAssignmentInvocation =
+                new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(null,
+                queryAssignmentResponse, metadata);
+
+            SetupMockClientManager(mockClientManager, queryRouteResponse, queryAssignmentInvocation, metadata);
+
+            pushConsumer.State = State.Running;
+            await pushConsumer.Subscribe("testTopic", new FilterExpression("*"));
+            pushConsumer.ScanAssignments();
+        }
+
+        [TestMethod]
+        public async Task TestScanAssignmentsWithoutResults()
+        {
+            var (pushConsumer, mockClientManager, queryRouteResponse, metadata) = SetupMockConsumer();
+
+            var queryAssignmentResponse = CreateQueryAssignmentResponse();
+            var queryAssignmentInvocation =
+                new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(null,
+                queryAssignmentResponse, metadata);
+
+            SetupMockClientManager(mockClientManager, queryRouteResponse, queryAssignmentInvocation, metadata);
+
+            pushConsumer.State = State.Running;
+            await pushConsumer.Subscribe("testTopic", new FilterExpression("*"));
+            pushConsumer.ScanAssignments();
+        }
+
+        private PushConsumer CreatePushConsumer()
+        {
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints("127.0.0.1")
+                .Build();
+            return new PushConsumer(clientConfig, "testGroup",
+                new ConcurrentDictionary<string, FilterExpression>(), new TestMessageListener(),
+                10, 10, 1);
+        }
+
+        private class TestMessageListener : IMessageListener
+        {
+            public ConsumeResult Consume(MessageView messageView)
+            {
+                return ConsumeResult.SUCCESS;
+            }
+        }
+
+        private class MockClientStreamWriter<T> : IClientStreamWriter<T>
+        {
+            public Task WriteAsync(T message)
+            {
+                return Task.CompletedTask;
+            }
+
+            public WriteOptions WriteOptions { get; set; }
+
+            public Task CompleteAsync()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private class MockAsyncStreamReader<T> : IAsyncStreamReader<T>
+        {
+            public Task<bool> MoveNext(CancellationToken cancellationToken)
+            {
+                throw new NotImplementedException();
+            }
+
+            public T Current => throw new NotImplementedException();
+        }
+
+        private (PushConsumer, Mock<IClientManager>, Proto.QueryRouteResponse, Metadata) SetupMockConsumer()
+        {
+            var pushConsumer = CreatePushConsumer();
+            var metadata = pushConsumer.Sign();
+
+            var mq = new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = "broker0",
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource
+                {
+                    ResourceNamespace = "testNamespace",
+                    Name = "testTopic",
+                },
+                AcceptMessageTypes = { Proto.MessageType.Normal }
+            };
+
+            var queryRouteResponse = new Proto.QueryRouteResponse
+            {
+                Status = new Proto.Status { Code = Proto.Code.Ok },
+                MessageQueues = { mq }
+            };
+
+            var mockClientManager = new Mock<IClientManager>();
+            pushConsumer.SetClientManager(mockClientManager.Object);
+            return (pushConsumer, mockClientManager, queryRouteResponse, metadata);
+        }
+
+        private Proto.QueryAssignmentResponse CreateQueryAssignmentResponse(params Proto.Assignment[] assignments)
+        {
+            return new Proto.QueryAssignmentResponse
+            {
+                Status = new Proto.Status { Code = Proto.Code.Ok },
+                Assignments = { assignments }
+            };
+        }
+
+        private void SetupMockClientManager(Mock<IClientManager> mockClientManager,
+            Proto.QueryRouteResponse queryRouteResponse,
+            RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse> queryAssignmentInvocation,
+            Metadata metadata)
+        {
+            var queryRouteInvocation = new RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>(null,
+                queryRouteResponse, metadata);
+
+            mockClientManager.Setup(cm =>
+                cm.QueryRoute(It.IsAny<Endpoints>(), It.IsAny<Proto.QueryRouteRequest>(), It.IsAny<TimeSpan>()))
+                .Returns(Task.FromResult(queryRouteInvocation));
+
+            var mockCall = new AsyncDuplexStreamingCall<Proto.TelemetryCommand, Proto.TelemetryCommand>(
+                new MockClientStreamWriter<Proto.TelemetryCommand>(),
+                new MockAsyncStreamReader<Proto.TelemetryCommand>(),
+                null, null, null, null);
+
+            mockClientManager.Setup(cm =>
+                cm.QueryAssignment(It.IsAny<Endpoints>(), It.IsAny<Proto.QueryAssignmentRequest>(), It.IsAny<TimeSpan>()))
+                .Returns(Task.FromResult(queryAssignmentInvocation));
+
+            mockClientManager.Setup(cm => cm.Telemetry(It.IsAny<Endpoints>())).Returns(mockCall);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/PushSubscriptionSettingsTest.cs b/csharp/tests/PushSubscriptionSettingsTest.cs
new file mode 100644
index 0000000..e998d0d
--- /dev/null
+++ b/csharp/tests/PushSubscriptionSettingsTest.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Castle.Core.Internal;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class PushSubscriptionSettingsTest
+    {
+        private const string Namespace = "testNamespace";
+        private const string GroupResource = "testConsumerGroup";
+        private const string ClientId = "testClientId";
+        private const string Endpoint = "127.0.0.1:8080";
+        private static readonly TimeSpan RequestTimeout = TimeSpan.FromSeconds(3);
+        private static readonly ConcurrentDictionary<string, FilterExpression> SubscriptionExpression =
+            new ConcurrentDictionary<string, FilterExpression>(new Dictionary<string, FilterExpression> { { "testTopic", new FilterExpression("*") } });
+
+        private PushSubscriptionSettings CreateSettings()
+        {
+            return new PushSubscriptionSettings(Namespace, ClientId, new Endpoints(Endpoint), GroupResource, RequestTimeout, SubscriptionExpression);
+        }
+
+        [TestMethod]
+        public void TestToProtobuf()
+        {
+            var pushSubscriptionSettings = CreateSettings();
+            var settings = pushSubscriptionSettings.ToProtobuf();
+
+            Assert.AreEqual(Proto.ClientType.PushConsumer, settings.ClientType);
+            Assert.AreEqual(Duration.FromTimeSpan(RequestTimeout), settings.RequestTimeout);
+            Assert.IsFalse(settings.Subscription.Subscriptions.Count == 0);
+
+            var subscription = settings.Subscription;
+            Assert.AreEqual(subscription.Group, new Proto.Resource
+            {
+                ResourceNamespace = Namespace,
+                Name = GroupResource
+            });
+
+            Assert.IsFalse(subscription.Fifo);
+
+            var subscriptionsList = subscription.Subscriptions;
+            Assert.AreEqual(1, subscriptionsList.Count);
+
+            var subscriptionEntry = subscriptionsList[0];
+            Assert.AreEqual(Proto.FilterType.Tag, subscriptionEntry.Expression.Type);
+            Assert.AreEqual(subscriptionEntry.Topic, new Proto.Resource
+            {
+                ResourceNamespace = Namespace,
+                Name = "testTopic"
+            });
+        }
+
+        [TestMethod]
+        public void TestSync()
+        {
+            var durations = new List<Duration>
+        {
+            Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+            Duration.FromTimeSpan(TimeSpan.FromSeconds(2)),
+            Duration.FromTimeSpan(TimeSpan.FromSeconds(3))
+        };
+
+            var customizedBackoff = new Proto.CustomizedBackoff
+            {
+                Next = { durations }
+            };
+
+            var retryPolicy = new Proto.RetryPolicy
+            {
+                CustomizedBackoff = customizedBackoff,
+                MaxAttempts = 3
+            };
+
+            var subscription = new Proto.Subscription
+            {
+                Fifo = true,
+                ReceiveBatchSize = 96,
+                LongPollingTimeout = Duration.FromTimeSpan(TimeSpan.FromSeconds(60))
+            };
+
+            var settings = new Proto.Settings
+            {
+                Subscription = subscription,
+                BackoffPolicy = retryPolicy
+            };
+
+            var pushSubscriptionSettings = new PushSubscriptionSettings(
+                "fakeNamespace", ClientId, new Endpoints(Endpoint), GroupResource, RequestTimeout,
+                new ConcurrentDictionary<string, FilterExpression>(SubscriptionExpression));
+
+            pushSubscriptionSettings.Sync(settings);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/csharp/tests/ResourceTest.cs b/csharp/tests/ResourceTest.cs
new file mode 100644
index 0000000..006deb9
--- /dev/null
+++ b/csharp/tests/ResourceTest.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+    [TestClass]
+    public class ResourceTests
+    {
+        [TestMethod]
+        public void TestGetterAndSetter()
+        {
+            var resource = new Resource("foobar");
+            Assert.AreEqual("foobar", resource.Name);
+            Assert.AreEqual(string.Empty, resource.Namespace);
+
+            resource = new Resource("foo", "bar");
+            Assert.AreEqual("bar", resource.Name);
+            Assert.AreEqual("foo", resource.Namespace);
+        }
+
+        [TestMethod]
+        public void TestToProtobuf()
+        {
+            var resource = new Resource("foo", "bar");
+            var protobuf = resource.ToProtobuf();
+            Assert.AreEqual("foo", protobuf.ResourceNamespace);
+            Assert.AreEqual("bar", protobuf.Name);
+        }
+
+        [TestMethod]
+        public void TestEqual()
+        {
+            var resource0 = new Resource("foo", "bar");
+            var resource1 = new Resource("foo", "bar");
+            Assert.AreEqual(resource0, resource1);
+
+            var resource2 = new Resource("foo0", "bar");
+            Assert.AreNotEqual(resource0, resource2);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/SessionTest.cs b/csharp/tests/SessionTest.cs
new file mode 100644
index 0000000..ec53ef1
--- /dev/null
+++ b/csharp/tests/SessionTest.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class SessionTests
+    {
+        private static Client CreateTestClient()
+        {
+            var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+            return new Producer(clientConfig, new ConcurrentDictionary<string, bool>(), 1, null);
+        }
+
+        [TestMethod]
+        public async Task TestSyncSettings()
+        {
+            var testClient = CreateTestClient();
+            var endpoints = new Endpoints(testClient.GetClientConfig().Endpoints);
+
+            var mockStreamWriter = new Mock<IClientStreamWriter<Proto.TelemetryCommand>>();
+            var mockAsyncStreamReader = new Mock<IAsyncStreamReader<Proto.TelemetryCommand>>();
+            var mockClientManager = new Mock<IClientManager>();
+            var mockGrpcCall = new AsyncDuplexStreamingCall<Proto.TelemetryCommand, Proto.TelemetryCommand>(
+                mockStreamWriter.Object, mockAsyncStreamReader.Object, null, null, null, null);
+
+            mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockGrpcCall);
+            var session = new Session(endpoints, mockGrpcCall, testClient);
+
+            var settings = new Proto.Settings();
+            mockStreamWriter.Setup(m => m.WriteAsync(It.Is<Proto.TelemetryCommand>(tc => tc.Settings == settings)))
+                .Returns(Task.CompletedTask);
+            testClient.SetClientManager(mockClientManager.Object);
+
+            await session.SyncSettings(true);
+
+            mockStreamWriter.Verify(m => m.WriteAsync(It.IsAny<Proto.TelemetryCommand>()), Times.Once);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerBuilderTest.cs b/csharp/tests/SimpleConsumerBuilderTest.cs
new file mode 100644
index 0000000..1031f91
--- /dev/null
+++ b/csharp/tests/SimpleConsumerBuilderTest.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+    [TestClass]
+    public class SimpleConsumerBuilderTest
+    {
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetClientConfigurationWithNull()
+        {
+            var builder = new SimpleConsumer.Builder();
+            builder.SetClientConfig(null);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestSetConsumerGroupWithNull()
+        {
+            var builder = new SimpleConsumer.Builder();
+            builder.SetConsumerGroup(null);
+        }
+
+        [TestMethod]
+        public void TestSetAwaitDuration()
+        {
+            var builder = new SimpleConsumer.Builder();
+            builder.SetAwaitDuration(TimeSpan.FromSeconds(5));
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestBuildWithEmptyExpressions()
+        {
+            var builder = new SimpleConsumer.Builder();
+            builder.SetSubscriptionExpression(new Dictionary<string, FilterExpression>());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestBuildWithoutExpressions()
+        {
+            var builder = new SimpleConsumer.Builder();
+            builder.SetSubscriptionExpression(null);
+        }
+
+        [TestMethod]
+        public void TestBuild()
+        {
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints("127.0.0.1:9876").Build();
+            var subscription = new Dictionary<string, FilterExpression>
+                {{ "testTopic", new FilterExpression("*") }};
+            var builder = new SimpleConsumer.Builder();
+            builder.SetClientConfig(clientConfig).SetConsumerGroup("testGroup").
+                SetSubscriptionExpression(subscription).Build();
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerTest.cs b/csharp/tests/SimpleConsumerTest.cs
new file mode 100644
index 0000000..b8ea64b
--- /dev/null
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Org.Apache.Rocketmq.Error;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class SimpleConsumerTest
+    {
+        // Helper method to mock the client manager and assert exceptions
+        private async Task MockAndAssertAckException<TException>(SimpleConsumer consumer, MessageView messageView, Proto.Code code) where TException : Exception
+        {
+            var mockClientManager = new Mock<IClientManager>();
+            consumer.SetClientManager(mockClientManager.Object);
+
+            var metadata = consumer.Sign();
+            var response = new Proto.AckMessageResponse
+            {
+                Status = new Proto.Status { Code = code }
+            };
+            var invocation = new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(null, response, metadata);
+            mockClientManager.Setup(cm =>
+                    cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()))
+                    .Returns(Task.FromResult(invocation));
+            try
+            {
+                await consumer.Ack(messageView);
+            }
+            catch (Exception e)
+            {
+                Assert.IsInstanceOfType(e, typeof(TException));
+            }
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public async Task TestReceiveWithoutStart()
+        {
+            var consumer = CreateSimpleConsumer();
+            await consumer.Receive(16, TimeSpan.FromSeconds(15));
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public async Task TestAckWithoutStart()
+        {
+            var consumer = CreateSimpleConsumer();
+            var messageView = MessageView.FromProtobuf(CreateMessage());
+            await consumer.Ack(messageView);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public async Task TestSubscribeWithoutStart()
+        {
+            var consumer = CreateSimpleConsumer();
+            await consumer.Subscribe("testTopic", new FilterExpression("*"));
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public void TestUnsubscribeWithoutStart()
+        {
+            var consumer = CreateSimpleConsumer();
+            consumer.Unsubscribe("testTopic");
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InternalErrorException))]
+        public async Task TestReceiveWithZeroMaxMessageNum()
+        {
+            var consumer = CreateSimpleConsumer();
+            consumer.State = State.Running;
+            await consumer.Receive(0, TimeSpan.FromSeconds(15));
+        }
+
+        [TestMethod]
+        public async Task TestAck()
+        {
+            var consumer = CreateSimpleConsumer();
+            consumer.State = State.Running;
+            var messageView = CreateMessageView();
+
+            await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.BadRequest);
+            await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.IllegalTopic);
+            await MockAndAssertAckException<BadRequestException>(consumer, messageView,
+                Proto.Code.IllegalConsumerGroup);
+            await MockAndAssertAckException<BadRequestException>(consumer, messageView,
+                Proto.Code.InvalidReceiptHandle);
+            await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.ClientIdRequired);
+            await MockAndAssertAckException<UnauthorizedException>(consumer, messageView, Proto.Code.Unauthorized);
+            await MockAndAssertAckException<ForbiddenException>(consumer, messageView, Proto.Code.Forbidden);
+            await MockAndAssertAckException<NotFoundException>(consumer, messageView, Proto.Code.NotFound);
+            await MockAndAssertAckException<NotFoundException>(consumer, messageView, Proto.Code.TopicNotFound);
+            await MockAndAssertAckException<TooManyRequestsException>(consumer, messageView,
+                Proto.Code.TooManyRequests);
+            await MockAndAssertAckException<InternalErrorException>(consumer, messageView, Proto.Code.InternalError);
+            await MockAndAssertAckException<InternalErrorException>(consumer, messageView,
+                Proto.Code.InternalServerError);
+            await MockAndAssertAckException<ProxyTimeoutException>(consumer, messageView, Proto.Code.ProxyTimeout);
+            await MockAndAssertAckException<UnsupportedException>(consumer, messageView, Proto.Code.Unsupported);
+        }
+
+        [TestMethod]
+        public async Task TestChangeInvisibleDuration()
+        {
+            var consumer = CreateSimpleConsumer();
+            consumer.State = State.Running;
+            var messageView = CreateMessageView();
+            var invisibleDuration = TimeSpan.FromSeconds(3);
+
+            var mockClientManager = new Mock<IClientManager>();
+            consumer.SetClientManager(mockClientManager.Object);
+            var metadata = consumer.Sign();
+
+            var response = new Proto.ChangeInvisibleDurationResponse
+            {
+                Status = new Proto.Status { Code = Proto.Code.Ok }
+            };
+            var invocation =
+                new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(null,
+                    response, metadata);
+            mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny<Endpoints>(),
+                    It.IsAny<Proto.ChangeInvisibleDurationRequest>(), It.IsAny<TimeSpan>()))
+                .Returns(Task.FromResult(invocation));
+            await consumer.ChangeInvisibleDuration(messageView, invisibleDuration);
+
+            await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+                invisibleDuration, Proto.Code.BadRequest);
+            await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+                invisibleDuration, Proto.Code.IllegalTopic);
+            await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+                invisibleDuration, Proto.Code.IllegalConsumerGroup);
+            await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+                invisibleDuration, Proto.Code.IllegalInvisibleTime);
+            await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+                invisibleDuration, Proto.Code.InvalidReceiptHandle);
+            await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+                invisibleDuration, Proto.Code.ClientIdRequired);
+            await MockAndAssertChangeInvisibleDurationException<UnauthorizedException>(consumer, messageView,
+                invisibleDuration, Proto.Code.Unauthorized);
+            await MockAndAssertChangeInvisibleDurationException<NotFoundException>(consumer, messageView,
+                invisibleDuration, Proto.Code.NotFound);
+            await MockAndAssertChangeInvisibleDurationException<NotFoundException>(consumer, messageView,
+                invisibleDuration, Proto.Code.TopicNotFound);
+            await MockAndAssertChangeInvisibleDurationException<TooManyRequestsException>(consumer, messageView,
+                invisibleDuration, Proto.Code.TooManyRequests);
+            await MockAndAssertChangeInvisibleDurationException<InternalErrorException>(consumer, messageView,
+                invisibleDuration, Proto.Code.InternalError);
+            await MockAndAssertChangeInvisibleDurationException<InternalErrorException>(consumer, messageView,
+                invisibleDuration, Proto.Code.InternalServerError);
+            await MockAndAssertChangeInvisibleDurationException<ProxyTimeoutException>(consumer, messageView,
+                invisibleDuration, Proto.Code.ProxyTimeout);
+            await MockAndAssertChangeInvisibleDurationException<UnsupportedException>(consumer, messageView,
+                invisibleDuration, Proto.Code.Unsupported);
+        }
+
+        private async Task MockAndAssertChangeInvisibleDurationException<TException>(SimpleConsumer consumer,
+            MessageView messageView, TimeSpan invisibleDuration, Proto.Code code) where TException : Exception
+        {
+            var mockClientManager = new Mock<IClientManager>();
+            consumer.SetClientManager(mockClientManager.Object);
+
+            var metadata = consumer.Sign();
+            var response = new Proto.ChangeInvisibleDurationResponse
+            {
+                Status = new Proto.Status { Code = code }
+            };
+            var invocation =
+                new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(null,
+                    response, metadata);
+            mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny<Endpoints>(),
+                    It.IsAny<Proto.ChangeInvisibleDurationRequest>(), It.IsAny<TimeSpan>()))
+                .Returns(Task.FromResult(invocation));
+            try
+            {
+                await consumer.ChangeInvisibleDuration(messageView, invisibleDuration);
+            }
+            catch (Exception e)
+            {
+                Assert.IsInstanceOfType(e, typeof(TException));
+            }
+        }
+
+        private SimpleConsumer CreateSimpleConsumer()
+        {
+            var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+            var subscription = new Dictionary<string, FilterExpression> { { "testTopic", new FilterExpression("*") } };
+            var consumer =
+                new SimpleConsumer(clientConfig, "testConsumerGroup", TimeSpan.FromSeconds(15), subscription);
+            return consumer;
+        }
+
+        private Proto.Message CreateMessage()
+        {
+            var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+            var systemProperties = new Proto.SystemProperties
+            {
+                MessageType = Proto.MessageType.Normal,
+                MessageId = MessageIdGenerator.GetInstance().Next(),
+                BornHost = "127.0.0.1",
+                BodyDigest = digest,
+                BornTimestamp = new Timestamp()
+            };
+            var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+            var message = new Proto.Message
+            {
+                SystemProperties = systemProperties,
+                Topic = new Proto.Resource { Name = "testTopic" },
+                Body = body
+            };
+            return message;
+        }
+
+        private MessageView CreateMessageView()
+        {
+            var message = CreateMessage();
+            var messageQueue = new MessageQueue(new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = "broker0",
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses =
+                        {
+                            new Proto.Address
+                            {
+                                Host = "127.0.0.1",
+                                Port = 8080
+                            }
+                        }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource { ResourceNamespace = "testNamespace", Name = "testTopic" }
+            });
+            return MessageView.FromProtobuf(message, messageQueue);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/SimpleSubscriptionSettingsTest.cs b/csharp/tests/SimpleSubscriptionSettingsTest.cs
new file mode 100644
index 0000000..46cd8aa
--- /dev/null
+++ b/csharp/tests/SimpleSubscriptionSettingsTest.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Rocketmq.V2;
+using Castle.Core.Internal;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+using FilterExpression = Org.Apache.Rocketmq.FilterExpression;
+
+namespace tests
+{
+    [TestClass]
+    public class SimpleSubscriptionSettingsTest
+    {
+        private const string TestNamespace = "testNamespace";
+        private const string GroupResource = "testConsumerGroup";
+        private const string ClientId = "testClientId";
+        private const string TestTopic = "testTopic";
+        private static readonly TimeSpan RequestTimeout = TimeSpan.FromSeconds(3);
+        private static readonly TimeSpan LongPollingTimeout = TimeSpan.FromSeconds(15);
+        private SimpleSubscriptionSettings _simpleSubscriptionSettings;
+
+        [TestInitialize]
+        public void Setup()
+        {
+            var subscriptionExpression = new ConcurrentDictionary<string, FilterExpression>(
+                new Dictionary<string, FilterExpression> { { TestTopic, new FilterExpression("*") } });
+            _simpleSubscriptionSettings = new SimpleSubscriptionSettings(
+                TestNamespace,
+                ClientId,
+                new Endpoints("127.0.0.1:9876"),
+                GroupResource,
+                RequestTimeout,
+                LongPollingTimeout,
+                subscriptionExpression
+            );
+        }
+
+        [TestMethod]
+        public void TestToProtobuf()
+        {
+            var settings = _simpleSubscriptionSettings.ToProtobuf();
+
+            Assert.AreEqual(Proto.ClientType.SimpleConsumer, settings.ClientType);
+            Assert.AreEqual(Duration.FromTimeSpan(RequestTimeout), settings.RequestTimeout);
+            Assert.IsFalse(settings.Subscription.Subscriptions.Count == 0);
+
+            var subscription = settings.Subscription;
+
+            Assert.AreEqual(subscription.Group, new Proto.Resource
+            {
+                ResourceNamespace = TestNamespace,
+                Name = GroupResource
+            });
+            Assert.IsFalse(subscription.Fifo);
+            Assert.AreEqual(Duration.FromTimeSpan(LongPollingTimeout), subscription.LongPollingTimeout);
+
+            var subscriptionsList = subscription.Subscriptions;
+            Assert.AreEqual(1, subscriptionsList.Count);
+
+            var subscriptionEntry = subscriptionsList[0];
+            Assert.AreEqual(FilterType.Tag, subscriptionEntry.Expression.Type);
+            Assert.AreEqual(subscriptionEntry.Topic, new Proto.Resource
+            {
+                ResourceNamespace = TestNamespace,
+                Name = TestTopic
+            });
+        }
+
+        [TestMethod]
+        public void TestSync()
+        {
+            var subscription = new Proto.Subscription
+            {
+                Fifo = true
+            };
+
+            var settings = new Proto.Settings
+            {
+                Subscription = subscription
+            };
+
+            _simpleSubscriptionSettings.Sync(settings);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/StatusCheckerTest.cs b/csharp/tests/StatusCheckerTest.cs
new file mode 100644
index 0000000..b64ccc7
--- /dev/null
+++ b/csharp/tests/StatusCheckerTest.cs
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Org.Apache.Rocketmq.Error;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class StatusCheckerTests
+    {
+        [TestMethod]
+        public void TestCheckStatusOk()
+        {
+            var status = new Proto.Status { Code = Proto.Code.Ok, Message = "OK" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Exception exception = null;
+            try
+            {
+                StatusChecker.Check(status, request, requestId);
+            }
+            catch (Exception ex)
+            {
+                exception = ex;
+            }
+
+            Assert.IsNull(exception, "Expected no exception to be thrown, but got: " + exception);
+        }
+
+        [TestMethod]
+        public void TestCheckStatusMultipleResults()
+        {
+            var status = new Proto.Status { Code = Proto.Code.MultipleResults, Message = "Multiple Results" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Exception exception = null;
+            try
+            {
+                StatusChecker.Check(status, request, requestId);
+            }
+            catch (Exception ex)
+            {
+                exception = ex;
+            }
+
+            Assert.IsNull(exception, "Expected no exception to be thrown, but got: " + exception);
+        }
+
+        [TestMethod]
+        public void TestCheckStatusBadRequest()
+        {
+            var status = new Proto.Status { Code = Proto.Code.BadRequest, Message = "Bad Request" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<BadRequestException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusUnauthorized()
+        {
+            var status = new Proto.Status { Code = Proto.Code.Unauthorized, Message = "Unauthorized" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<UnauthorizedException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusPaymentRequired()
+        {
+            var status = new Proto.Status { Code = Proto.Code.PaymentRequired, Message = "Payment Required" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<PaymentRequiredException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusForbidden()
+        {
+            var status = new Proto.Status { Code = Proto.Code.Forbidden, Message = "Forbidden" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<ForbiddenException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusMessageNotFoundForNonReceiveRequest()
+        {
+            var status = new Proto.Status { Code = Proto.Code.MessageNotFound, Message = "Message Not Found" };
+            var request = new Proto.SendMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<NotFoundException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusNotFound()
+        {
+            var status = new Proto.Status { Code = Proto.Code.NotFound, Message = "Not Found" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<NotFoundException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusPayloadTooLarge()
+        {
+            var status = new Proto.Status { Code = Proto.Code.PayloadTooLarge, Message = "Payload Too Large" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<PayloadTooLargeException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusTooManyRequests()
+        {
+            var status = new Proto.Status { Code = Proto.Code.TooManyRequests, Message = "Too Many Requests" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<TooManyRequestsException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusRequestHeaderFieldsTooLarge()
+        {
+            var status = new Proto.Status { Code = Proto.Code.RequestHeaderFieldsTooLarge, Message = "Request Header Fields Too Large" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<RequestHeaderFieldsTooLargeException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusInternalError()
+        {
+            var status = new Proto.Status { Code = Proto.Code.InternalError, Message = "Internal Error" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<InternalErrorException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusProxyTimeout()
+        {
+            var status = new Proto.Status { Code = Proto.Code.ProxyTimeout, Message = "Proxy Timeout" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<ProxyTimeoutException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusUnsupported()
+        {
+            var status = new Proto.Status { Code = Proto.Code.Unsupported, Message = "Unsupported" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<UnsupportedException>(() => StatusChecker.Check(status, request, requestId));
+        }
+
+        [TestMethod]
+        public void TestCheckStatusUnrecognized()
+        {
+            var status = new Proto.Status { Code = (Proto.Code)999, Message = "Unrecognized" };
+            var request = new Proto.ReceiveMessageRequest();
+            var requestId = "requestId";
+
+            Assert.ThrowsException<UnsupportedException>(() => StatusChecker.Check(status, request, requestId));
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/TransactionTest.cs b/csharp/tests/TransactionTest.cs
new file mode 100644
index 0000000..231acd2
--- /dev/null
+++ b/csharp/tests/TransactionTest.cs
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class TransactionTest
+    {
+        private const string FakeTag = "fakeTag";
+        private const string FakeTopic = "fakeTopic";
+        private const string FakeMsgKey = "fakeMsgKey";
+        private const string BrokerName = "broker0";
+        private const string Host = "127.0.0.1";
+        private const int Port = 8080;
+        private Producer _producer;
+        private byte[] _bytes;
+
+        [TestInitialize]
+        public void SetUp()
+        {
+            _producer = CreateTestClient();
+            _bytes = Encoding.UTF8.GetBytes("fakeBytes");
+        }
+
+        [TestMethod]
+        public void TestTryAddMessage()
+        {
+            var transaction = new Transaction(_producer);
+            var message = CreateMessage();
+            var publishingMessage = transaction.TryAddMessage(message);
+            Assert.AreEqual(MessageType.Transaction, publishingMessage.MessageType);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestTryAddExceededMessages()
+        {
+            var transaction = new Transaction(_producer);
+            var message = CreateMessage();
+            transaction.TryAddMessage(message);
+            transaction.TryAddMessage(message);
+        }
+
+        [TestMethod]
+        public void TestTryAddReceipt()
+        {
+            var transaction = new Transaction(_producer);
+            var message = CreateMessage();
+            var publishingMessage = transaction.TryAddMessage(message);
+            var mq0 = CreateMessageQueue();
+
+            var sendReceipt = CreateSendReceipt(mq0);
+            transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public void TestTryAddReceiptNotContained()
+        {
+            var transaction = new Transaction(_producer);
+            var message = CreateMessage();
+            var publishingMessage = new PublishingMessage(message, new PublishingSettings("fakeNamespace",
+                "fakeClientId", new Endpoints("fakeEndpoints"), new Mock<IRetryPolicy>().Object,
+                TimeSpan.FromSeconds(10), new ConcurrentDictionary<string, bool>()), true);
+            var mq0 = CreateMessageQueue();
+
+            var sendReceipt = CreateSendReceipt(mq0);
+            transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public async Task TestCommitWithNoReceipts()
+        {
+            var transaction = new Transaction(_producer);
+            await transaction.Commit();
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(InvalidOperationException))]
+        public async Task TestRollbackWithNoReceipts()
+        {
+            var transaction = new Transaction(_producer);
+            await transaction.Rollback();
+        }
+
+        [TestMethod]
+        public async Task TestCommit()
+        {
+            var transaction = new Transaction(_producer);
+            var message = CreateMessage();
+            var publishingMessage = transaction.TryAddMessage(message);
+            var mq0 = CreateMessageQueue();
+
+            var sendReceipt = CreateSendReceipt(mq0);
+            transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+
+            var mockClientManager = new Mock<IClientManager>();
+            _producer.SetClientManager(mockClientManager.Object);
+
+            SetupCommitOrRollback(mockClientManager, true);
+
+            await transaction.Commit();
+        }
+
+        [TestMethod]
+        public async Task TestRollback()
+        {
+            var transaction = new Transaction(_producer);
+            var message = CreateMessage();
+            var publishingMessage = transaction.TryAddMessage(message);
+            var mq0 = CreateMessageQueue();
+
+            var sendReceipt = CreateSendReceipt(mq0);
+            transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+
+            var mockClientManager = new Mock<IClientManager>();
+            _producer.SetClientManager(mockClientManager.Object);
+
+            SetupCommitOrRollback(mockClientManager, false);
+
+            await transaction.Rollback();
+        }
+
+        private Producer CreateTestClient()
+        {
+            var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+            return new Producer(clientConfig, new ConcurrentDictionary<string, bool>(),
+                1, null);
+        }
+
+        private Message CreateMessage()
+        {
+            return new Message.Builder()
+                .SetTopic(FakeTopic)
+                .SetBody(_bytes)
+                .SetTag(FakeTag)
+                .SetKeys(FakeMsgKey)
+                .Build();
+        }
+
+        private Proto.MessageQueue CreateMessageQueue()
+        {
+            return new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = BrokerName,
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses = { new Proto.Address { Host = Host, Port = Port } }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource { ResourceNamespace = "foo-bar-namespace", Name = "TestTopic" }
+            };
+        }
+
+        private IEnumerable<SendReceipt> CreateSendReceipt(Proto.MessageQueue mq0)
+        {
+            var metadata = _producer.Sign();
+            var sendResultEntry = new Proto.SendResultEntry
+            {
+                MessageId = "fakeMsgId",
+                TransactionId = "fakeTxId",
+                Status = new Proto.Status { Code = Proto.Code.Ok },
+                Offset = 1
+            };
+            var sendMessageResponse = new Proto.SendMessageResponse
+            {
+                Status = new Proto.Status { Code = Proto.Code.Ok },
+                Entries = { sendResultEntry }
+            };
+            var invocation = new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(null, sendMessageResponse, metadata);
+            return SendReceipt.ProcessSendMessageResponse(new MessageQueue(mq0), invocation);
+        }
+
+        private void SetupCommitOrRollback(Mock<IClientManager> mockClientManager, bool commit)
+        {
+            var endTransactionMetadata = _producer.Sign();
+            var endTransactionResponse = new Proto.EndTransactionResponse
+            {
+                Status = new Proto.Status { Code = Proto.Code.Ok }
+            };
+            var endTransactionInvocation = new RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>(null,
+                endTransactionResponse, endTransactionMetadata);
+            mockClientManager.Setup(cm => cm.EndTransaction(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.EndTransactionRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(endTransactionInvocation));
+
+            _producer.State = State.Running;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/tests.csproj b/csharp/tests/tests.csproj
index c16faa0..6cb51b3 100644
--- a/csharp/tests/tests.csproj
+++ b/csharp/tests/tests.csproj
@@ -7,6 +7,8 @@
   </PropertyGroup>

 

   <ItemGroup>

+    <PackageReference Include="Contrib.Grpc.Core.M1" Version="2.46.7" />

+    <PackageReference Include="Grpc.Core" Version="2.46.6" />

     <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />

     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />

     <PackageReference Include="Moq" Version="4.20.70" />

diff --git a/style/codespell/ignore_words.txt b/style/codespell/ignore_words.txt
index 583d907..f997894 100644
--- a/style/codespell/ignore_words.txt
+++ b/style/codespell/ignore_words.txt
@@ -1,4 +1,6 @@
 # guava
 errorprone
 # rust keyword
-crate
\ No newline at end of file
+crate
+# csharp keyword
+atleast