[ISSUE #776] Add push consumer for normal/fifo message, namespace support, reentrant message receiving support in C# SDK (#777)
Add push consumer for normal/fifo message, namespace support, reentrant message receiving support in C#
---------
Co-authored-by: tsaitsung-han.tht <tsaitsung-han.tht@alibaba-inc.com>
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index 6e57d7e..c5627ec 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -34,6 +34,10 @@
private static long _successCounter;
private static long _failureCounter;
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
private static readonly BlockingCollection<Task<ISendReceipt>> Tasks =
new BlockingCollection<Task<ISendReceipt>>();
@@ -79,14 +83,11 @@
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
- const string accessKey = "yourAccessKey";
- const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
- var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
- const string endpoints = "foobar.com:8080";
+ var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
- .SetEndpoints(endpoints)
+ .SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();
@@ -108,6 +109,7 @@
.SetTag(tag)
// You could set multiple keys for the single message actually.
.SetKeys("yourMessageKey-7044358f98fc")
+ .SetMessageGroup("fifo-group")
.Build();
DoStats();
diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs
index 1da8918..a97867d 100644
--- a/csharp/examples/ProducerDelayMessageExample.cs
+++ b/csharp/examples/ProducerDelayMessageExample.cs
@@ -27,18 +27,19 @@
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerDelayMessageExample).FullName);
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
- const string accessKey = "yourAccessKey";
- const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
- var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
- const string endpoints = "foobar.com:8080";
+ var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
- .SetEndpoints(endpoints)
+ .SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();
diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs
index 138cd02..9a9d4fd 100644
--- a/csharp/examples/ProducerFifoMessageExample.cs
+++ b/csharp/examples/ProducerFifoMessageExample.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -26,18 +27,19 @@
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerFifoMessageExample).FullName);
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
- const string accessKey = "yourAccessKey";
- const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
- var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
- const string endpoints = "foobar.com:8080";
+ var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
- .SetEndpoints(endpoints)
+ .SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index 6038eb6..21fb79c 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -26,18 +27,19 @@
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerNormalMessageExample).FullName);
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
- const string accessKey = "yourAccessKey";
- const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
- var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
- const string endpoints = "foobar.com:8080";
+ var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
- .SetEndpoints(endpoints)
+ .SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();
diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs
index d353f74..4c5b3a7 100644
--- a/csharp/examples/ProducerTransactionMessageExample.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -26,6 +27,10 @@
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerTransactionMessageExample).FullName);
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
private class TransactionChecker : ITransactionChecker
{
public TransactionResolution Check(MessageView messageView)
@@ -39,14 +44,11 @@
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
- const string accessKey = "yourAccessKey";
- const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
- var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
- const string endpoints = "foobar.com:8080";
+ var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
- .SetEndpoints(endpoints)
+ .SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();
@@ -76,9 +78,9 @@
var sendReceipt = await producer.Send(message, transaction);
Logger.LogInformation("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
// Commit the transaction.
- transaction.Commit();
+ await transaction.Commit();
// Or rollback the transaction.
- // transaction.Rollback();
+ // await transaction.Rollback();
// Close the producer if you don't need it anymore.
await producer.DisposeAsync();
diff --git a/csharp/examples/PushConsumerExample.cs b/csharp/examples/PushConsumerExample.cs
new file mode 100644
index 0000000..00fbbf7
--- /dev/null
+++ b/csharp/examples/PushConsumerExample.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq;
+
+namespace examples
+{
+ public class PushConsumerExample
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(PushConsumerExample).FullName);
+
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
+ internal static async Task QuickStart()
+ {
+ // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
+ // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+
+ // Credential provider is optional for client configuration.
+ var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(Endpoint)
+ .SetCredentialsProvider(credentialsProvider)
+ .Build();
+
+ // Add your subscriptions.
+ const string consumerGroup = "yourConsumerGroup";
+ const string topic = "yourTopic";
+ var subscription = new Dictionary<string, FilterExpression>
+ { { topic, new FilterExpression("*") } };
+
+ var pushConsumer = await new PushConsumer.Builder()
+ .SetClientConfig(clientConfig)
+ .SetConsumerGroup(consumerGroup)
+ .SetSubscriptionExpression(subscription)
+ .SetMessageListener(new CustomMessageListener())
+ .Build();
+
+ Thread.Sleep(Timeout.Infinite);
+
+ // Close the push consumer if you don't need it anymore.
+ // await pushConsumer.DisposeAsync();
+ }
+
+ private class CustomMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ // Handle the received message and return consume result.
+ Logger.LogInformation($"Consume message={messageView}");
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs
index 63d57e8..ec5992d 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/examples/QuickStart.cs
@@ -34,6 +34,7 @@
// ProducerFifoMessageExample.QuickStart().Wait();
// ProducerDelayMessageExample.QuickStart().Wait();
// ProducerTransactionMessageExample.QuickStart().Wait();
+ // PushConsumerExample.QuickStart().Wait();
// SimpleConsumerExample.QuickStart().Wait();
// ProducerBenchmark.QuickStart().Wait();
}
diff --git a/csharp/rocketmq-client-csharp/Assignment.cs b/csharp/rocketmq-client-csharp/Assignment.cs
new file mode 100644
index 0000000..1005676
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Assignment.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Rocketmq
+{
+ public class Assignment
+ {
+ public Assignment(MessageQueue messageQueue)
+ {
+ MessageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue));
+ }
+
+ public MessageQueue MessageQueue { get; }
+
+ public override bool Equals(object obj)
+ {
+ if (this == obj) return true;
+ if (obj == null || GetType() != obj.GetType()) return false;
+
+ var other = (Assignment)obj;
+ return EqualityComparer<MessageQueue>.Default.Equals(MessageQueue, other.MessageQueue);
+ }
+
+ public override int GetHashCode()
+ {
+ return EqualityComparer<MessageQueue>.Default.GetHashCode(MessageQueue);
+ }
+
+ public override string ToString()
+ {
+ return $"Assignment{{messageQueue={MessageQueue}}}";
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Assignments.cs b/csharp/rocketmq-client-csharp/Assignments.cs
new file mode 100644
index 0000000..a25f5ea
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Assignments.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ public class Assignments
+ {
+ private readonly List<Assignment> _assignmentList;
+
+ public Assignments(List<Assignment> assignmentList)
+ {
+ _assignmentList = assignmentList;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+
+ if (obj == null || GetType() != obj.GetType())
+ {
+ return false;
+ }
+
+ var other = (Assignments)obj;
+ return _assignmentList.SequenceEqual(other._assignmentList);
+ }
+
+ public override int GetHashCode()
+ {
+ return HashCode.Combine(_assignmentList);
+ }
+
+ public override string ToString()
+ {
+ return $"{nameof(Assignments)} {{ {nameof(_assignmentList)} = {_assignmentList} }}";
+ }
+
+ public List<Assignment> GetAssignmentList()
+ {
+ return _assignmentList;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 1c81b41..491ef4c 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -21,10 +21,13 @@
using System.Threading;
using System;
using System.Linq;
+using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Proto = Apache.Rocketmq.V2;
using grpcLib = Grpc.Core;
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public abstract class Client
@@ -49,7 +52,7 @@
protected readonly ClientConfig ClientConfig;
protected readonly Endpoints Endpoints;
- protected readonly IClientManager ClientManager;
+ protected IClientManager ClientManager;
protected readonly string ClientId;
protected readonly ClientMeterManager ClientMeterManager;
@@ -113,7 +116,7 @@
Logger.LogDebug($"Shutdown the rocketmq client successfully, clientId={ClientId}");
}
- private (bool, Session) GetSession(Endpoints endpoints)
+ private protected (bool, Session) GetSession(Endpoints endpoints)
{
_sessionLock.EnterReadLock();
try
@@ -151,11 +154,11 @@
protected abstract IEnumerable<string> GetTopics();
- protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
+ internal abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData);
- private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
+ internal async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
{
var routeEndpoints = new HashSet<Endpoints>();
foreach (var mq in topicRouteData.MessageQueues)
@@ -261,7 +264,7 @@
$"AvailableCompletionPortThreads={availableIo}");
}
- private void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
+ private protected void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
{
Task.Run(async () =>
{
@@ -313,6 +316,7 @@
{
Topic = new Proto::Resource
{
+ ResourceNamespace = ClientConfig.Namespace,
Name = topic
},
Endpoints = Endpoints.ToProtobuf()
@@ -397,7 +401,7 @@
return metadata;
}
- protected abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest();
+ internal abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest();
private async void NotifyClientTermination()
{
@@ -432,6 +436,17 @@
return ClientConfig;
}
+ internal IClientManager GetClientManager()
+ {
+ return ClientManager;
+ }
+
+ // Only for testing
+ internal void SetClientManager(IClientManager clientManager)
+ {
+ ClientManager = clientManager;
+ }
+
internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
Proto.RecoverOrphanedTransactionCommand command)
{
@@ -439,7 +454,7 @@
$"clientId={ClientId}, endpoints={endpoints}");
}
- internal async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
+ internal virtual async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
{
// Only push consumer support message consumption verification.
Logger.LogWarning($"Ignore verify message command from remote, which is not expected, clientId={ClientId}, " +
@@ -489,7 +504,7 @@
internal void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
{
- var metric = new Metric(settings.Metric);
+ var metric = new Metric(settings.Metric ?? new Proto.Metric());
ClientMeterManager.Reset(metric);
GetSettings().Sync(settings);
}
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs
index cf02a19..ed17f7d 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -22,12 +22,13 @@
public class ClientConfig
{
private ClientConfig(ISessionCredentialsProvider sessionCredentialsProvider, TimeSpan requestTimeout,
- string endpoints, bool sslEnabled)
+ string endpoints, bool sslEnabled, string namespaceName)
{
SessionCredentialsProvider = sessionCredentialsProvider;
RequestTimeout = requestTimeout;
Endpoints = endpoints;
SslEnabled = sslEnabled;
+ Namespace = namespaceName;
}
public ISessionCredentialsProvider SessionCredentialsProvider { get; }
@@ -38,12 +39,15 @@
public bool SslEnabled { get; }
+ public string Namespace { get; }
+
public class Builder
{
private ISessionCredentialsProvider _sessionCredentialsProvider;
private TimeSpan _requestTimeout = TimeSpan.FromSeconds(3);
private string _endpoints;
private bool _sslEnabled = true;
+ private string _namespace = "";
public Builder SetCredentialsProvider(ISessionCredentialsProvider sessionCredentialsProvider)
{
@@ -69,9 +73,15 @@
return this;
}
+ public Builder SetNamespace(string namespaceName)
+ {
+ _namespace = namespaceName;
+ return this;
+ }
+
public ClientConfig Build()
{
- return new ClientConfig(_sessionCredentialsProvider, _requestTimeout, _endpoints, _sslEnabled);
+ return new ClientConfig(_sessionCredentialsProvider, _requestTimeout, _endpoints, _sslEnabled, _namespace);
}
}
}
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index fed6f38..e42a29d 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -143,6 +143,7 @@
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).ReceiveMessage(metadata, request, timeout);
+
return new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(
request, response, metadata);
}
@@ -166,6 +167,16 @@
request, response, metadata);
}
+ public async Task<RpcInvocation<Proto.ForwardMessageToDeadLetterQueueRequest, Proto.ForwardMessageToDeadLetterQueueResponse>>
+ ForwardMessageToDeadLetterQueue(Endpoints endpoints,
+ Proto.ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
+ {
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).ForwardMessageToDeadLetterQueue(metadata, request, timeout);
+ return new RpcInvocation<Proto.ForwardMessageToDeadLetterQueueRequest, Proto.ForwardMessageToDeadLetterQueueResponse>(
+ request, response, metadata);
+ }
+
public async Task<RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>> EndTransaction(
Endpoints endpoints, Proto.EndTransactionRequest request, TimeSpan timeout)
{
diff --git a/csharp/rocketmq-client-csharp/ConsumeResult.cs b/csharp/rocketmq-client-csharp/ConsumeResult.cs
new file mode 100644
index 0000000..6cd2124
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ConsumeResult.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Designed for push consumer specifically.
+ /// </summary>
+ public enum ConsumeResult
+ {
+ /// <summary>
+ /// Consume message successfully.
+ /// </summary>
+ SUCCESS,
+ /// <summary>
+ /// Failed to consume message.
+ /// </summary>
+ FAILURE
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ConsumeService.cs b/csharp/rocketmq-client-csharp/ConsumeService.cs
new file mode 100644
index 0000000..7dcc667
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ConsumeService.cs
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+ public abstract class ConsumeService
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger<ConsumeService>();
+
+ protected readonly string ClientId;
+ private readonly IMessageListener _messageListener;
+ private readonly TaskScheduler _consumptionTaskScheduler;
+ private readonly CancellationToken _consumptionCtsToken;
+
+ public ConsumeService(string clientId, IMessageListener messageListener, TaskScheduler consumptionTaskScheduler,
+ CancellationToken consumptionCtsToken)
+ {
+ ClientId = clientId;
+ _messageListener = messageListener;
+ _consumptionTaskScheduler = consumptionTaskScheduler;
+ _consumptionCtsToken = consumptionCtsToken;
+ }
+
+ public abstract void Consume(ProcessQueue pq, List<MessageView> messageViews);
+
+ public Task<ConsumeResult> Consume(MessageView messageView)
+ {
+ return Consume(messageView, TimeSpan.Zero);
+ }
+
+ public Task<ConsumeResult> Consume(MessageView messageView, TimeSpan delay)
+ {
+ var task = new ConsumeTask(ClientId, _messageListener, messageView);
+ var delayMilliseconds = (int)delay.TotalMilliseconds;
+
+ if (delayMilliseconds <= 0)
+ {
+ return Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken, TaskCreationOptions.None,
+ _consumptionTaskScheduler);
+ }
+
+ var tcs = new TaskCompletionSource<ConsumeResult>();
+
+ Task.Run(async () =>
+ {
+ try
+ {
+ await Task.Delay(delay, _consumptionCtsToken);
+ var result = await Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken,
+ TaskCreationOptions.None, _consumptionTaskScheduler);
+ tcs.SetResult(result);
+ }
+ catch (Exception e)
+ {
+ Logger.LogError(e, $"Error while consuming message, clientId={ClientId}");
+ tcs.SetException(e);
+ }
+ }, _consumptionCtsToken);
+
+ return tcs.Task;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ConsumeTask.cs b/csharp/rocketmq-client-csharp/ConsumeTask.cs
new file mode 100644
index 0000000..9214b68
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ConsumeTask.cs
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+ public class ConsumeTask
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger<ConsumeTask>();
+
+ private readonly string _clientId;
+ private readonly IMessageListener _messageListener;
+ private readonly MessageView _messageView;
+
+ public ConsumeTask(string clientId, IMessageListener messageListener, MessageView messageView)
+ {
+ _clientId = clientId;
+ _messageListener = messageListener;
+ _messageView = messageView;
+ }
+
+ /// <summary>
+ /// Invoke IMessageListener to consume the message.
+ /// </summary>
+ /// <returns>Message(s) which are consumed successfully.</returns>
+ public ConsumeResult Call()
+ {
+ try
+ {
+ var consumeResult = _messageListener.Consume(_messageView);
+ return consumeResult;
+ }
+ catch (Exception e)
+ {
+ Logger.LogError(e, $"Message listener raised an exception while consuming messages, clientId={_clientId}," +
+ $" mq={_messageView.MessageQueue}, messageId={_messageView.MessageId}");
+ return ConsumeResult.FAILURE;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
index 0bf7a45..2ad135b 100644
--- a/csharp/rocketmq-client-csharp/Consumer.cs
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -18,11 +18,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Proto = Apache.Rocketmq.V2;
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public abstract class Consumer : Client
@@ -36,7 +39,7 @@
ConsumerGroup = consumerGroup;
}
- protected async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
+ internal async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
TimeSpan awaitDuration)
{
var tolerance = ClientConfig.RequestTimeout;
@@ -85,11 +88,12 @@
};
}
- protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
+ internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
FilterExpression filterExpression, TimeSpan awaitDuration, TimeSpan invisibleDuration)
{
var group = new Proto.Resource
{
+ ResourceNamespace = ClientConfig.Namespace,
Name = ConsumerGroup
};
return new Proto.ReceiveMessageRequest
@@ -103,5 +107,26 @@
InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
};
}
+
+ protected internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
+ FilterExpression filterExpression, TimeSpan awaitDuration, string attemptId)
+ {
+ attemptId ??= Guid.NewGuid().ToString();
+ var group = new Proto.Resource
+ {
+ ResourceNamespace = ClientConfig.Namespace,
+ Name = ConsumerGroup
+ };
+ return new Proto.ReceiveMessageRequest
+ {
+ Group = group,
+ MessageQueue = mq.ToProtobuf(),
+ FilterExpression = WrapFilterExpression(filterExpression),
+ LongPollingTimeout = Duration.FromTimeSpan(awaitDuration),
+ BatchSize = batchSize,
+ AutoRenew = true,
+ AttemptId = attemptId
+ };
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs
new file mode 100644
index 0000000..83f1bff
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ public class CustomizedBackoffRetryPolicy : IRetryPolicy
+ {
+ private readonly int _maxAttempts;
+ private readonly List<TimeSpan> _durations;
+
+ public CustomizedBackoffRetryPolicy(List<TimeSpan> durations, int maxAttempts)
+ {
+ if (durations == null || !durations.Any())
+ {
+ throw new ArgumentException("durations must not be empty", nameof(durations));
+ }
+ _durations = durations;
+ _maxAttempts = maxAttempts;
+ }
+
+ public int GetMaxAttempts()
+ {
+ return _maxAttempts;
+ }
+
+ public List<TimeSpan> GetDurations()
+ {
+ return _durations;
+ }
+
+ public TimeSpan GetNextAttemptDelay(int attempt)
+ {
+ if (attempt <= 0)
+ {
+ throw new ArgumentException("attempt must be positive", nameof(attempt));
+ }
+ return attempt > _durations.Count ? _durations.Last() : _durations[attempt - 1];
+ }
+
+ public static CustomizedBackoffRetryPolicy FromProtobuf(RetryPolicy retryPolicy)
+ {
+ if (!retryPolicy.StrategyCase.Equals(RetryPolicy.StrategyOneofCase.CustomizedBackoff))
+ {
+ throw new ArgumentException("Illegal retry policy");
+ }
+ var customizedBackoff = retryPolicy.CustomizedBackoff;
+ var durations = customizedBackoff.Next.Select(duration => duration.ToTimeSpan()).ToList();
+ return new CustomizedBackoffRetryPolicy(durations, retryPolicy.MaxAttempts);
+ }
+
+ public RetryPolicy ToProtobuf()
+ {
+ var customizedBackoff = new CustomizedBackoff
+ {
+ Next = { _durations.Select(Duration.FromTimeSpan) }
+ };
+ return new RetryPolicy
+ {
+ MaxAttempts = _maxAttempts,
+ CustomizedBackoff = customizedBackoff
+ };
+ }
+
+ public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy)
+ {
+ if (!retryPolicy.StrategyCase.Equals(RetryPolicy.StrategyOneofCase.CustomizedBackoff))
+ {
+ throw new InvalidOperationException("Strategy must be customized backoff");
+ }
+
+ return InheritBackoff(retryPolicy.CustomizedBackoff);
+ }
+
+ private IRetryPolicy InheritBackoff(CustomizedBackoff retryPolicy)
+ {
+ var durations = retryPolicy.Next.Select(duration => duration.ToTimeSpan()).ToList();
+ return new CustomizedBackoffRetryPolicy(durations, _maxAttempts);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
index d4826d8..1ee7a28 100644
--- a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
+++ b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
@@ -25,7 +25,7 @@
{
private readonly int _maxAttempts;
- private ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff,
+ public ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff,
double backoffMultiplier)
{
_maxAttempts = maxAttempts;
@@ -39,11 +39,11 @@
return _maxAttempts;
}
- private TimeSpan InitialBackoff { get; }
+ public TimeSpan InitialBackoff { get; }
- private TimeSpan MaxBackoff { get; }
+ public TimeSpan MaxBackoff { get; }
- private double BackoffMultiplier { get; }
+ public double BackoffMultiplier { get; }
public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy)
{
@@ -63,6 +63,10 @@
public TimeSpan GetNextAttemptDelay(int attempt)
{
+ if (attempt <= 0)
+ {
+ throw new ArgumentException("attempt must be positive", nameof(attempt));
+ }
var delayMillis = Math.Min(
InitialBackoff.TotalMilliseconds * Math.Pow(BackoffMultiplier, 1.0 * (attempt - 1)),
MaxBackoff.TotalMilliseconds);
@@ -88,5 +92,18 @@
ExponentialBackoff = exponentialBackoff
};
}
+
+ public static ExponentialBackoffRetryPolicy FromProtobuf(Proto.RetryPolicy retryPolicy)
+ {
+ if (!retryPolicy.StrategyCase.Equals(Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff))
+ {
+ throw new ArgumentException("Illegal retry policy");
+ }
+ var exponentialBackoff = retryPolicy.ExponentialBackoff;
+ return new ExponentialBackoffRetryPolicy(retryPolicy.MaxAttempts,
+ exponentialBackoff.Initial.ToTimeSpan(),
+ exponentialBackoff.Max.ToTimeSpan(),
+ exponentialBackoff.Multiplier);
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/FifoConsumeService.cs b/csharp/rocketmq-client-csharp/FifoConsumeService.cs
new file mode 100644
index 0000000..b293c41
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/FifoConsumeService.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+ public class FifoConsumeService : ConsumeService
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger<FifoConsumeService>();
+
+ public FifoConsumeService(string clientId, IMessageListener messageListener,
+ TaskScheduler consumptionExecutor, CancellationToken consumptionCtsToken) :
+ base(clientId, messageListener, consumptionExecutor, consumptionCtsToken)
+ {
+ }
+
+ public override void Consume(ProcessQueue pq, List<MessageView> messageViews)
+ {
+ ConsumeIteratively(pq, messageViews.GetEnumerator());
+ }
+
+ public void ConsumeIteratively(ProcessQueue pq, IEnumerator<MessageView> iterator)
+ {
+ if (!iterator.MoveNext())
+ {
+ return;
+ }
+
+ var messageView = iterator.Current;
+
+ if (messageView != null && messageView.IsCorrupted())
+ {
+ // Discard corrupted message.
+ Logger.LogError($"Message is corrupted for FIFO consumption, prepare to discard it," +
+ $" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}");
+ pq.DiscardFifoMessage(messageView);
+ ConsumeIteratively(pq, iterator); // Recursively consume the next message
+ return;
+ }
+
+ var consumeTask = Consume(messageView);
+ consumeTask.ContinueWith(async t =>
+ {
+ var result = await t;
+ await pq.EraseFifoMessage(messageView, result);
+ }, TaskContinuationOptions.ExecuteSynchronously).ContinueWith(_ => ConsumeIteratively(pq, iterator),
+ TaskContinuationOptions.ExecuteSynchronously);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index 19f9459..743df9f 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -113,6 +113,16 @@
Endpoints endpoints, ChangeInvisibleDurationRequest request, TimeSpan timeout);
/// <summary>
+ /// Send a message to the dead letter queue asynchronously, the method ensures no throwable.
+ /// </summary>
+ /// <param name="endpoints">Requested endpoints.</param>
+ /// <param name="request">Request of sending a message to DLQ.</param>
+ /// <param name="timeout">Request max duration.</param>
+ /// <returns></returns>
+ Task<RpcInvocation<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse>> ForwardMessageToDeadLetterQueue(
+ Endpoints endpoints, ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout);
+
+ /// <summary>
/// Transaction ending request.
/// </summary>
/// <param name="endpoints">The target endpoints.</param>
diff --git a/csharp/rocketmq-client-csharp/IMessageListener.cs b/csharp/rocketmq-client-csharp/IMessageListener.cs
new file mode 100644
index 0000000..d011fc8
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/IMessageListener.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Org.Apache.Rocketmq
+{
+ public interface IMessageListener
+ {
+ /// <summary>
+ /// The callback interface to consume the message.
+ /// </summary>
+ /// <remarks>
+ /// You should process the <see cref="MessageView"/> and return the corresponding <see cref="ConsumeResult"/>.
+ /// The consumption is successful only when <see cref="ConsumeResult.SUCCESS"/> is returned, null pointer is returned
+ /// or exception is thrown would cause message consumption failure too.
+ /// </remarks>
+ ConsumeResult Consume(MessageView messageView);
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ITransaction.cs b/csharp/rocketmq-client-csharp/ITransaction.cs
index 27c770b..7abd5da 100644
--- a/csharp/rocketmq-client-csharp/ITransaction.cs
+++ b/csharp/rocketmq-client-csharp/ITransaction.cs
@@ -15,12 +15,14 @@
* limitations under the License.
*/
+using System.Threading.Tasks;
+
namespace Org.Apache.Rocketmq
{
public interface ITransaction
{
- void Commit();
+ Task Commit();
- void Rollback();
+ Task Rollback();
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index 52b821a..aaa5ebb 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -77,7 +77,17 @@
public DateTime BornTime { get; }
- public int DeliveryAttempt { get; }
+ public int DeliveryAttempt { get; set; }
+
+ public int IncrementAndGetDeliveryAttempt()
+ {
+ return ++DeliveryAttempt;
+ }
+
+ public bool IsCorrupted()
+ {
+ return _corrupted;
+ }
public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue = null)
{
diff --git a/csharp/rocketmq-client-csharp/ProcessQueue.cs b/csharp/rocketmq-client-csharp/ProcessQueue.cs
new file mode 100644
index 0000000..0c182d7
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ProcessQueue.cs
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq.Error;
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Process queue is a cache to store fetched messages from remote for <c>PushConsumer</c>.
+ ///
+ /// <c>PushConsumer</c> queries assignments periodically and converts them into message queues, each message queue is
+ /// mapped into one process queue to fetch message from remote. If the message queue is removed from the newest
+ /// assignment, the corresponding process queue is marked as expired soon, which means its lifecycle is over.
+ /// </summary>
+ public class ProcessQueue
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger<ProcessQueue>();
+
+ internal static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1);
+ internal static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1);
+ internal static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1);
+
+ private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20);
+ private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1);
+ private static readonly TimeSpan ReceivingBackoffDelayWhenCacheIsFull = TimeSpan.FromSeconds(1);
+
+ private readonly PushConsumer _consumer;
+
+ /// <summary>
+ /// Dropped means ProcessQueue is deprecated, which means no message would be fetched from remote anymore.
+ /// </summary>
+ private volatile bool _dropped;
+ private readonly MessageQueue _mq;
+ private readonly FilterExpression _filterExpression;
+
+ /// <summary>
+ /// Messages which is pending means have been cached, but are not taken by consumer dispatcher yet.
+ /// </summary>
+ private readonly List<MessageView> _cachedMessages;
+ private readonly ReaderWriterLockSlim _cachedMessageLock;
+ private long _cachedMessagesBytes;
+
+ private long _activityTime = DateTime.UtcNow.Ticks;
+ private long _cacheFullTime = long.MinValue;
+
+ private readonly CancellationTokenSource _receiveMsgCts;
+ private readonly CancellationTokenSource _ackMsgCts;
+ private readonly CancellationTokenSource _changeInvisibleDurationCts;
+ private readonly CancellationTokenSource _forwardMessageToDeadLetterQueueCts;
+
+ public ProcessQueue(PushConsumer consumer, MessageQueue mq, FilterExpression filterExpression,
+ CancellationTokenSource receiveMsgCts, CancellationTokenSource ackMsgCts,
+ CancellationTokenSource changeInvisibleDurationCts, CancellationTokenSource forwardMessageToDeadLetterQueueCts)
+ {
+ _consumer = consumer;
+ _dropped = false;
+ _mq = mq;
+ _filterExpression = filterExpression;
+ _cachedMessages = new List<MessageView>();
+ _cachedMessageLock = new ReaderWriterLockSlim();
+ _cachedMessagesBytes = 0;
+ _receiveMsgCts = receiveMsgCts;
+ _ackMsgCts = ackMsgCts;
+ _changeInvisibleDurationCts = changeInvisibleDurationCts;
+ _forwardMessageToDeadLetterQueueCts = forwardMessageToDeadLetterQueueCts;
+ }
+
+ /// <summary>
+ /// Get the mapped message queue.
+ /// </summary>
+ /// <returns>mapped message queue.</returns>
+ public MessageQueue GetMessageQueue()
+ {
+ return _mq;
+ }
+
+ /// <summary>
+ /// Drop the current process queue, which means the process queue's lifecycle is over,
+ /// thus it would not fetch messages from the remote anymore if dropped.
+ /// </summary>
+ public void Drop()
+ {
+ _dropped = true;
+ }
+
+ /// <summary>
+ /// ProcessQueue would be regarded as expired if no fetch message for a long time.
+ /// </summary>
+ /// <returns>if it is expired.</returns>
+ public bool Expired()
+ {
+ var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
+ var requestTimeout = _consumer.GetClientConfig().RequestTimeout;
+ var maxIdleDuration = longPollingTimeout.Add(requestTimeout).Multiply(3);
+ var idleDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _activityTime);
+ if (idleDuration < maxIdleDuration.Ticks)
+ {
+ return false;
+ }
+ var afterCacheFullDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _cacheFullTime);
+ if (afterCacheFullDuration < maxIdleDuration.Ticks)
+ {
+ return false;
+ }
+ Logger.LogWarning(
+ $"Process queue is idle, idleDuration={idleDuration}, maxIdleDuration={maxIdleDuration}," +
+ $" afterCacheFullDuration={afterCacheFullDuration}, mq={_mq}, clientId={_consumer.GetClientId()}");
+ return true;
+ }
+
+ internal void CacheMessages(List<MessageView> messageList)
+ {
+ _cachedMessageLock.EnterWriteLock();
+ try
+ {
+ foreach (var messageView in messageList)
+ {
+ _cachedMessages.Add(messageView);
+ Interlocked.Add(ref _cachedMessagesBytes, messageView.Body.Length);
+ }
+ }
+ finally
+ {
+ _cachedMessageLock.ExitWriteLock();
+ }
+ }
+
+ private int GetReceptionBatchSize()
+ {
+ var bufferSize = _consumer.CacheMessageCountThresholdPerQueue() - CachedMessagesCount();
+ bufferSize = Math.Max(bufferSize, 1);
+ return Math.Min(bufferSize, _consumer.GetPushConsumerSettings().GetReceiveBatchSize());
+ }
+
+ /// <summary>
+ /// Start to fetch messages from remote immediately.
+ /// </summary>
+ public void FetchMessageImmediately()
+ {
+ ReceiveMessageImmediately();
+ }
+
+ /// <summary>
+ /// Receive message later by message queue.
+ /// </summary>
+ /// <remarks>
+ /// Make sure that no exception will be thrown.
+ /// </remarks>
+ public void OnReceiveMessageException(Exception t, string attemptId)
+ {
+ var delay = t is TooManyRequestsException ? ReceivingFlowControlBackoffDelay : ReceivingFailureBackoffDelay;
+ ReceiveMessageLater(delay, attemptId);
+ }
+
+ private void ReceiveMessageLater(TimeSpan delay, string attemptId)
+ {
+ var clientId = _consumer.GetClientId();
+ Logger.LogInformation($"Try to receive message later, mq={_mq}, delay={delay}, clientId={clientId}");
+ Task.Run(async () =>
+ {
+ try
+ {
+ await Task.Delay(delay, _receiveMsgCts.Token);
+ ReceiveMessage(attemptId);
+ }
+ catch (Exception ex)
+ {
+ if (_receiveMsgCts.IsCancellationRequested)
+ {
+ return;
+ }
+ Logger.LogError(ex, $"[Bug] Failed to schedule message receiving request, mq={_mq}, clientId={clientId}");
+ OnReceiveMessageException(ex, attemptId);
+ }
+ });
+ }
+
+ private string GenerateAttemptId()
+ {
+ return Guid.NewGuid().ToString();
+ }
+
+ public void ReceiveMessage()
+ {
+ ReceiveMessage(GenerateAttemptId());
+ }
+
+ public void ReceiveMessage(string attemptId)
+ {
+ var clientId = _consumer.GetClientId();
+ if (_dropped)
+ {
+ Logger.LogInformation($"Process queue has been dropped, no longer receive message, mq={_mq}, clientId={clientId}");
+ return;
+ }
+ if (IsCacheFull())
+ {
+ Logger.LogWarning($"Process queue cache is full, would receive message later, mq={_mq}, clientId={clientId}");
+ ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull, attemptId);
+ return;
+ }
+ ReceiveMessageImmediately(attemptId);
+ }
+
+ private void ReceiveMessageImmediately()
+ {
+ ReceiveMessageImmediately(GenerateAttemptId());
+ }
+
+ private void ReceiveMessageImmediately(string attemptId)
+ {
+ var clientId = _consumer.GetClientId();
+ if (_consumer.State != State.Running)
+ {
+ Logger.LogInformation($"Stop to receive message because consumer is not running, mq={_mq}, clientId={clientId}");
+ return;
+ }
+
+ try
+ {
+ var endpoints = _mq.Broker.Endpoints;
+ var batchSize = GetReceptionBatchSize();
+ var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
+ var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout,
+ attemptId);
+
+ Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks);
+
+ var task = _consumer.ReceiveMessage(request, _mq, longPollingTimeout);
+ task.ContinueWith(t =>
+ {
+ if (t.IsFaulted)
+ {
+ string nextAttemptId = null;
+ if (t.Exception is { InnerException: RpcException { StatusCode: StatusCode.DeadlineExceeded } })
+ {
+ nextAttemptId = request.AttemptId;
+ }
+
+ Logger.LogError(t.Exception, $"Exception raised during message reception, mq={_mq}," +
+ $" attemptId={request.AttemptId}, nextAttemptId={nextAttemptId}," +
+ $" clientId={clientId}");
+ OnReceiveMessageException(t.Exception, nextAttemptId);
+ }
+ else
+ {
+ try
+ {
+ var result = t.Result;
+ OnReceiveMessageResult(result);
+ }
+ catch (Exception ex)
+ {
+ // Should never reach here.
+ Logger.LogError($"[Bug] Exception raised while handling receive result, mq={_mq}," +
+ $" endpoints={endpoints}, clientId={clientId}, exception={ex}");
+ OnReceiveMessageException(ex, attemptId);
+ }
+ }
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}, clientId={clientId}");
+ OnReceiveMessageException(ex, attemptId);
+ }
+ }
+
+ private void OnReceiveMessageResult(ReceiveMessageResult result)
+ {
+ var messages = result.Messages;
+ if (messages.Count > 0)
+ {
+ CacheMessages(messages);
+ _consumer.GetConsumeService().Consume(this, messages);
+ }
+ ReceiveMessage();
+ }
+
+ private bool IsCacheFull()
+ {
+ var cacheMessageCountThresholdPerQueue = _consumer.CacheMessageCountThresholdPerQueue();
+ var actualMessagesQuantity = CachedMessagesCount();
+ var clientId = _consumer.GetClientId();
+ if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity)
+ {
+ Logger.LogWarning($"Process queue total cached messages quantity exceeds the threshold," +
+ $" threshold={cacheMessageCountThresholdPerQueue}, actual={actualMessagesQuantity}," +
+ $" mq={_mq}, clientId={clientId}");
+ Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks);
+ return true;
+ }
+
+ var cacheMessageBytesThresholdPerQueue = _consumer.CacheMessageBytesThresholdPerQueue();
+ var actualCachedMessagesBytes = CachedMessageBytes();
+ if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes)
+ {
+ Logger.LogWarning($"Process queue total cached messages memory exceeds the threshold," +
+ $" threshold={cacheMessageBytesThresholdPerQueue} bytes," +
+ $" actual={actualCachedMessagesBytes} bytes, mq={_mq}, clientId={clientId}");
+ Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks);
+ return true;
+ }
+
+ return false;
+ }
+
+ /// <summary>
+ /// Erase messages(Non-FIFO-consume-mode) which have been consumed properly.
+ /// </summary>
+ /// <param name="messageView">the message to erase.</param>
+ /// <param name="consumeResult">consume result.</param>
+ public void EraseMessage(MessageView messageView, ConsumeResult consumeResult)
+ {
+ var task = ConsumeResult.SUCCESS.Equals(consumeResult) ? AckMessage(messageView) : NackMessage(messageView);
+ _ = task.ContinueWith(_ =>
+ {
+ EvictCache(messageView);
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+
+ private Task AckMessage(MessageView messageView)
+ {
+ var tcs = new TaskCompletionSource<bool>();
+ AckMessage(messageView, 1, tcs);
+ return tcs.Task;
+ }
+
+ private void AckMessage(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+ {
+ var clientId = _consumer.GetClientId();
+ var consumerGroup = _consumer.GetConsumerGroup();
+ var messageId = messageView.MessageId;
+ var endpoints = messageView.MessageQueue.Broker.Endpoints;
+
+ var request = _consumer.WrapAckMessageRequest(messageView);
+ var task = _consumer.GetClientManager().AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+ _consumer.GetClientConfig().RequestTimeout);
+
+ task.ContinueWith(responseTask =>
+ {
+ if (responseTask.IsFaulted)
+ {
+ Logger.LogError(responseTask.Exception, $"Exception raised while acknowledging message," +
+ $" would retry later, clientId={clientId}," +
+ $" consumerGroup={consumerGroup}," +
+ $" messageId={messageId}," +
+ $" mq={_mq}, endpoints={endpoints}");
+ AckMessageLater(messageView, attempt + 1, tcs);
+ }
+ else
+ {
+ var invocation = responseTask.Result;
+ var requestId = invocation.RequestId;
+ var status = invocation.Response.Status;
+ var statusCode = status.Code;
+
+ if (statusCode == Code.InvalidReceiptHandle)
+ {
+ Logger.LogError($"Failed to ack message due to the invalid receipt handle, forgive to retry," +
+ $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
+ $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
+ $" status message={status.Message}");
+ tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
+ }
+
+ if (statusCode != Code.Ok)
+ {
+ Logger.LogError(
+ $"Failed to change invisible duration, would retry later, clientId={clientId}," +
+ $" consumerGroup={consumerGroup}, messageId={messageId}, attempt={attempt}, mq={_mq}," +
+ $" endpoints={endpoints}, requestId={requestId}, status message={status.Message}");
+ AckMessageLater(messageView, attempt + 1, tcs);
+ return;
+ }
+
+ tcs.SetResult(true);
+
+ if (attempt > 1)
+ {
+ Logger.LogInformation($"Successfully acked message finally, clientId={clientId}," +
+ $" consumerGroup={consumerGroup}, messageId={messageId}," +
+ $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
+ $" requestId={requestId}");
+ }
+ else
+ {
+ Logger.LogDebug($"Successfully acked message, clientId={clientId}," +
+ $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
+ $" endpoints={endpoints}, requestId={requestId}");
+ }
+ }
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+
+ private void AckMessageLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+ {
+ Task.Run(async () =>
+ {
+ try
+ {
+ await Task.Delay(AckMessageFailureBackoffDelay, _ackMsgCts.Token);
+ AckMessage(messageView, attempt + 1, tcs);
+ }
+ catch (Exception ex)
+ {
+ if (_ackMsgCts.IsCancellationRequested)
+ {
+ return;
+ }
+ Logger.LogError(ex, $"[Bug] Failed to schedule message ack request, mq={_mq}," +
+ $" messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}");
+ AckMessageLater(messageView, attempt + 1, tcs);
+ }
+ });
+ }
+
+ private Task NackMessage(MessageView messageView)
+ {
+ var deliveryAttempt = messageView.DeliveryAttempt;
+ var duration = _consumer.GetRetryPolicy().GetNextAttemptDelay(deliveryAttempt);
+ var tcs = new TaskCompletionSource<bool>();
+ ChangeInvisibleDuration(messageView, duration, 1, tcs);
+ return tcs.Task;
+ }
+
+ private void ChangeInvisibleDuration(MessageView messageView, TimeSpan duration, int attempt,
+ TaskCompletionSource<bool> tcs)
+ {
+ var clientId = _consumer.GetClientId();
+ var consumerGroup = _consumer.GetConsumerGroup();
+ var messageId = messageView.MessageId;
+ var endpoints = messageView.MessageQueue.Broker.Endpoints;
+
+ var request = _consumer.WrapChangeInvisibleDuration(messageView, duration);
+ var task = _consumer.GetClientManager().ChangeInvisibleDuration(endpoints,
+ request, _consumer.GetClientConfig().RequestTimeout);
+ task.ContinueWith(responseTask =>
+ {
+ if (responseTask.IsFaulted)
+ {
+ Logger.LogError(responseTask.Exception, $"Exception raised while changing invisible" +
+ $" duration, would retry later, clientId={clientId}," +
+ $" consumerGroup={consumerGroup}," +
+ $" messageId={messageId}, mq={_mq}," +
+ $" endpoints={endpoints}");
+ ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
+ }
+ else
+ {
+ var invocation = responseTask.Result;
+ var requestId = invocation.RequestId;
+ var status = invocation.Response.Status;
+ var statusCode = status.Code;
+
+ if (statusCode == Code.InvalidReceiptHandle)
+ {
+ Logger.LogError($"Failed to change invisible duration due to the invalid receipt handle," +
+ $" forgive to retry, clientId={clientId}, consumerGroup={consumerGroup}," +
+ $" messageId={messageId}, attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
+ $" requestId={requestId}, status message={status.Message}");
+ tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
+ }
+
+ if (statusCode != Code.Ok)
+ {
+ Logger.LogError($"Failed to change invisible duration, would retry later," +
+ $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
+ $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
+ $" status message={status.Message}");
+ ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
+ return;
+ }
+
+ tcs.SetResult(true);
+
+ if (attempt > 1)
+ {
+ Logger.LogInformation($"Finally, changed invisible duration successfully," +
+ $" clientId={clientId}, consumerGroup={consumerGroup}," +
+ $" messageId={messageId}, attempt={attempt}, mq={_mq}," +
+ $" endpoints={endpoints}, requestId={requestId}");
+ }
+ else
+ {
+ Logger.LogDebug($"Changed invisible duration successfully, clientId={clientId}," +
+ $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
+ $" endpoints={endpoints}, requestId={requestId}");
+ }
+ }
+ });
+ }
+
+ private void ChangeInvisibleDurationLater(MessageView messageView, TimeSpan duration, int attempt,
+ TaskCompletionSource<bool> tcs)
+ {
+ Task.Run(async () =>
+ {
+ try
+ {
+ await Task.Delay(ChangeInvisibleDurationFailureBackoffDelay, _changeInvisibleDurationCts.Token);
+ ChangeInvisibleDuration(messageView, duration, attempt, tcs);
+ }
+ catch (Exception ex)
+ {
+ if (_changeInvisibleDurationCts.IsCancellationRequested)
+ {
+ return;
+ }
+ Logger.LogError(ex, $"[Bug] Failed to schedule message change invisible duration request," +
+ $" mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}");
+ ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
+ }
+ });
+ }
+
+ public Task EraseFifoMessage(MessageView messageView, ConsumeResult consumeResult)
+ {
+ var retryPolicy = _consumer.GetRetryPolicy();
+ var maxAttempts = retryPolicy.GetMaxAttempts();
+ var attempt = messageView.DeliveryAttempt;
+ var messageId = messageView.MessageId;
+ var service = _consumer.GetConsumeService();
+ var clientId = _consumer.GetClientId();
+
+ if (consumeResult == ConsumeResult.FAILURE && attempt < maxAttempts)
+ {
+ var nextAttemptDelay = retryPolicy.GetNextAttemptDelay(attempt);
+ attempt = messageView.IncrementAndGetDeliveryAttempt();
+ Logger.LogDebug($"Prepare to redeliver the fifo message because of the consumption failure," +
+ $" maxAttempt={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," +
+ $" messageId={messageId}, nextAttemptDelay={nextAttemptDelay}, clientId={clientId}");
+ var redeliverTask = service.Consume(messageView, nextAttemptDelay);
+ _ = redeliverTask.ContinueWith(async t =>
+ {
+ var result = await t;
+ await EraseFifoMessage(messageView, result);
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+ else
+ {
+ var success = consumeResult == ConsumeResult.SUCCESS;
+ if (!success)
+ {
+ Logger.LogInformation($"Failed to consume fifo message finally, run out of attempt times," +
+ $" maxAttempts={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," +
+ $" messageId={messageId}, clientId={clientId}");
+ }
+
+ var task = ConsumeResult.SUCCESS.Equals(consumeResult)
+ ? AckMessage(messageView)
+ : ForwardToDeadLetterQueue(messageView);
+
+ _ = task.ContinueWith(_ => { EvictCache(messageView); },
+ TaskContinuationOptions.ExecuteSynchronously);
+ }
+
+ return Task.CompletedTask;
+ }
+
+ private Task ForwardToDeadLetterQueue(MessageView messageView)
+ {
+ var tcs = new TaskCompletionSource<bool>();
+ ForwardToDeadLetterQueue(messageView, 1, tcs);
+ return tcs.Task;
+ }
+
+ private void ForwardToDeadLetterQueue(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+ {
+ var clientId = _consumer.GetClientId();
+ var consumerGroup = _consumer.GetConsumerGroup();
+ var messageId = messageView.MessageId;
+ var endpoints = messageView.MessageQueue.Broker.Endpoints;
+
+ var request = _consumer.WrapForwardMessageToDeadLetterQueueRequest(messageView);
+ var task = _consumer.GetClientManager().ForwardMessageToDeadLetterQueue(endpoints, request,
+ _consumer.GetClientConfig().RequestTimeout);
+
+ task.ContinueWith(responseTask =>
+ {
+ if (responseTask.IsFaulted)
+ {
+ // Log failure and retry later.
+ Logger.LogError($"Exception raised while forward message to DLQ, would attempt to re-forward later, " +
+ $"clientId={_consumer.GetClientId()}," +
+ $" consumerGroup={_consumer.GetConsumerGroup()}," +
+ $" messageId={messageView.MessageId}, mq={_mq}", responseTask.Exception);
+
+ ForwardToDeadLetterQueueLater(messageView, attempt, tcs);
+ }
+ else
+ {
+ var invocation = responseTask.Result;
+ var requestId = invocation.RequestId;
+ var status = invocation.Response.Status;
+ var statusCode = status.Code;
+
+ // Log failure and retry later.
+ if (statusCode != Code.Ok)
+ {
+ Logger.LogError($"Failed to forward message to dead letter queue," +
+ $" would attempt to re-forward later, clientId={clientId}," +
+ $" consumerGroup={consumerGroup}, messageId={messageId}," +
+ $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
+ $" requestId={requestId}, code={statusCode}," +
+ $" status message={status.Message}");
+ ForwardToDeadLetterQueueLater(messageView, attempt, tcs);
+ return;
+ }
+
+ tcs.SetResult(true);
+
+ // Log success.
+ if (attempt > 1)
+ {
+ Logger.LogInformation($"Re-forward message to dead letter queue successfully, " +
+ $"clientId={clientId}, consumerGroup={consumerGroup}," +
+ $" attempt={attempt}, messageId={messageId}, mq={_mq}," +
+ $" endpoints={endpoints}, requestId={requestId}");
+ }
+ else
+ {
+ Logger.LogInformation($"Forward message to dead letter queue successfully, " +
+ $"clientId={clientId}, consumerGroup={consumerGroup}," +
+ $" messageId={messageId}, mq={_mq}, endpoints={endpoints}," +
+ $" requestId={requestId}");
+ }
+ }
+ });
+ }
+
+ private void ForwardToDeadLetterQueueLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
+ {
+ Task.Run(async () =>
+ {
+ try
+ {
+ await Task.Delay(ForwardMessageToDeadLetterQueueFailureBackoffDelay,
+ _forwardMessageToDeadLetterQueueCts.Token);
+ ForwardToDeadLetterQueue(messageView, attempt, tcs);
+ }
+ catch (Exception ex)
+ {
+ // Should never reach here.
+ Logger.LogError($"[Bug] Failed to schedule DLQ message request, " +
+ $"mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}", ex);
+
+ ForwardToDeadLetterQueueLater(messageView, attempt + 1, tcs);
+ }
+ });
+ }
+
+ /// <summary>
+ /// Discard the message(Non-FIFO-consume-mode) which could not be consumed properly.
+ /// </summary>
+ /// <param name="messageView">the message to discard.</param>
+ public void DiscardMessage(MessageView messageView)
+ {
+ Logger.LogInformation($"Discard message, mq={_mq}, messageId={messageView.MessageId}," +
+ $" clientId={_consumer.GetClientId()}");
+ var task = NackMessage(messageView);
+ _ = task.ContinueWith(_ =>
+ {
+ EvictCache(messageView);
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+
+ /// <summary>
+ /// Discard the message(FIFO-consume-mode) which could not consumed properly.
+ /// </summary>
+ /// <param name="messageView">the FIFO message to discard.</param>
+ public void DiscardFifoMessage(MessageView messageView)
+ {
+ Logger.LogInformation($"Discard fifo message, mq={_mq}, messageId={messageView.MessageId}," +
+ $" clientId={_consumer.GetClientId()}");
+ var task = ForwardToDeadLetterQueue(messageView);
+ _ = task.ContinueWith(_ =>
+ {
+ EvictCache(messageView);
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+
+ private void EvictCache(MessageView messageView)
+ {
+ _cachedMessageLock.EnterWriteLock();
+ try
+ {
+ if (_cachedMessages.Remove(messageView))
+ {
+ Interlocked.Add(ref _cachedMessagesBytes, -messageView.Body.Length);
+ }
+ }
+ finally
+ {
+ _cachedMessageLock.ExitWriteLock();
+ }
+ }
+
+ public int CachedMessagesCount()
+ {
+ _cachedMessageLock.EnterReadLock();
+ try
+ {
+ return _cachedMessages.Count;
+ }
+ finally
+ {
+ _cachedMessageLock.ExitReadLock();
+ }
+ }
+
+ public long CachedMessageBytes()
+ {
+ return Interlocked.Read(ref _cachedMessagesBytes);
+ }
+
+ /// <summary>
+ /// Get the count of cached messages.
+ /// </summary>
+ /// <returns>count of pending messages.</returns>
+ public long GetCachedMessageCount()
+ {
+ _cachedMessageLock.EnterReadLock();
+ try
+ {
+ return _cachedMessages.Count;
+ }
+ finally
+ {
+ _cachedMessageLock.ExitReadLock();
+ }
+ }
+
+ /// <summary>
+ /// Get the bytes of cached message memory footprint.
+ /// </summary>
+ /// <returns>bytes of cached message memory footprint.</returns>
+ public long GetCachedMessageBytes()
+ {
+ return _cachedMessagesBytes;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 136bdad..24f1a0a 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -21,28 +21,31 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
+using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Proto = Apache.Rocketmq.V2;
using Org.Apache.Rocketmq.Error;
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public class Producer : Client, IAsyncDisposable, IDisposable
{
private static readonly ILogger Logger = MqLogManager.CreateLogger<Producer>();
- private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
+ internal readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
internal readonly PublishingSettings PublishingSettings;
private readonly ConcurrentDictionary<string, bool> _publishingTopics;
private readonly ITransactionChecker _checker;
private readonly Histogram<double> _sendCostTimeHistogram;
- private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics,
+ internal Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics,
int maxAttempts, ITransactionChecker checker) : base(clientConfig)
{
var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
- PublishingSettings = new PublishingSettings(ClientId, Endpoints, retryPolicy,
+ PublishingSettings = new PublishingSettings(ClientConfig.Namespace, ClientId, Endpoints, retryPolicy,
clientConfig.RequestTimeout, publishingTopics);
_publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
_publishingTopics = publishingTopics;
@@ -102,7 +105,7 @@
}
}
- protected override Proto::HeartbeatRequest WrapHeartbeatRequest()
+ internal override Proto::HeartbeatRequest WrapHeartbeatRequest()
{
return new Proto::HeartbeatRequest
{
@@ -110,7 +113,7 @@
};
}
- protected override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
+ internal override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
{
return new Proto::NotifyClientTerminationRequest();
}
@@ -192,11 +195,11 @@
return sendReceipt;
}
- private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
+ private Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
{
return new Proto.SendMessageRequest
{
- Messages = { message.ToProtobuf(mq.QueueId) }
+ Messages = { message.ToProtobuf(ClientConfig.Namespace, mq.QueueId) }
};
}
@@ -331,6 +334,7 @@
{
var topicResource = new Proto.Resource
{
+ ResourceNamespace = ClientConfig.Namespace,
Name = topic
};
var request = new Proto.EndTransactionRequest
diff --git a/csharp/rocketmq-client-csharp/PublishingMessage.cs b/csharp/rocketmq-client-csharp/PublishingMessage.cs
index 7937d93..d214598 100644
--- a/csharp/rocketmq-client-csharp/PublishingMessage.cs
+++ b/csharp/rocketmq-client-csharp/PublishingMessage.cs
@@ -76,7 +76,7 @@
MessageType = MessageType.Transaction;
}
- public Proto::Message ToProtobuf(int queueId)
+ public Proto::Message ToProtobuf(string namespaceName, int queueId)
{
var systemProperties = new Proto.SystemProperties
{
@@ -105,6 +105,7 @@
var topicResource = new Proto.Resource
{
+ ResourceNamespace = namespaceName,
Name = Topic
};
return new Proto.Message
diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs
index a5ab74d..cdd8418 100644
--- a/csharp/rocketmq-client-csharp/PublishingSettings.cs
+++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs
@@ -31,8 +31,8 @@
private volatile int _maxBodySizeBytes = 4 * 1024 * 1024;
private volatile bool _validateMessageType = true;
- public PublishingSettings(string clientId, Endpoints endpoints, IRetryPolicy retryPolicy,
- TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer,
+ public PublishingSettings(string namespaceName, string clientId, Endpoints endpoints, IRetryPolicy retryPolicy,
+ TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(namespaceName, clientId, ClientType.Producer,
endpoints, retryPolicy, requestTimeout)
{
Topics = topics;
@@ -66,7 +66,8 @@
public override Proto.Settings ToProtobuf()
{
- var topics = Topics.Select(topic => new Proto.Resource { Name = topic.Key }).ToList();
+ var topics = Topics.Select(topic =>
+ new Proto.Resource { ResourceNamespace = Namespace, Name = topic.Key }).ToList();
var publishing = new Proto.Publishing();
publishing.Topics.Add(topics);
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs
new file mode 100644
index 0000000..c46523f
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/PushConsumer.cs
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Schedulers;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+using Microsoft.Extensions.Logging;
+
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
+namespace Org.Apache.Rocketmq
+{
+ public class PushConsumer : Consumer, IAsyncDisposable, IDisposable
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger<PushConsumer>();
+
+ private static readonly TimeSpan AssignmentScanScheduleDelay = TimeSpan.FromSeconds(1);
+ private static readonly TimeSpan AssignmentScanSchedulePeriod = TimeSpan.FromSeconds(5);
+
+ private readonly ClientConfig _clientConfig;
+ private readonly PushSubscriptionSettings _pushSubscriptionSettings;
+ private readonly string _consumerGroup;
+ private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+ private readonly ConcurrentDictionary<string, Assignments> _cacheAssignments;
+ private readonly IMessageListener _messageListener;
+ private readonly int _maxCacheMessageCount;
+ private readonly int _maxCacheMessageSizeInBytes;
+
+ private readonly ConcurrentDictionary<MessageQueue, ProcessQueue> _processQueueTable;
+ private ConsumeService _consumeService;
+ private readonly TaskScheduler _consumptionTaskScheduler;
+ private readonly CancellationTokenSource _consumptionCts;
+
+ private readonly CancellationTokenSource _scanAssignmentCts;
+
+ private readonly CancellationTokenSource _receiveMsgCts;
+ private readonly CancellationTokenSource _ackMsgCts;
+ private readonly CancellationTokenSource _changeInvisibleDurationCts;
+ private readonly CancellationTokenSource _forwardMsgToDeadLetterQueueCts;
+
+ /// <summary>
+ /// The caller is supposed to have validated the arguments and handled throwing exception or
+ /// logging warnings already, so we avoid repeating args check here.
+ /// </summary>
+ public PushConsumer(ClientConfig clientConfig, string consumerGroup,
+ ConcurrentDictionary<string, FilterExpression> subscriptionExpressions, IMessageListener messageListener,
+ int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount)
+ : base(clientConfig, consumerGroup)
+ {
+ _clientConfig = clientConfig;
+ _consumerGroup = consumerGroup;
+ _subscriptionExpressions = subscriptionExpressions;
+ _pushSubscriptionSettings = new PushSubscriptionSettings(_clientConfig.Namespace, ClientId, Endpoints, consumerGroup,
+ clientConfig.RequestTimeout, subscriptionExpressions);
+ _cacheAssignments = new ConcurrentDictionary<string, Assignments>();
+ _messageListener = messageListener;
+ _maxCacheMessageCount = maxCacheMessageCount;
+ _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
+
+ _scanAssignmentCts = new CancellationTokenSource();
+
+ _processQueueTable = new ConcurrentDictionary<MessageQueue, ProcessQueue>();
+ _consumptionTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(consumptionThreadCount);
+ _consumptionCts = new CancellationTokenSource();
+
+ _receiveMsgCts = new CancellationTokenSource();
+ _ackMsgCts = new CancellationTokenSource();
+ _changeInvisibleDurationCts = new CancellationTokenSource();
+ _forwardMsgToDeadLetterQueueCts = new CancellationTokenSource();
+ }
+
+ protected override async Task Start()
+ {
+ try
+ {
+ State = State.Starting;
+ Logger.LogInformation($"Begin to start the rocketmq push consumer, clientId={ClientId}");
+ await base.Start();
+ _consumeService = CreateConsumerService();
+ ScheduleWithFixedDelay(ScanAssignments, AssignmentScanScheduleDelay, AssignmentScanSchedulePeriod,
+ _scanAssignmentCts.Token);
+ Logger.LogInformation($"The rocketmq push consumer starts successfully, clientId={ClientId}");
+ State = State.Running;
+ }
+ catch (Exception)
+ {
+ State = State.Failed;
+ throw;
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await Shutdown().ConfigureAwait(false);
+ GC.SuppressFinalize(this);
+ }
+
+ public void Dispose()
+ {
+ Shutdown().Wait();
+ GC.SuppressFinalize(this);
+ }
+
+ protected override async Task Shutdown()
+ {
+ try
+ {
+ State = State.Stopping;
+ Logger.LogInformation($"Begin to shutdown the rocketmq push consumer, clientId={ClientId}");
+ _receiveMsgCts.Cancel();
+ _ackMsgCts.Cancel();
+ _changeInvisibleDurationCts.Cancel();
+ _forwardMsgToDeadLetterQueueCts.Cancel();
+ _scanAssignmentCts.Cancel();
+ await base.Shutdown();
+ _consumptionCts.Cancel();
+ Logger.LogInformation($"Shutdown the rocketmq push consumer successfully, clientId={ClientId}");
+ State = State.Terminated;
+ }
+ catch (Exception)
+ {
+ State = State.Failed;
+ throw;
+ }
+ }
+
+ private ConsumeService CreateConsumerService()
+ {
+ if (_pushSubscriptionSettings.IsFifo())
+ {
+ Logger.LogInformation(
+ $"Create FIFO consume service, consumerGroup={_consumerGroup}, clientId={ClientId}");
+ return new FifoConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token);
+ }
+ Logger.LogInformation(
+ $"Create standard consume service, consumerGroup={_consumerGroup}, clientId={ClientId}");
+ return new StandardConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token);
+ }
+
+ /// <summary>
+ /// Adds a subscription expression dynamically.
+ /// </summary>
+ /// <param name="filterExpression">The new filter expression to add.</param>
+ /// <returns>The push consumer instance.</returns>
+ public async Task Subscribe(string topic, FilterExpression filterExpression)
+ {
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Push consumer is not running");
+ }
+
+ await GetRouteData(topic);
+ _subscriptionExpressions[topic] = filterExpression;
+ }
+
+ /// <summary>
+ /// Removes a subscription expression dynamically by topic.
+ /// </summary>
+ /// <remarks>
+ /// It stops the backend task to fetch messages from the server.
+ /// The locally cached messages whose topic was removed before would not be delivered
+ /// to the <see cref="IMessageListener"/> anymore.
+ ///
+ /// Nothing occurs if the specified topic does not exist in subscription expressions
+ /// of the push consumer.
+ /// </remarks>
+ /// <param name="topic">The topic to remove the subscription.</param>
+ /// <returns>The push consumer instance.</returns>
+ public void Unsubscribe(string topic)
+ {
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Push consumer is not running");
+ }
+
+ _subscriptionExpressions.TryRemove(topic, out _);
+ }
+
+ internal void ScanAssignments()
+ {
+ try
+ {
+ Logger.LogDebug($"Start to scan assignments periodically, clientId={ClientId}");
+ foreach (var (topic, filterExpression) in _subscriptionExpressions)
+ {
+ var existed = _cacheAssignments.GetValueOrDefault(topic);
+
+ var queryAssignmentTask = QueryAssignment(topic);
+ queryAssignmentTask.ContinueWith(task =>
+ {
+ if (task.IsFaulted)
+ {
+ Logger.LogError(task.Exception, "Exception raised while scanning the assignments," +
+ $" topic={topic}, clientId={ClientId}");
+ return;
+ }
+
+ var latest = task.Result;
+ if (latest.GetAssignmentList().Count == 0)
+ {
+ if (existed == null || existed.GetAssignmentList().Count == 0)
+ {
+ Logger.LogInformation("Acquired empty assignments from remote, would scan later," +
+ $" topic={topic}, clientId={ClientId}");
+ return;
+ }
+
+ Logger.LogInformation("Attention!!! acquired empty assignments from remote, but" +
+ $" existed assignments are not empty, topic={topic}," +
+ $" clientId={ClientId}");
+ }
+
+ if (!latest.Equals(existed))
+ {
+ Logger.LogInformation($"Assignments of topic={topic} has changed, {existed} =>" +
+ $" {latest}, clientId={ClientId}");
+ SyncProcessQueue(topic, latest, filterExpression);
+ _cacheAssignments[topic] = latest;
+ return;
+ }
+
+ Logger.LogDebug($"Assignments of topic={topic} remain the same," +
+ $" assignments={existed}, clientId={ClientId}");
+ // Process queue may be dropped, need to be synchronized anyway.
+ SyncProcessQueue(topic, latest, filterExpression);
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Exception raised while scanning the assignments for all topics, clientId={ClientId}");
+ }
+ }
+
+ private void SyncProcessQueue(string topic, Assignments assignments, FilterExpression filterExpression)
+ {
+ var latest = new HashSet<MessageQueue>();
+ var assignmentList = assignments.GetAssignmentList();
+ foreach (var assignment in assignmentList)
+ {
+ latest.Add(assignment.MessageQueue);
+ }
+
+ var activeMqs = new HashSet<MessageQueue>();
+ foreach (var (mq, pq) in _processQueueTable)
+ {
+ if (!topic.Equals(mq.Topic))
+ {
+ continue;
+ }
+
+ if (!latest.Contains(mq))
+ {
+ Logger.LogInformation($"Drop message queue according to the latest assignmentList," +
+ $" mq={mq}, clientId={ClientId}");
+ DropProcessQueue(mq);
+ continue;
+ }
+
+ if (pq.Expired())
+ {
+ Logger.LogWarning($"Drop message queue because it is expired," +
+ $" mq={mq}, clientId={ClientId}");
+ DropProcessQueue(mq);
+ continue;
+ }
+ activeMqs.Add(mq);
+ }
+
+ foreach (var mq in latest)
+ {
+ if (activeMqs.Contains(mq))
+ {
+ continue;
+ }
+ var processQueue = CreateProcessQueue(mq, filterExpression);
+ if (processQueue != null)
+ {
+ Logger.LogInformation($"Start to fetch message from remote, mq={mq}, clientId={ClientId}");
+ processQueue.FetchMessageImmediately();
+ }
+ }
+ }
+
+ internal Task<Assignments> QueryAssignment(string topic)
+ {
+ var pickEndpointsTask = PickEndpointsToQueryAssignments(topic);
+ return pickEndpointsTask.ContinueWith(task0 =>
+ {
+ if (task0 is { IsFaulted: true, Exception: { } })
+ {
+ throw task0.Exception;
+ }
+
+ var endpoints = task0.Result;
+ var request = WrapQueryAssignmentRequest(topic);
+ var requestTimeout = _clientConfig.RequestTimeout;
+ var queryAssignmentTask = ClientManager.QueryAssignment(endpoints, request, requestTimeout);
+
+ return queryAssignmentTask.ContinueWith(task1 =>
+ {
+ if (task1 is { IsFaulted: true, Exception: { } })
+ {
+ throw task1.Exception;
+ }
+
+ var response = task1.Result.Response;
+ var status = response.Status;
+ StatusChecker.Check(status, request, task1.Result.RequestId);
+ var assignmentList = response.Assignments
+ .Select(assignment => new Assignment(new MessageQueue(assignment.MessageQueue)))
+ .ToList();
+ return Task.FromResult(new Assignments(assignmentList));
+ }, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
+ }, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
+ }
+
+ private Task<Endpoints> PickEndpointsToQueryAssignments(string topic)
+ {
+ var getRouteDataTask = GetRouteData(topic);
+ return getRouteDataTask.ContinueWith(task =>
+ {
+ if (task is { IsFaulted: true, Exception: { } })
+ {
+ throw task.Exception;
+ }
+
+ var topicRouteData = task.Result;
+ return topicRouteData.PickEndpointsToQueryAssignments();
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+
+ private QueryAssignmentRequest WrapQueryAssignmentRequest(string topic)
+ {
+ var topicResource = new Proto.Resource
+ {
+ ResourceNamespace = _clientConfig.Namespace,
+ Name = topic
+ };
+ return new QueryAssignmentRequest
+ {
+ Topic = topicResource,
+ Group = GetProtobufGroup(),
+ Endpoints = Endpoints.ToProtobuf()
+ };
+ }
+
+ /// <summary>
+ /// Drops the <see cref="ProcessQueue"/> by <see cref="MessageQueue"/>.
+ /// <see cref="ProcessQueue"/> must be removed before it is dropped.
+ /// </summary>
+ /// <param name="mq">The message queue.</param>
+ internal void DropProcessQueue(MessageQueue mq)
+ {
+ if (_processQueueTable.TryRemove(mq, out var pq))
+ {
+ pq.Drop();
+ }
+ }
+
+ /// <summary>
+ /// Creates a process queue and adds it into the <see cref="_processQueueTable"/>.
+ /// Returns <see cref="ProcessQueue"/> if the mapped process queue already exists.
+ /// </summary>
+ /// <remarks>
+ /// This function and <see cref="DropProcessQueue"/> ensure that a process queue is not dropped if
+ /// it is contained in <see cref="_processQueueTable"/>. Once a process queue is dropped, it must have been
+ /// removed from <see cref="_processQueueTable"/>.
+ /// </remarks>
+ /// <param name="mq">The message queue.</param>
+ /// <param name="filterExpression">The filter expression of the topic.</param>
+ /// <returns>A process queue.</returns>
+ protected ProcessQueue CreateProcessQueue(MessageQueue mq, FilterExpression filterExpression)
+ {
+ var processQueue = new ProcessQueue(this, mq, filterExpression, _receiveMsgCts, _ackMsgCts,
+ _changeInvisibleDurationCts, _forwardMsgToDeadLetterQueueCts);
+ if (_processQueueTable.TryGetValue(mq, out var previous))
+ {
+ return null;
+ }
+ _processQueueTable.TryAdd(mq, processQueue);
+ return processQueue;
+ }
+
+ public async Task AckMessage(MessageView messageView)
+ {
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Push consumer is not running");
+ }
+
+ var request = WrapAckMessageRequest(messageView);
+ var invocation = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+ ClientConfig.RequestTimeout);
+ StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
+ }
+
+ protected override IEnumerable<string> GetTopics()
+ {
+ return _subscriptionExpressions.Keys;
+ }
+
+ internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
+ {
+ return new Proto::HeartbeatRequest
+ {
+ ClientType = Proto.ClientType.PushConsumer,
+ Group = GetProtobufGroup()
+ };
+ }
+
+ protected internal ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView,
+ TimeSpan invisibleDuration)
+ {
+ var topicResource = new Proto.Resource
+ {
+ ResourceNamespace = _clientConfig.Namespace,
+ Name = messageView.Topic
+ };
+ return new Proto.ChangeInvisibleDurationRequest
+ {
+ Topic = topicResource,
+ Group = GetProtobufGroup(),
+ ReceiptHandle = messageView.ReceiptHandle,
+ InvisibleDuration = Duration.FromTimeSpan(invisibleDuration),
+ MessageId = messageView.MessageId
+ };
+ }
+
+ protected internal AckMessageRequest WrapAckMessageRequest(MessageView messageView)
+ {
+ var topicResource = new Proto.Resource
+ {
+ ResourceNamespace = _clientConfig.Namespace,
+ Name = messageView.Topic
+ };
+ var entry = new Proto.AckMessageEntry
+ {
+ MessageId = messageView.MessageId,
+ ReceiptHandle = messageView.ReceiptHandle,
+ };
+ return new Proto.AckMessageRequest
+ {
+ Group = GetProtobufGroup(),
+ Topic = topicResource,
+ Entries = { entry }
+ };
+ }
+
+ protected internal ForwardMessageToDeadLetterQueueRequest WrapForwardMessageToDeadLetterQueueRequest(MessageView messageView)
+ {
+ var topicResource = new Proto.Resource
+ {
+ ResourceNamespace = _clientConfig.Namespace,
+ Name = messageView.Topic
+ };
+
+ return new ForwardMessageToDeadLetterQueueRequest
+ {
+ Group = GetProtobufGroup(),
+ Topic = topicResource,
+ ReceiptHandle = messageView.ReceiptHandle,
+ MessageId = messageView.MessageId,
+ DeliveryAttempt = messageView.DeliveryAttempt,
+ MaxDeliveryAttempts = GetRetryPolicy().GetMaxAttempts()
+ };
+ }
+
+ protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
+ {
+ }
+
+ internal override async void OnVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command)
+ {
+ var nonce = command.Nonce;
+ var messageView = MessageView.FromProtobuf(command.Message);
+ var messageId = messageView.MessageId;
+ Proto.TelemetryCommand telemetryCommand = null;
+
+ try
+ {
+ var consumeResult = await _consumeService.Consume(messageView);
+ var code = consumeResult == ConsumeResult.SUCCESS ? Code.Ok : Code.FailedToConsumeMessage;
+ var status = new Status
+ {
+ Code = code
+ };
+ var verifyMessageResult = new VerifyMessageResult
+ {
+ Nonce = nonce
+ };
+ telemetryCommand = new TelemetryCommand
+ {
+ VerifyMessageResult = verifyMessageResult,
+ Status = status
+ };
+ var (_, session) = GetSession(endpoints);
+ await session.WriteAsync(telemetryCommand);
+ }
+ catch (Exception e)
+ {
+ Logger.LogError(e,
+ $"Failed to send message verification result command, endpoints={Endpoints}, command={telemetryCommand}, messageId={messageId}, clientId={ClientId}");
+ }
+ }
+
+ internal override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
+ {
+ return new NotifyClientTerminationRequest()
+ {
+ Group = GetProtobufGroup()
+ };
+ }
+
+ internal int GetQueueSize()
+ {
+ return _processQueueTable.Count;
+ }
+
+ internal int CacheMessageBytesThresholdPerQueue()
+ {
+ var size = this.GetQueueSize();
+ // All process queues are removed, no need to cache messages.
+ return size <= 0 ? 0 : Math.Max(1, _maxCacheMessageSizeInBytes / size);
+ }
+
+ internal int CacheMessageCountThresholdPerQueue()
+ {
+ var size = this.GetQueueSize();
+ // All process queues are removed, no need to cache messages.
+ if (size <= 0)
+ {
+ return 0;
+ }
+
+ return Math.Max(1, _maxCacheMessageCount / size);
+ }
+
+ internal override Settings GetSettings()
+ {
+ return _pushSubscriptionSettings;
+ }
+
+ /// <summary>
+ /// Gets the load balancing group for the consumer.
+ /// </summary>
+ /// <returns>The consumer load balancing group.</returns>
+ public string GetConsumerGroup()
+ {
+ return _consumerGroup;
+ }
+
+ public PushSubscriptionSettings GetPushConsumerSettings()
+ {
+ return _pushSubscriptionSettings;
+ }
+
+ /// <summary>
+ /// Lists the existing subscription expressions in the push consumer.
+ /// </summary>
+ /// <returns>Collections of the subscription expressions.</returns>
+ public ConcurrentDictionary<string, FilterExpression> GetSubscriptionExpressions()
+ {
+ return _subscriptionExpressions;
+ }
+
+ public IRetryPolicy GetRetryPolicy()
+ {
+ return _pushSubscriptionSettings.GetRetryPolicy();
+ }
+
+ public ConsumeService GetConsumeService()
+ {
+ return _consumeService;
+ }
+
+ private Proto.Resource GetProtobufGroup()
+ {
+ return new Proto.Resource()
+ {
+ ResourceNamespace = _clientConfig.Namespace,
+ Name = ConsumerGroup
+ };
+ }
+
+ public class Builder
+ {
+ private ClientConfig _clientConfig;
+ private string _consumerGroup;
+ private ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+ private IMessageListener _messageListener;
+ private int _maxCacheMessageCount = 1024;
+ private int _maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
+ private int _consumptionThreadCount = 20;
+
+ public Builder SetClientConfig(ClientConfig clientConfig)
+ {
+ Preconditions.CheckArgument(null != clientConfig, "clientConfig should not be null");
+ _clientConfig = clientConfig;
+ return this;
+ }
+
+ public Builder SetConsumerGroup(string consumerGroup)
+ {
+ Preconditions.CheckArgument(null != consumerGroup, "consumerGroup should not be null");
+ Preconditions.CheckArgument(consumerGroup != null && ConsumerGroupRegex.Match(consumerGroup).Success,
+ $"topic does not match the regex {ConsumerGroupRegex}");
+ _consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public Builder SetSubscriptionExpression(Dictionary<string, FilterExpression> subscriptionExpressions)
+ {
+ Preconditions.CheckArgument(null != subscriptionExpressions,
+ "subscriptionExpressions should not be null");
+ Preconditions.CheckArgument(subscriptionExpressions!.Count != 0,
+ "subscriptionExpressions should not be empty");
+ _subscriptionExpressions = new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions!);
+ return this;
+ }
+
+ public Builder SetMessageListener(IMessageListener messageListener)
+ {
+ Preconditions.CheckArgument(null != messageListener,
+ "messageListener should not be null");
+ _messageListener = messageListener;
+ return this;
+ }
+
+ public Builder SetMaxCacheMessageCount(int maxCacheMessageCount)
+ {
+ Preconditions.CheckArgument(maxCacheMessageCount > 0,
+ "maxCacheMessageCount should be positive");
+ _maxCacheMessageCount = maxCacheMessageCount;
+ return this;
+ }
+
+ public Builder SetMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes)
+ {
+ Preconditions.CheckArgument(maxCacheMessageSizeInBytes > 0,
+ "maxCacheMessageSizeInBytes should be positive");
+ _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
+ return this;
+ }
+
+ public Builder SetConsumptionThreadCount(int consumptionThreadCount)
+ {
+ Preconditions.CheckArgument(consumptionThreadCount > 0,
+ "consumptionThreadCount should be positive");
+ _consumptionThreadCount = consumptionThreadCount;
+ return this;
+ }
+
+ public async Task<PushConsumer> Build()
+ {
+ Preconditions.CheckArgument(null != _clientConfig, "clientConfig has not been set yet");
+ Preconditions.CheckArgument(null != _consumerGroup, "consumerGroup has not been set yet");
+ Preconditions.CheckArgument(!_subscriptionExpressions!.IsEmpty,
+ "subscriptionExpressions has not been set yet");
+ Preconditions.CheckArgument(null != _messageListener, "messageListener has not been set yet");
+ var pushConsumer = new PushConsumer(_clientConfig, _consumerGroup, _subscriptionExpressions,
+ _messageListener, _maxCacheMessageCount,
+ _maxCacheMessageSizeInBytes, _consumptionThreadCount);
+ await pushConsumer.Start();
+ return pushConsumer;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
new file mode 100644
index 0000000..b2ff519
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.Extensions.Logging;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ public class PushSubscriptionSettings : Settings
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger<PushSubscriptionSettings>();
+
+ private readonly Resource _group;
+ private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+ private volatile bool _fifo = false;
+ private volatile int _receiveBatchSize = 32;
+ private TimeSpan _longPollingTimeout = TimeSpan.FromSeconds(30);
+
+ public PushSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup,
+ TimeSpan requestTimeout, ConcurrentDictionary<string, FilterExpression> subscriptionExpressions)
+ : base(namespaceName, clientId, ClientType.PushConsumer, endpoints, requestTimeout)
+ {
+ _group = new Resource(namespaceName, consumerGroup);
+ _subscriptionExpressions = subscriptionExpressions;
+ }
+
+ public bool IsFifo()
+ {
+ return _fifo;
+ }
+
+ public int GetReceiveBatchSize()
+ {
+ return _receiveBatchSize;
+ }
+
+ public TimeSpan GetLongPollingTimeout()
+ {
+ return _longPollingTimeout;
+ }
+
+ public override Proto.Settings ToProtobuf()
+ {
+ var subscriptionEntries = new List<Proto.SubscriptionEntry>();
+ foreach (var (key, value) in _subscriptionExpressions)
+ {
+ var topic = new Proto.Resource()
+ {
+ ResourceNamespace = Namespace,
+ Name = key
+ };
+ var filterExpression = new Proto.FilterExpression()
+ {
+ Expression = value.Expression
+ };
+ switch (value.Type)
+ {
+ case ExpressionType.Tag:
+ filterExpression.Type = Proto.FilterType.Tag;
+ break;
+ case ExpressionType.Sql92:
+ filterExpression.Type = Proto.FilterType.Sql;
+ break;
+ default:
+ Logger.LogWarning($"[Bug] Unrecognized filter type={value.Type} for push consumer");
+ break;
+ }
+
+ var subscriptionEntry = new Proto.SubscriptionEntry
+ {
+ Topic = topic,
+ Expression = filterExpression
+ };
+
+ subscriptionEntries.Add(subscriptionEntry);
+ }
+
+ var subscription = new Proto.Subscription
+ {
+ Group = _group.ToProtobuf(),
+ Subscriptions = { subscriptionEntries }
+ };
+
+ return new Proto.Settings
+ {
+ AccessPoint = Endpoints.ToProtobuf(),
+ ClientType = ClientTypeHelper.ToProtobuf(ClientType),
+ RequestTimeout = Duration.FromTimeSpan(RequestTimeout),
+ Subscription = subscription,
+ UserAgent = UserAgent.Instance.ToProtobuf()
+ };
+ }
+
+ public override void Sync(Proto.Settings settings)
+ {
+ if (Proto.Settings.PubSubOneofCase.Subscription != settings.PubSubCase)
+ {
+ Logger.LogError($"[Bug] Issued settings doesn't match with the client type, clientId={ClientId}, " +
+ $"pubSubCase={settings.PubSubCase}, clientType={ClientType}");
+ }
+
+ var subscription = settings.Subscription ?? new Proto.Subscription();
+ _fifo = subscription.Fifo;
+ _receiveBatchSize = subscription.ReceiveBatchSize;
+ _longPollingTimeout = subscription.LongPollingTimeout?.ToTimeSpan() ?? TimeSpan.Zero;
+ var backoffPolicy = settings.BackoffPolicy ?? new Proto.RetryPolicy();
+ switch (backoffPolicy.StrategyCase)
+ {
+ case Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff:
+ RetryPolicy = ExponentialBackoffRetryPolicy.FromProtobuf(backoffPolicy);
+ break;
+ case Proto.RetryPolicy.StrategyOneofCase.CustomizedBackoff:
+ RetryPolicy = CustomizedBackoffRetryPolicy.FromProtobuf(backoffPolicy);
+ break;
+ default:
+ throw new ArgumentException("Unrecognized backoff policy strategy.");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Resource.cs b/csharp/rocketmq-client-csharp/Resource.cs
index a0f27df..e2847e7 100644
--- a/csharp/rocketmq-client-csharp/Resource.cs
+++ b/csharp/rocketmq-client-csharp/Resource.cs
@@ -15,12 +15,19 @@
* limitations under the License.
*/
+using System;
using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
- public class Resource
+ public class Resource : IEquatable<Resource>
{
+ public Resource(string namespaceName, string name)
+ {
+ Namespace = namespaceName;
+ Name = name;
+ }
+
public Resource(Proto.Resource resource)
{
Namespace = resource.ResourceNamespace;
@@ -33,7 +40,7 @@
Name = name;
}
- private string Namespace { get; }
+ public string Namespace { get; }
public string Name { get; }
public Proto.Resource ToProtobuf()
@@ -45,9 +52,46 @@
};
}
+ public bool Equals(Resource other)
+ {
+ if (ReferenceEquals(null, other))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, other))
+ {
+ return true;
+ }
+
+ return Name == other.Name && Namespace == other.Namespace;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ return obj.GetType() == GetType() && Equals((Resource)obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return HashCode.Combine(Namespace, Name);
+ }
+
public override string ToString()
{
return string.IsNullOrEmpty(Namespace) ? Name : $"{Namespace}.{Name}";
}
+
+
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Settings.cs b/csharp/rocketmq-client-csharp/Settings.cs
index 0ee95fb..d504a7a 100644
--- a/csharp/rocketmq-client-csharp/Settings.cs
+++ b/csharp/rocketmq-client-csharp/Settings.cs
@@ -22,15 +22,17 @@
{
public abstract class Settings
{
+ protected readonly string Namespace;
protected readonly string ClientId;
protected readonly ClientType ClientType;
protected readonly Endpoints Endpoints;
protected volatile IRetryPolicy RetryPolicy;
protected readonly TimeSpan RequestTimeout;
- protected Settings(string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
+ protected Settings(string namespaceName, string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
TimeSpan requestTimeout)
{
+ Namespace = namespaceName;
ClientId = clientId;
ClientType = clientType;
Endpoints = endpoints;
@@ -38,8 +40,9 @@
RequestTimeout = requestTimeout;
}
- protected Settings(string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
+ protected Settings(string namespaceName, string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
{
+ Namespace = namespaceName;
ClientId = clientId;
ClientType = clientType;
Endpoints = endpoints;
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index 949ac49..dfd1cc0 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -42,6 +42,7 @@
dictionary.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
dictionary.Add(MetadataConstants.RequestIdKey, Guid.NewGuid().ToString());
dictionary.Add(MetadataConstants.ClientIdKey, client.GetClientId());
+ dictionary.Add(MetadataConstants.NamespaceKey, client.GetClientConfig().Namespace);
var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
dictionary.Add(MetadataConstants.DateTimeKey, time);
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index d16a8c5..1ede707 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -36,6 +36,8 @@
private readonly SimpleSubscriptionSettings _simpleSubscriptionSettings;
private int _topicRoundRobinIndex;
+ private readonly ClientConfig _clientConfig;
+
public SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
Dictionary<string, FilterExpression> subscriptionExpressions) : this(clientConfig, consumerGroup,
awaitDuration, new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions))
@@ -48,9 +50,10 @@
_awaitDuration = awaitDuration;
_subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>();
_subscriptionExpressions = subscriptionExpressions;
- _simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, Endpoints,
+ _simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfig.Namespace, ClientId, Endpoints,
ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions);
_topicRoundRobinIndex = 0;
+ _clientConfig = clientConfig;
}
public async Task Subscribe(string topic, FilterExpression filterExpression)
@@ -125,7 +128,7 @@
return _subscriptionExpressions.Keys;
}
- protected override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
+ internal override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
{
return new Proto.NotifyClientTerminationRequest()
{
@@ -133,7 +136,7 @@
};
}
- protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
+ internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
{
return new Proto::HeartbeatRequest
{
@@ -209,7 +212,7 @@
return receiveMessageResult.Messages;
}
- public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
+ public async Task ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
{
if (State.Running != State)
{
@@ -240,6 +243,7 @@
{
var topicResource = new Proto.Resource
{
+ ResourceNamespace = _clientConfig.Namespace,
Name = messageView.Topic
};
var entry = new Proto.AckMessageEntry
@@ -260,6 +264,7 @@
{
var topicResource = new Proto.Resource
{
+ ResourceNamespace = _clientConfig.Namespace,
Name = messageView.Topic
};
return new Proto.ChangeInvisibleDurationRequest
@@ -276,6 +281,7 @@
{
return new Proto.Resource()
{
+ ResourceNamespace = _clientConfig.Namespace,
Name = ConsumerGroup
};
}
diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
index 2b214fa..c5a0dfa 100644
--- a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
+++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
@@ -32,12 +32,12 @@
private readonly TimeSpan _longPollingTimeout;
private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;
- public SimpleSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup,
+ public SimpleSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup,
TimeSpan requestTimeout, TimeSpan longPollingTimeout,
ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(
- clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
+ namespaceName, clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
{
- _group = new Resource(consumerGroup);
+ _group = new Resource(namespaceName, consumerGroup);
_longPollingTimeout = longPollingTimeout;
_subscriptionExpressions = subscriptionExpressions;
}
@@ -58,6 +58,7 @@
{
var topic = new Proto.Resource()
{
+ ResourceNamespace = Namespace,
Name = key,
};
var subscriptionEntry = new Proto.SubscriptionEntry();
@@ -77,6 +78,7 @@
filterExpression.Expression = value.Expression;
subscriptionEntry.Topic = topic;
+ subscriptionEntry.Expression = filterExpression;
subscriptionEntries.Add(subscriptionEntry);
}
diff --git a/csharp/rocketmq-client-csharp/StandardConsumeService.cs b/csharp/rocketmq-client-csharp/StandardConsumeService.cs
new file mode 100644
index 0000000..da753e0
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/StandardConsumeService.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Org.Apache.Rocketmq
+{
+ public class StandardConsumeService : ConsumeService
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger<StandardConsumeService>();
+
+ public StandardConsumeService(string clientId, IMessageListener messageListener,
+ TaskScheduler consumptionTaskScheduler, CancellationToken consumptionCtsToken) :
+ base(clientId, messageListener, consumptionTaskScheduler, consumptionCtsToken)
+ {
+ }
+
+ public override void Consume(ProcessQueue pq, List<MessageView> messageViews)
+ {
+ foreach (var messageView in messageViews)
+ {
+ if (messageView.IsCorrupted())
+ {
+ Logger.LogError("Message is corrupted for standard consumption, prepare to discard it," +
+ $" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}");
+ pq.DiscardMessage(messageView);
+ continue;
+ }
+
+ var consumeTask = Consume(messageView);
+
+ consumeTask.ContinueWith(task =>
+ {
+ if (task.IsFaulted)
+ {
+ // Should never reach here.
+ Logger.LogError(task.Exception,
+ $"[Bug] Exception raised in consumption callback, clientId={ClientId}");
+ }
+ else
+ {
+ pq.EraseMessage(messageView, task.Result);
+ }
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs
index 885db5f..950aa09 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteData.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs
@@ -18,12 +18,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
+using Org.Apache.Rocketmq.Error;
using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class TopicRouteData : IEquatable<TopicRouteData>
{
+ private int _index = 0;
+
public TopicRouteData(IEnumerable<Proto.MessageQueue> messageQueues)
{
var messageQueuesList = messageQueues.Select(mq => new MessageQueue(mq)).ToList();
@@ -33,6 +37,36 @@
public List<MessageQueue> MessageQueues { get; }
+ public Endpoints PickEndpointsToQueryAssignments()
+ {
+ var nextIndex = Interlocked.Increment(ref _index) - 1;
+ foreach (var mq in MessageQueues)
+ {
+ var modIndex = Mod(nextIndex++, MessageQueues.Count);
+ var curMessageQueue = MessageQueues[modIndex];
+
+ if (Utilities.MasterBrokerId != curMessageQueue.Broker.Id)
+ {
+ continue;
+ }
+ if (Permission.None.Equals(curMessageQueue.Permission))
+ {
+ continue;
+ }
+ return curMessageQueue.Broker.Endpoints;
+ }
+ throw new NotFoundException("Failed to pick endpoints to query assignment");
+ }
+
+ private int Mod(int x, int m)
+ {
+ if (m <= 0)
+ {
+ throw new ArgumentException("Modulus must be positive", nameof(m));
+ }
+ var result = x % m;
+ return result >= 0 ? result : result + m;
+ }
public bool Equals(TopicRouteData other)
{
diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs
index 71d5b74..8a4df4a 100644
--- a/csharp/rocketmq-client-csharp/Transaction.cs
+++ b/csharp/rocketmq-client-csharp/Transaction.cs
@@ -19,6 +19,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
+using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
@@ -44,7 +45,7 @@
_messagesLock.EnterReadLock();
try
{
- if (_messages.Count > MaxMessageNum)
+ if (_messages.Count >= MaxMessageNum)
{
throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
}
@@ -57,7 +58,7 @@
_messagesLock.EnterWriteLock();
try
{
- if (_messages.Count > MaxMessageNum)
+ if (_messages.Count >= MaxMessageNum)
{
throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
}
@@ -90,7 +91,7 @@
}
}
- public async void Commit()
+ public async Task Commit()
{
if (State.Running != _producer.State)
{
@@ -109,7 +110,7 @@
}
}
- public async void Rollback()
+ public async Task Rollback()
{
if (State.Running != _producer.State)
{
diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 5fe87fc..597bf9f 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -34,9 +34,10 @@
<PackageReference Include="OpenTelemetry" Version="1.3.1" />
<PackageReference Include="OpenTelemetry.Api" Version="1.3.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.3.1" />
+ <PackageReference Include="ParallelExtensionsExtras" Version="1.2.0" />
<Protobuf Include="..\..\protos\apache\rocketmq\v2\definition.proto" ProtoRoot="..\..\protos" GrpcServices="Client" />
- <Protobuf Include="..\..\protos\apache\rocketmq\v2\service.proto" ProtoRoot="..\..\protos" GrpcServices="Client" />
+ <Protobuf Include="..\..\protos\apache\rocketmq\v2\service.proto" ProtoRoot="..\..\protos" GrpcServices="Both" />
<None Include="logo.png" Pack="true" PackagePath="" />
</ItemGroup>
</Project>
diff --git a/csharp/tests/AttemptIdIntegrationTest.cs b/csharp/tests/AttemptIdIntegrationTest.cs
new file mode 100644
index 0000000..99bfbbd
--- /dev/null
+++ b/csharp/tests/AttemptIdIntegrationTest.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using FilterExpression = Org.Apache.Rocketmq.FilterExpression;
+
+namespace tests
+{
+ [TestClass]
+ public class AttemptIdIntegrationTest : GrpcServerIntegrationTest
+ {
+ private const string Topic = "topic";
+ private const string Broker = "broker";
+
+ private Server _server;
+ private readonly List<string> _attemptIdList = new ConcurrentBag<string>().ToList();
+
+ [TestInitialize]
+ public void SetUp()
+ {
+ var mockServer = new MockServer(Topic, Broker, _attemptIdList);
+ _server = SetUpServer(mockServer);
+ mockServer.Port = Port;
+ }
+
+ [TestCleanup]
+ public void TearDown()
+ {
+ _server.ShutdownAsync();
+ }
+
+ [TestMethod]
+ public async Task Test()
+ {
+ var endpoint = "127.0.0.1" + ":" + Port;
+ var credentialsProvider = new StaticSessionCredentialsProvider("yourAccessKey", "yourSecretKey");
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoint)
+ .SetCredentialsProvider(credentialsProvider)
+ .EnableSsl(false)
+ .SetRequestTimeout(TimeSpan.FromMilliseconds(1000))
+ .Build();
+
+ const string consumerGroup = "yourConsumerGroup";
+ const string topic = "yourTopic";
+ var subscription = new Dictionary<string, FilterExpression>
+ { { topic, new FilterExpression("*") } };
+
+ var pushConsumer = await new PushConsumer.Builder()
+ .SetClientConfig(clientConfig)
+ .SetConsumerGroup(consumerGroup)
+ .SetSubscriptionExpression(subscription)
+ .SetMessageListener(new CustomMessageListener())
+ .Build();
+
+ await Task.Run(async () =>
+ {
+ await WaitForConditionAsync(() =>
+ {
+ Assert.IsTrue(_attemptIdList.Count >= 3);
+ Assert.AreEqual(_attemptIdList[0], _attemptIdList[1]);
+ Assert.AreNotEqual(_attemptIdList[0], _attemptIdList[2]);
+ }, TimeSpan.FromSeconds(5));
+ });
+ }
+
+ private async Task WaitForConditionAsync(Action assertCondition, TimeSpan timeout)
+ {
+ var startTime = DateTime.UtcNow;
+ while (DateTime.UtcNow - startTime < timeout)
+ {
+ try
+ {
+ assertCondition();
+ return; // Condition met, exit the method
+ }
+ catch
+ {
+ // Condition not met, ignore exception and try again after a delay
+ }
+
+ await Task.Delay(100); // Small delay to avoid tight loop
+ }
+
+ // Perform last check to throw the exception
+ assertCondition();
+ }
+
+ private class CustomMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs
new file mode 100644
index 0000000..5e4e7ee
--- /dev/null
+++ b/csharp/tests/ClientManagerTest.cs
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using Apache.Rocketmq.V2;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+
+namespace tests
+{
+ [TestClass]
+ public class ClientManagerTest
+ {
+ private static readonly Endpoints FakeEndpoints = new Endpoints("127.0.0.1:8080");
+ private static IClientManager _clientManager;
+
+ private readonly ClientConfig _clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:8080")
+ .Build();
+
+ [TestInitialize]
+ public void Initialize()
+ {
+ _clientManager = new ClientManager(CreateTestClient());
+ }
+
+ [TestMethod]
+ public void TestHeartbeat()
+ {
+ var request = new HeartbeatRequest();
+ _clientManager.Heartbeat(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.Heartbeat(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestSendMessage()
+ {
+ var request = new SendMessageRequest();
+ _clientManager.SendMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.SendMessage(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestQueryAssignment()
+ {
+ var request = new QueryAssignmentRequest();
+ _clientManager.QueryAssignment(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.QueryAssignment(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestReceiveMessage()
+ {
+ var request = new ReceiveMessageRequest();
+ _clientManager.ReceiveMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.ReceiveMessage(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestAckMessage()
+ {
+ var request = new AckMessageRequest();
+ _clientManager.AckMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.AckMessage(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestChangeInvisibleDuration()
+ {
+ var request = new ChangeInvisibleDurationRequest();
+ _clientManager.ChangeInvisibleDuration(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.ChangeInvisibleDuration(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestForwardMessageToDeadLetterQueue()
+ {
+ var request = new ForwardMessageToDeadLetterQueueRequest();
+ _clientManager.ForwardMessageToDeadLetterQueue(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.ForwardMessageToDeadLetterQueue(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestEndTransaction()
+ {
+ var request = new EndTransactionRequest();
+ _clientManager.EndTransaction(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.EndTransaction(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ [TestMethod]
+ public void TestNotifyClientTermination()
+ {
+ var request = new NotifyClientTerminationRequest();
+ _clientManager.NotifyClientTermination(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
+ private Client CreateTestClient()
+ {
+ return new Producer(_clientConfig, new ConcurrentDictionary<string, bool>(), 1, null);
+ }
+ }
+}
diff --git a/csharp/tests/ClientMeterManagerTest.cs b/csharp/tests/ClientMeterManagerTest.cs
new file mode 100644
index 0000000..9713e43
--- /dev/null
+++ b/csharp/tests/ClientMeterManagerTest.cs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Metric = Org.Apache.Rocketmq.Metric;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class ClientMeterManagerTest
+ {
+ [TestMethod]
+ public void TestResetWithMetricOn()
+ {
+ var meterManager = CreateClientMeterManager();
+ var metric = CreateMetric(true);
+ meterManager.Reset(metric);
+ Assert.IsTrue(meterManager.IsEnabled());
+ }
+
+ [TestMethod]
+ public void TestResetWithMetricOff()
+ {
+ var meterManager = CreateClientMeterManager();
+ var metric = CreateMetric(false);
+ meterManager.Reset(metric);
+ Assert.IsFalse(meterManager.IsEnabled());
+ }
+
+ private ClientMeterManager CreateClientMeterManager()
+ {
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:8080")
+ .Build();
+
+ return new ClientMeterManager(CreateTestClient(clientConfig));
+ }
+
+ private Client CreateTestClient(ClientConfig clientConfig)
+ {
+ return new PushConsumer(clientConfig, "testGroup",
+ new ConcurrentDictionary<string, FilterExpression>(), new TestMessageListener(),
+ 0, 0, 1);
+ }
+
+ private Metric CreateMetric(bool isOn)
+ {
+ var endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses =
+ {
+ new Proto.Address { Host = "127.0.0.1", Port = 8080 }
+ }
+ };
+
+ return new Metric(new Proto.Metric { On = isOn, Endpoints = endpoints });
+ }
+
+ private class TestMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ClientMeterTest.cs b/csharp/tests/ClientMeterTest.cs
new file mode 100644
index 0000000..aa3aecc
--- /dev/null
+++ b/csharp/tests/ClientMeterTest.cs
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Resources;
+using Org.Apache.Rocketmq;
+using Metric = Org.Apache.Rocketmq.Metric;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class ClientMeterTest
+ {
+ private MeterProvider CreateMeterProvider()
+ {
+ return Sdk.CreateMeterProviderBuilder()
+ .SetResourceBuilder(ResourceBuilder.CreateEmpty())
+ .Build();
+ }
+
+ [TestMethod]
+ public void TestShutdownWithEnabledMeter()
+ {
+ var endpoints = new Endpoints(new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+ });
+ var provider = CreateMeterProvider();
+ var clientId = "testClientId";
+ var clientMeter = new ClientMeter(endpoints, provider, clientId);
+ Assert.IsTrue(clientMeter.Enabled);
+ clientMeter.Shutdown();
+ }
+
+ [TestMethod]
+ public void TestShutdownWithDisabledMeter()
+ {
+ var clientId = "testClientId";
+ var clientMeter = ClientMeter.DisabledInstance(clientId);
+ Assert.IsFalse(clientMeter.Enabled);
+ clientMeter.Shutdown();
+ }
+
+ [TestMethod]
+ public void TestSatisfy()
+ {
+ var clientId = "testClientId";
+ var clientMeter = ClientMeter.DisabledInstance(clientId);
+
+ var metric = new Metric(new Proto.Metric { On = false });
+ Assert.IsTrue(clientMeter.Satisfy(metric));
+
+ metric = new Metric(new Proto.Metric { On = true });
+ Assert.IsTrue(clientMeter.Satisfy(metric));
+
+ var endpoints0 = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+ };
+
+ metric = new Metric(new Proto.Metric { On = false, Endpoints = endpoints0 });
+ Assert.IsTrue(clientMeter.Satisfy(metric));
+
+ metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints0 });
+ Assert.IsFalse(clientMeter.Satisfy(metric));
+
+ var endpoints = new Endpoints(endpoints0);
+ var provider = CreateMeterProvider();
+ clientMeter = new ClientMeter(endpoints, provider, clientId);
+
+ metric = new Metric(new Proto.Metric { On = false });
+ Assert.IsFalse(clientMeter.Satisfy(metric));
+
+ metric = new Metric(new Proto.Metric { On = true });
+ Assert.IsFalse(clientMeter.Satisfy(metric));
+
+ metric = new Metric(new Proto.Metric { On = false, Endpoints = endpoints0 });
+ Assert.IsFalse(clientMeter.Satisfy(metric));
+
+ metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints0 });
+ Assert.IsTrue(clientMeter.Satisfy(metric));
+
+ var endpoints1 = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses = { new Proto.Address { Host = "127.0.0.2", Port = 8081 } }
+ };
+ metric = new Metric(new Proto.Metric { On = true, Endpoints = endpoints1 });
+ Assert.IsFalse(clientMeter.Satisfy(metric));
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ClientTest.cs b/csharp/tests/ClientTest.cs
new file mode 100644
index 0000000..8fdfa86
--- /dev/null
+++ b/csharp/tests/ClientTest.cs
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class ClientTest
+ {
+ [TestMethod]
+ public async Task TestOnVerifyMessageCommand()
+ {
+ var testClient = CreateTestClient();
+ var endpoints = new Endpoints("testEndpoints");
+ var command = new VerifyMessageCommand { Nonce = "testNonce" };
+
+ var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
+ new MockClientStreamWriter<TelemetryCommand>(),
+ new MockAsyncStreamReader<TelemetryCommand>(),
+ null,
+ null,
+ null,
+ null);
+ var mockClientManager = new Mock<IClientManager>();
+ mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
+
+ testClient.SetClientManager(mockClientManager.Object);
+
+ testClient.OnVerifyMessageCommand(endpoints, command);
+
+ mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once);
+ }
+
+ [TestMethod]
+ public async Task TestOnTopicRouteDataFetchedFailure()
+ {
+ var testClient = CreateTestClient();
+ var endpoints = new Endpoints("testEndpoints");
+ var mq = new Proto.MessageQueue
+ {
+ Topic = new Proto::Resource
+ {
+ ResourceNamespace = "testNamespace",
+ Name = "testTopic"
+ },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Broker = new Proto::Broker
+ {
+ Name = "testBroker",
+ Id = 0,
+ Endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+ }
+ }
+ };
+ var topicRouteData = new TopicRouteData(new[] { mq });
+
+ var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
+ new MockClientStreamWriter<TelemetryCommand>(),
+ new MockAsyncStreamReader<TelemetryCommand>(),
+ null,
+ null,
+ null,
+ null);
+ var mockClientManager = new Mock<IClientManager>();
+ mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
+
+ testClient.SetClientManager(mockClientManager.Object);
+
+ try
+ {
+ await testClient.OnTopicRouteDataFetched("testTopic", topicRouteData);
+ Assert.Fail();
+ }
+ catch (Exception e)
+ {
+ mockClientManager.Verify(cm => cm.Telemetry(It.IsAny<Endpoints>()), Times.Once);
+ }
+ }
+
+ [TestMethod]
+ public async Task TestOnPrintThreadStackTraceCommand()
+ {
+ var testClient = CreateTestClient();
+ var endpoints = new Endpoints("testEndpoints");
+ var command = new PrintThreadStackTraceCommand { Nonce = "testNonce" };
+ var mockCall = new AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand>(
+ new MockClientStreamWriter<TelemetryCommand>(),
+ new MockAsyncStreamReader<TelemetryCommand>(),
+ null,
+ null,
+ null,
+ null);
+
+ var mockClientManager = new Mock<IClientManager>();
+ mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockCall);
+
+ testClient.SetClientManager(mockClientManager.Object);
+
+ // Act
+ testClient.OnPrintThreadStackTraceCommand(endpoints, command);
+
+ // Assert
+ mockClientManager.Verify(cm => cm.Telemetry(endpoints), Times.Once);
+ }
+
+ private Client CreateTestClient()
+ {
+ return new Producer(new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build(),
+ new ConcurrentDictionary<string, bool>(), 1, null);
+ }
+
+ private class MockClientStreamWriter<T> : IClientStreamWriter<T>
+ {
+ public Task WriteAsync(T message)
+ {
+ // Simulate async operation
+ return Task.CompletedTask;
+ }
+
+ public WriteOptions WriteOptions { get; set; }
+
+ public Task CompleteAsync()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ private class MockAsyncStreamReader<T> : IAsyncStreamReader<T>
+ {
+ public Task<bool> MoveNext(CancellationToken cancellationToken)
+ {
+ throw new System.NotImplementedException();
+ }
+
+ public T Current => throw new NotImplementedException();
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ConsumeServiceTest.cs b/csharp/tests/ConsumeServiceTest.cs
new file mode 100644
index 0000000..8ef757e
--- /dev/null
+++ b/csharp/tests/ConsumeServiceTest.cs
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Schedulers;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class ConsumeServiceTest
+ {
+ private Proto.Digest _digest;
+ private Proto.SystemProperties _systemProperties;
+ private ByteString _body;
+ private Proto.Message _message;
+ private MessageView _messageView;
+
+ [TestInitialize]
+ public void SetUp()
+ {
+ _digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+ _systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = "127.0.0.1:8080",
+ BodyDigest = _digest,
+ BornTimestamp = new Timestamp()
+ };
+ _body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ _message = new Proto.Message
+ {
+ SystemProperties = _systemProperties,
+ Topic = new Proto.Resource { Name = "testTopic" },
+ Body = _body
+ };
+ _messageView = MessageView.FromProtobuf(_message);
+ }
+
+ [TestMethod]
+ public void TestConsumeSuccess()
+ {
+ var messageListener = new TestSuccessMessageListener();
+ var consumeService = CreateService(messageListener);
+ Assert.AreEqual(ConsumeResult.SUCCESS, consumeService.Consume(_messageView).Result);
+ }
+
+ [TestMethod]
+ public void TestConsumeFailure()
+ {
+ var messageListener = new TestFailureMessageListener();
+ var consumeService = CreateService(messageListener);
+ Assert.AreEqual(ConsumeResult.FAILURE, consumeService.Consume(_messageView).Result);
+ }
+
+ [TestMethod]
+ public void TestConsumeWithException()
+ {
+ var messageListener = new TestExceptionMessageListener();
+ var consumeService = CreateService(messageListener);
+ Assert.AreEqual(ConsumeResult.FAILURE, consumeService.Consume(_messageView).Result);
+ }
+
+ [TestMethod]
+ public void TestConsumeWithDelay()
+ {
+ var messageListener = new TestSuccessMessageListener();
+ var consumeService = CreateService(messageListener);
+ Assert.AreEqual(ConsumeResult.SUCCESS,
+ consumeService.Consume(_messageView, TimeSpan.FromMilliseconds(500)).Result);
+ }
+
+ private TestConsumeService CreateService(IMessageListener messageListener)
+ {
+ return new TestConsumeService("testClientId", messageListener,
+ new CurrentThreadTaskScheduler(), new CancellationToken());
+ }
+
+ private class TestSuccessMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView) => ConsumeResult.SUCCESS;
+ }
+
+ private class TestFailureMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView) => ConsumeResult.FAILURE;
+ }
+
+ private class TestExceptionMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView) => throw new Exception();
+ }
+
+ private class TestConsumeService : ConsumeService
+ {
+ public TestConsumeService(string clientId, IMessageListener messageListener,
+ TaskScheduler consumptionTaskScheduler, CancellationToken consumptionCtsToken)
+ : base(clientId, messageListener, consumptionTaskScheduler, consumptionCtsToken) { }
+
+ public override void Consume(ProcessQueue pq, List<MessageView> messageViews) => Task.FromResult(0);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/csharp/tests/ConsumerTest.cs b/csharp/tests/ConsumerTest.cs
new file mode 100644
index 0000000..82ef09c
--- /dev/null
+++ b/csharp/tests/ConsumerTest.cs
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Text;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+[assembly: InternalsVisibleTo("tests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
+namespace tests
+{
+ [TestClass]
+ public class ConsumerTest
+ {
+ [TestMethod]
+ public async Task TestReceiveMessage()
+ {
+ var maxCacheMessageCount = 8;
+ var maxCacheMessageSizeInBytes = 1024;
+ var consumptionThreadCount = 4;
+
+ var consumer =
+ CreateTestClient(maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+ var mockClientManager = new Mock<IClientManager>();
+ consumer.SetClientManager(mockClientManager.Object);
+
+ var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "00000000" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = "127.0.0.1",
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource
+ {
+ ResourceNamespace = "testNamespace",
+ Name = "testTopic"
+ },
+ Body = body
+ };
+ var receiveMessageResponse0 = new Proto.ReceiveMessageResponse
+ {
+ Status = new Proto.Status
+ {
+ Code = Proto.Code.Ok
+ }
+ };
+ var receiveMessageResponse1 = new Proto.ReceiveMessageResponse
+ {
+ Message = message
+ };
+ var metadata = consumer.Sign();
+ var receiveMessageResponseList = new List<Proto.ReceiveMessageResponse>
+ { receiveMessageResponse0, receiveMessageResponse1 };
+ var receiveMessageInvocation =
+ new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(null,
+ receiveMessageResponseList, metadata);
+ mockClientManager.Setup(cm => cm.ReceiveMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.ReceiveMessageRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(receiveMessageInvocation));
+
+ var receivedMessageCount = 1;
+ var mq = new Proto.MessageQueue
+ {
+ Broker = new Proto.Broker
+ {
+ Name = "broker0",
+ Endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses =
+ {
+ new Proto.Address
+ {
+ Host = "127.0.0.1",
+ Port = 8080
+ }
+ }
+ }
+ },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Topic = new Proto.Resource
+ {
+ ResourceNamespace = "testNamespace",
+ Name = "testTopic",
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ };
+ var request = consumer.WrapReceiveMessageRequest(1, new MessageQueue(mq), new FilterExpression("*"),
+ TimeSpan.FromSeconds(15), Guid.NewGuid().ToString());
+ var receiveMessageResult = await consumer.ReceiveMessage(request, new MessageQueue(mq),
+ TimeSpan.FromSeconds(15));
+ Assert.AreEqual(receiveMessageResult.Messages.Count, receivedMessageCount);
+ }
+
+ private PushConsumer CreateTestClient(int maxCacheMessageCount, int maxCacheMessageSizeInBytes,
+ int consumptionThreadCount)
+ {
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:9876")
+ .Build();
+ return new PushConsumer(clientConfig, "testGroup",
+ new ConcurrentDictionary<string, FilterExpression>(), new TestMessageListener(),
+ maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+ }
+
+ private class TestMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/CustomizedBackoffRetryPolicyTest.cs b/csharp/tests/CustomizedBackoffRetryPolicyTest.cs
new file mode 100644
index 0000000..dcbd4e3
--- /dev/null
+++ b/csharp/tests/CustomizedBackoffRetryPolicyTest.cs
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+ [TestClass]
+ public class CustomizedBackoffRetryPolicyTest
+ {
+ [TestMethod]
+ public void TestConstructWithValidDurationsAndMaxAttempts()
+ {
+ var durations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) };
+ var maxAttempts = 3;
+ var policy = new CustomizedBackoffRetryPolicy(durations, maxAttempts);
+
+ Assert.AreEqual(maxAttempts, policy.GetMaxAttempts());
+ CollectionAssert.AreEqual(durations, policy.GetDurations());
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestConstructWithEmptyDurations()
+ {
+ new CustomizedBackoffRetryPolicy(new List<TimeSpan>(), 3);
+ }
+
+ [TestMethod]
+ public void TestGetNextAttemptDelayWithValidAttempts()
+ {
+ var durations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(5) };
+ var policy = new CustomizedBackoffRetryPolicy(durations, 5);
+
+ Assert.AreEqual(TimeSpan.FromSeconds(1), policy.GetNextAttemptDelay(1));
+ Assert.AreEqual(TimeSpan.FromSeconds(3), policy.GetNextAttemptDelay(2));
+ Assert.AreEqual(TimeSpan.FromSeconds(5), policy.GetNextAttemptDelay(3));
+ Assert.AreEqual(TimeSpan.FromSeconds(5), policy.GetNextAttemptDelay(4)); // Should inherit the last duration
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestGetNextAttemptDelayWithInvalidAttempt()
+ {
+ var policy = new CustomizedBackoffRetryPolicy(new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }, 3);
+ policy.GetNextAttemptDelay(0);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestGetNextAttemptDelayWithNegativeAttempt()
+ {
+ var policy = new CustomizedBackoffRetryPolicy(new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) }, 3);
+ policy.GetNextAttemptDelay(-1);
+ }
+
+ [TestMethod]
+ public void TestFromProtobufWithValidRetryPolicy()
+ {
+ var protoDurations = new List<Duration>
+ {
+ Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+ Duration.FromTimeSpan(TimeSpan.FromSeconds(2))
+ };
+ var protoRetryPolicy = new RetryPolicy
+ {
+ MaxAttempts = 3,
+ CustomizedBackoff = new CustomizedBackoff { Next = { protoDurations } },
+ };
+ var policy = CustomizedBackoffRetryPolicy.FromProtobuf(protoRetryPolicy);
+
+ Assert.AreEqual(3, policy.GetMaxAttempts());
+ Assert.AreEqual(protoDurations.Count, policy.GetDurations().Count);
+ CollectionAssert.AreEqual(protoDurations.Select(d => d.ToTimeSpan()).ToList(), policy.GetDurations());
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestFromProtobufWithInvalidRetryPolicy()
+ {
+ var retryPolicy = new RetryPolicy
+ {
+ MaxAttempts = 3,
+ ExponentialBackoff = new ExponentialBackoff
+ {
+ Initial = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+ Max = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+ Multiplier = 1.0f
+ }
+ };
+ CustomizedBackoffRetryPolicy.FromProtobuf(retryPolicy);
+ }
+
+ [TestMethod]
+ public void ToProtobuf_ShouldReturnCorrectProtobuf()
+ {
+ var durations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) };
+ var maxAttempts = 3;
+ var policy = new CustomizedBackoffRetryPolicy(durations, maxAttempts);
+ var proto = policy.ToProtobuf();
+
+ Assert.AreEqual(maxAttempts, proto.MaxAttempts);
+ CollectionAssert.AreEqual(durations, proto.CustomizedBackoff.Next.Select(d => d.ToTimeSpan()).ToList());
+ }
+
+ [TestMethod]
+ public void TestInheritBackoffWithValidCustomizedBackoffPolicy()
+ {
+ var originalDurations = new List<TimeSpan> { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3) };
+ var newDurations = new List<Duration>
+ {
+ Duration.FromTimeSpan(TimeSpan.FromSeconds(2)),
+ Duration.FromTimeSpan(TimeSpan.FromSeconds(4))
+ };
+ var backoff = new CustomizedBackoff { Next = { newDurations } };
+ var retryPolicy = new RetryPolicy
+ {
+ MaxAttempts = 5,
+ CustomizedBackoff = backoff,
+ };
+ var policy = new CustomizedBackoffRetryPolicy(originalDurations, 5);
+ var inheritedPolicy = policy.InheritBackoff(retryPolicy);
+ Assert.IsTrue(inheritedPolicy is CustomizedBackoffRetryPolicy);
+ var customizedBackoffRetryPolicy = (CustomizedBackoffRetryPolicy)inheritedPolicy;
+ Assert.AreEqual(policy.GetMaxAttempts(), inheritedPolicy.GetMaxAttempts());
+ var inheritedDurations = customizedBackoffRetryPolicy.GetDurations();
+ Assert.AreEqual(newDurations.Count, inheritedDurations.Count);
+ for (var i = 0; i < newDurations.Count; i++)
+ {
+ Assert.AreEqual(newDurations[i].ToTimeSpan(), inheritedDurations[i]);
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public void TestInheritBackoffWithInvalidPolicy()
+ {
+ var policy = new CustomizedBackoffRetryPolicy(new List<TimeSpan>
+ {
+ TimeSpan.FromSeconds(3),
+ TimeSpan.FromSeconds(2),
+ TimeSpan.FromSeconds(1)
+ }, 3);
+ var retryPolicy = new RetryPolicy
+ {
+ ExponentialBackoff = new ExponentialBackoff()
+ };
+ policy.InheritBackoff(retryPolicy);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/EncodingTest.cs b/csharp/tests/EncodingTest.cs
new file mode 100644
index 0000000..198fec9
--- /dev/null
+++ b/csharp/tests/EncodingTest.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+ [TestClass]
+ public class EncodingTest
+ {
+ [TestMethod]
+ public void TestToProtobuf()
+ {
+ Assert.AreEqual(EncodingHelper.ToProtobuf(MqEncoding.Identity), Apache.Rocketmq.V2.Encoding.Identity);
+ Assert.AreEqual(EncodingHelper.ToProtobuf(MqEncoding.Gzip), Apache.Rocketmq.V2.Encoding.Gzip);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ExponentialBackoffRetryPolicyTest.cs b/csharp/tests/ExponentialBackoffRetryPolicyTest.cs
new file mode 100644
index 0000000..70b3254
--- /dev/null
+++ b/csharp/tests/ExponentialBackoffRetryPolicyTest.cs
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.Rocketmq.V2;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+ [TestClass]
+ public class ExponentialBackoffRetryPolicyTest
+ {
+ private TimeSpan initialBackoff = TimeSpan.FromMilliseconds(5);
+ private TimeSpan maxBackoff = TimeSpan.FromSeconds(1);
+ private float backoffMultiplier = 5;
+ private int maxAttempts = 3;
+
+ [TestMethod]
+ public void TestNextAttemptDelayForImmediatelyRetryPolicy()
+ {
+ var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3);
+ for (int i = 1; i <= 4; i++)
+ {
+ Assert.AreEqual(TimeSpan.Zero, retryPolicy.GetNextAttemptDelay(i));
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestGetNextAttemptDelayWithIllegalAttempt()
+ {
+ var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+ retryPolicy.GetNextAttemptDelay(0);
+ }
+
+ [TestMethod]
+ public void TestGetNextAttemptDelay()
+ {
+ var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+ Assert.AreEqual(TimeSpan.FromMilliseconds(5), retryPolicy.GetNextAttemptDelay(1));
+ Assert.AreEqual(TimeSpan.FromMilliseconds(25), retryPolicy.GetNextAttemptDelay(2));
+ Assert.AreEqual(TimeSpan.FromMilliseconds(125), retryPolicy.GetNextAttemptDelay(3));
+ Assert.AreEqual(TimeSpan.FromMilliseconds(625), retryPolicy.GetNextAttemptDelay(4));
+ Assert.AreEqual(TimeSpan.FromSeconds(1), retryPolicy.GetNextAttemptDelay(5));
+ }
+
+ [TestMethod]
+ public void TestFromProtobuf()
+ {
+ var initialBackoffProto = Duration.FromTimeSpan(initialBackoff);
+ var maxBackoffProto = Duration.FromTimeSpan(maxBackoff);
+
+ var exponentialBackoff = new ExponentialBackoff
+ {
+ Initial = initialBackoffProto,
+ Max = maxBackoffProto,
+ Multiplier = backoffMultiplier
+ };
+ var retryPolicyProto = new RetryPolicy
+ {
+ MaxAttempts = maxAttempts,
+ ExponentialBackoff = exponentialBackoff
+ };
+
+ var policy = ExponentialBackoffRetryPolicy.FromProtobuf(retryPolicyProto);
+
+ Assert.AreEqual(maxAttempts, policy.GetMaxAttempts());
+ Assert.AreEqual(initialBackoff, policy.InitialBackoff);
+ Assert.AreEqual(maxBackoff, policy.MaxBackoff);
+ Assert.AreEqual(backoffMultiplier, policy.BackoffMultiplier);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestFromProtobufWithoutExponentialBackoff()
+ {
+ var customizedBackoff = new CustomizedBackoff();
+ var retryPolicyProto = new RetryPolicy
+ {
+ MaxAttempts = maxAttempts,
+ CustomizedBackoff = customizedBackoff
+ };
+ ExponentialBackoffRetryPolicy.FromProtobuf(retryPolicyProto);
+ }
+
+ [TestMethod]
+ public void TestToProtobuf()
+ {
+ var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+ var retryPolicyProto = retryPolicy.ToProtobuf();
+
+ Assert.IsNotNull(retryPolicyProto.ExponentialBackoff);
+
+ var exponentialBackoff = retryPolicyProto.ExponentialBackoff;
+ var initialBackoffProto = Duration.FromTimeSpan(initialBackoff);
+ var maxBackoffProto = Duration.FromTimeSpan(maxBackoff);
+
+ Assert.AreEqual(exponentialBackoff.Initial, initialBackoffProto);
+ Assert.AreEqual(exponentialBackoff.Max, maxBackoffProto);
+ Assert.AreEqual(exponentialBackoff.Multiplier, backoffMultiplier);
+ Assert.AreEqual(retryPolicyProto.MaxAttempts, maxAttempts);
+ }
+
+ [TestMethod]
+ public void TestInheritBackoff()
+ {
+ var retryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+
+ var initialBackoffProto = TimeSpan.FromMilliseconds(10);
+ var maxBackoffProto = TimeSpan.FromSeconds(3);
+ double backoffMultiplierProto = 10;
+
+ var exponentialBackoff = new ExponentialBackoff
+ {
+ Initial = Duration.FromTimeSpan(initialBackoffProto),
+ Max = Duration.FromTimeSpan(maxBackoffProto),
+ Multiplier = (float)backoffMultiplierProto
+ };
+ var retryPolicyProto = new RetryPolicy
+ {
+ ExponentialBackoff = exponentialBackoff
+ };
+
+ var inheritedRetryPolicy = retryPolicy.InheritBackoff(retryPolicyProto);
+
+ Assert.IsInstanceOfType(inheritedRetryPolicy, typeof(ExponentialBackoffRetryPolicy));
+
+ var exponentialBackoffRetryPolicy = (ExponentialBackoffRetryPolicy)inheritedRetryPolicy;
+
+ Assert.AreEqual(initialBackoffProto, exponentialBackoffRetryPolicy.InitialBackoff);
+ Assert.AreEqual(maxBackoffProto, exponentialBackoffRetryPolicy.MaxBackoff);
+ Assert.AreEqual(backoffMultiplierProto, exponentialBackoffRetryPolicy.BackoffMultiplier);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public void TestInheritBackoffWithoutExponentialBackoff()
+ {
+ var customizedBackoff = new CustomizedBackoff();
+ var retryPolicyProto = new RetryPolicy
+ {
+ MaxAttempts = maxAttempts,
+ CustomizedBackoff = customizedBackoff
+ };
+
+ var exponentialBackoffRetryPolicy = new ExponentialBackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+ exponentialBackoffRetryPolicy.InheritBackoff(retryPolicyProto);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/csharp/tests/GrpcServerIntegrationTest.cs b/csharp/tests/GrpcServerIntegrationTest.cs
new file mode 100644
index 0000000..f7f242f
--- /dev/null
+++ b/csharp/tests/GrpcServerIntegrationTest.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Linq;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+
+namespace tests
+{
+ public abstract class GrpcServerIntegrationTest
+ {
+ protected int Port;
+
+ protected Server SetUpServer(MessagingService.MessagingServiceBase mockServer)
+ {
+ var server = new Server
+ {
+ Ports = { new ServerPort("127.0.0.1", Port, ServerCredentials.Insecure) },
+ Services = { MessagingService.BindService(mockServer) }
+ };
+ server.Start();
+ Port = server.Ports.First().BoundPort;
+ return server;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/MessageViewTest.cs b/csharp/tests/MessageViewTest.cs
new file mode 100644
index 0000000..1387b48
--- /dev/null
+++ b/csharp/tests/MessageViewTest.cs
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class MessageViewTests
+ {
+ private const string FakeHost = "127.0.0.1";
+ private const string FakeTopic = "test-topic";
+
+ [TestMethod]
+ public void TestFromProtobufWithCrc32()
+ {
+ var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = FakeHost,
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = FakeTopic },
+ Body = body
+ };
+
+ var messageView = MessageView.FromProtobuf(message);
+
+ CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+ Assert.AreEqual(FakeTopic, messageView.Topic);
+ Assert.AreEqual(FakeHost, messageView.BornHost);
+ Assert.IsFalse(messageView.IsCorrupted());
+ }
+
+ [TestMethod]
+ public void TestFromProtobufWithWrongCrc32()
+ {
+ var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "00000000" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = FakeHost,
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = FakeTopic },
+ Body = body
+ };
+
+ var messageView = MessageView.FromProtobuf(message);
+
+ CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+ Assert.AreEqual(FakeTopic, messageView.Topic);
+ Assert.IsTrue(messageView.IsCorrupted());
+ }
+
+ [TestMethod]
+ public void TestFromProtobufWithMd5()
+ {
+ var digest = new Proto.Digest
+ { Type = Proto.DigestType.Md5, Checksum = "3858F62230AC3C915F300C664312C63F" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = FakeHost,
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = FakeTopic },
+ Body = body
+ };
+
+ var messageView = MessageView.FromProtobuf(message);
+
+ CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+ Assert.AreEqual(FakeTopic, messageView.Topic);
+ Assert.IsFalse(messageView.IsCorrupted());
+ }
+
+ [TestMethod]
+ public void TestFromProtobufWithWrongMd5()
+ {
+ var digest = new Proto.Digest
+ { Type = Proto.DigestType.Md5, Checksum = "00000000000000000000000000000000" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = FakeHost,
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = FakeTopic },
+ Body = body
+ };
+
+ var messageView = MessageView.FromProtobuf(message);
+
+ CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+ Assert.AreEqual(FakeTopic, messageView.Topic);
+ Assert.IsTrue(messageView.IsCorrupted());
+ }
+
+ [TestMethod]
+ public void TestFromProtobufWithSha1()
+ {
+ var digest = new Proto.Digest
+ { Type = Proto.DigestType.Sha1, Checksum = "8843D7F92416211DE9EBB963FF4CE28125932878" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = FakeHost,
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = FakeTopic },
+ Body = body
+ };
+
+ var messageView = MessageView.FromProtobuf(message);
+
+ CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+ Assert.AreEqual(FakeTopic, messageView.Topic);
+ Assert.IsFalse(messageView.IsCorrupted());
+ }
+
+ [TestMethod]
+ public void TestFromProtobufWithWrongSha1()
+ {
+ var digest = new Proto.Digest
+ { Type = Proto.DigestType.Sha1, Checksum = "0000000000000000000000000000000000000000" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = FakeHost,
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = FakeTopic },
+ Body = body
+ };
+
+ var messageView = MessageView.FromProtobuf(message);
+
+ CollectionAssert.AreEqual(body.ToByteArray(), messageView.Body);
+ Assert.AreEqual(FakeTopic, messageView.Topic);
+ Assert.IsTrue(messageView.IsCorrupted());
+ }
+ }
+}
diff --git a/csharp/tests/MockServer.cs b/csharp/tests/MockServer.cs
new file mode 100644
index 0000000..6f65842
--- /dev/null
+++ b/csharp/tests/MockServer.cs
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ public class MockServer : Proto.MessagingService.MessagingServiceBase
+ {
+ private readonly List<string> _attemptIdList;
+ private int _serverDeadlineFlag = 1;
+
+ private readonly Proto.Status _mockStatus = new Proto.Status
+ {
+ Code = Proto.Code.Ok,
+ Message = "mock test"
+ };
+
+ private readonly string _topic;
+ private readonly string _broker;
+
+ public MockServer(string topic, string broker, List<string> attemptIdList)
+ {
+ _topic = topic;
+ _broker = broker;
+ _attemptIdList = attemptIdList;
+ }
+
+ public int Port { get; set; }
+
+ public override Task<Proto.QueryRouteResponse> QueryRoute(Proto.QueryRouteRequest request,
+ ServerCallContext context)
+ {
+ var response = new Proto.QueryRouteResponse
+ {
+ Status = _mockStatus,
+ MessageQueues =
+ {
+ new Proto.MessageQueue
+ {
+ Topic = new Proto.Resource { Name = _topic },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Broker = new Proto.Broker
+ {
+ Name = _broker,
+ Id = 0,
+ Endpoints = new Proto.Endpoints
+ {
+ Addresses =
+ {
+ new Proto.Address { Host = "127.0.0.1", Port = Port }
+ }
+ }
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ }
+ }
+ };
+ return Task.FromResult(response);
+ }
+
+ public override Task<Proto.HeartbeatResponse> Heartbeat(Proto.HeartbeatRequest request,
+ ServerCallContext context)
+ {
+ var response = new Proto.HeartbeatResponse { Status = _mockStatus };
+ return Task.FromResult(response);
+ }
+
+ public override Task<Proto.QueryAssignmentResponse> QueryAssignment(Proto.QueryAssignmentRequest request,
+ ServerCallContext context)
+ {
+ var response = new Proto.QueryAssignmentResponse
+ {
+ Status = _mockStatus,
+ Assignments =
+ {
+ new Proto.Assignment
+ {
+ MessageQueue = new Proto.MessageQueue
+ {
+ Topic = new Proto.Resource { Name = _topic },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Broker = new Proto.Broker
+ {
+ Name = _broker,
+ Id = 0,
+ Endpoints = new Proto.Endpoints
+ {
+ Addresses =
+ {
+ new Proto.Address { Host = "127.0.0.1", Port = Port }
+ }
+ }
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ }
+ }
+ }
+ };
+ return Task.FromResult(response);
+ }
+
+ public override async Task ReceiveMessage(Proto.ReceiveMessageRequest request,
+ IServerStreamWriter<Proto.ReceiveMessageResponse> responseStream, ServerCallContext context)
+ {
+ if (_attemptIdList.Count >= 3)
+ {
+ await Task.Delay(100);
+ }
+
+ _attemptIdList.Add(request.AttemptId);
+
+ if (CompareAndSetServerDeadlineFlag(true, false))
+ {
+ // timeout
+ await Task.Delay(TimeSpan.FromSeconds(3));
+ }
+ else
+ {
+ var response = new Proto.ReceiveMessageResponse { Status = _mockStatus };
+ await responseStream.WriteAsync(response);
+ }
+ }
+
+ public override async Task Telemetry(IAsyncStreamReader<Proto.TelemetryCommand> requestStream,
+ IServerStreamWriter<Proto.TelemetryCommand> responseStream, ServerCallContext context)
+ {
+ await foreach (var command in requestStream.ReadAllAsync())
+ {
+ var response = command.Clone();
+ response.Status = _mockStatus;
+ response.Settings = new Proto.Settings
+ {
+ BackoffPolicy = new Proto.RetryPolicy
+ {
+ MaxAttempts = 16,
+ ExponentialBackoff = new Proto.ExponentialBackoff
+ {
+ Initial = new Duration { Seconds = 1 },
+ Max = new Duration { Seconds = 10 },
+ Multiplier = 1.5f
+ }
+ }
+ };
+
+ await responseStream.WriteAsync(response);
+ }
+ }
+
+ private bool CompareAndSetServerDeadlineFlag(bool expectedValue, bool newValue)
+ {
+ var expected = expectedValue ? 1 : 0;
+ var newVal = newValue ? 1 : 0;
+ return Interlocked.CompareExchange(ref _serverDeadlineFlag, newVal, expected) == expected;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ProcessQueueTest.cs b/csharp/tests/ProcessQueueTest.cs
new file mode 100644
index 0000000..d18188d
--- /dev/null
+++ b/csharp/tests/ProcessQueueTest.cs
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class ProcessQueueTest
+ {
+ private PushConsumer CreateAndSetupPushConsumer()
+ {
+ var pushConsumer = CreatePushConsumer("testTopic", 8, 1024, 4);
+ pushConsumer.State = State.Running;
+ return pushConsumer;
+ }
+
+ private Mock<IClientManager> SetupMockClientManager(PushConsumer pushConsumer)
+ {
+ var mockClientManager = new Mock<IClientManager>();
+ pushConsumer.SetClientManager(mockClientManager.Object);
+ return mockClientManager;
+ }
+
+ private static Proto.MessageQueue CreateMessageQueue()
+ {
+ return new Proto.MessageQueue
+ {
+ Broker = new Proto.Broker
+ {
+ Name = "broker0",
+ Endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses =
+ {
+ new Proto.Address
+ {
+ Host = "127.0.0.1",
+ Port = 8080
+ }
+ }
+ }
+ },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Topic = new Proto.Resource
+ {
+ ResourceNamespace = "testNamespace",
+ Name = "testTopic",
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ };
+ }
+
+ [TestMethod]
+ public void TestExpired()
+ {
+ var pushConsumer = CreateAndSetupPushConsumer();
+ var processQueue = CreateProcessQueue(pushConsumer);
+ Assert.IsFalse(processQueue.Expired());
+ }
+
+ [TestMethod]
+ public async Task TestReceiveMessageImmediately()
+ {
+ var pushConsumer = CreateAndSetupPushConsumer();
+ var processQueue = CreateProcessQueue(pushConsumer);
+ var mockClientManager = SetupMockClientManager(pushConsumer);
+
+ var message = CreateMessage();
+ var receiveMessageResponses = new List<Proto.ReceiveMessageResponse>
+ {
+ new Proto.ReceiveMessageResponse { Status = new Proto.Status { Code = Proto.Code.Ok } },
+ new Proto.ReceiveMessageResponse { Message = message }
+ };
+
+ MockReceiveMessage(mockClientManager, pushConsumer, receiveMessageResponses);
+
+ await Task.Delay(3000);
+ processQueue.FetchMessageImmediately();
+
+ Assert.AreEqual(processQueue.GetCachedMessageCount(), 1);
+ }
+
+ [TestMethod]
+ public async Task TestEraseMessageWithConsumeOk()
+ {
+ var pushConsumer = CreateAndSetupPushConsumer();
+ var messageView = CreateMessageView();
+ var processQueue = CreateProcessQueue(pushConsumer);
+ var mockClientManager = SetupMockClientManager(pushConsumer);
+
+ MockAckMessage(mockClientManager, pushConsumer, Proto.Code.Ok);
+
+ processQueue.CacheMessages(new List<MessageView> { messageView });
+
+ processQueue.EraseMessage(messageView, ConsumeResult.SUCCESS);
+
+ mockClientManager.Verify(cm => cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
+ }
+
+ [TestMethod]
+ public async Task TestEraseMessageWithAckFailure()
+ {
+ var pushConsumer = CreateAndSetupPushConsumer();
+ var messageView = CreateMessageView();
+ var processQueue = CreateProcessQueue(pushConsumer);
+ var mockClientManager = SetupMockClientManager(pushConsumer);
+
+ MockAckMessage(mockClientManager, pushConsumer, Proto.Code.InternalServerError);
+
+ processQueue.CacheMessages(new List<MessageView> { messageView });
+
+ var ackTimes = 3;
+
+ processQueue.EraseMessage(messageView, ConsumeResult.SUCCESS);
+ await Task.Delay(ProcessQueue.AckMessageFailureBackoffDelay * ackTimes);
+
+ mockClientManager.Verify(cm => cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()), Times.AtLeast(ackTimes));
+ }
+
+ private void MockReceiveMessage(Mock<IClientManager> mockClientManager, PushConsumer pushConsumer, List<Proto.ReceiveMessageResponse> responses)
+ {
+ var metadata = pushConsumer.Sign();
+ var invocation = new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(null, responses, metadata);
+
+ mockClientManager.Setup(cm => cm.ReceiveMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.ReceiveMessageRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(invocation));
+ }
+
+ private void MockAckMessage(Mock<IClientManager> mockClientManager, PushConsumer pushConsumer, Proto.Code responseCode)
+ {
+ var metadata = pushConsumer.Sign();
+ var response = new Proto.AckMessageResponse { Status = new Proto.Status { Code = responseCode } };
+
+ var invocation = new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(null, response, metadata);
+
+ mockClientManager.Setup(cm => cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(invocation));
+ }
+
+ private MessageView CreateMessageView()
+ {
+ return MessageView.FromProtobuf(CreateMessage(), new MessageQueue(CreateMessageQueue()));
+ }
+
+ private static ProcessQueue CreateProcessQueue(PushConsumer pushConsumer)
+ {
+ var processQueue = new ProcessQueue(pushConsumer, new MessageQueue(CreateMessageQueue()),
+ pushConsumer.GetSubscriptionExpressions()["testTopic"], new CancellationTokenSource(),
+ new CancellationTokenSource(), new CancellationTokenSource(),
+ new CancellationTokenSource());
+ return processQueue;
+ }
+
+ private Proto.Message CreateMessage()
+ {
+ var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = "127.0.0.1",
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = "testTopic" },
+ Body = body
+ };
+ return message;
+ }
+
+ private PushConsumer CreatePushConsumer(string topic, int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount)
+ {
+ var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:8080").Build();
+ var subscription = new Dictionary<string, FilterExpression> { { topic, new FilterExpression("*") } };
+ return new PushConsumer(clientConfig, "testGroup",
+ new ConcurrentDictionary<string, FilterExpression>(subscription), new TestMessageListener(),
+ maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+ }
+
+ private class TestMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView) => ConsumeResult.SUCCESS;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ProducerBuilderTest.cs b/csharp/tests/ProducerBuilderTest.cs
new file mode 100644
index 0000000..c318aaf
--- /dev/null
+++ b/csharp/tests/ProducerBuilderTest.cs
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+ [TestClass]
+ public class ProducerBuilderTest
+ {
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetClientConfigurationWithNull()
+ {
+ var builder = new Producer.Builder();
+ builder.SetClientConfig(null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(NullReferenceException))]
+ public void TestSetTopicWithNull()
+ {
+ var builder = new Producer.Builder();
+ builder.SetTopics(null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetIllegalTopic()
+ {
+ var builder = new Producer.Builder();
+ builder.SetTopics("\t");
+ }
+
+ [TestMethod]
+ public void TestSetTopic()
+ {
+ var builder = new Producer.Builder();
+ builder.SetTopics("abc");
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetNegativeMaxAttempts()
+ {
+ var builder = new Producer.Builder();
+ builder.SetMaxAttempts(-1);
+ }
+
+ [TestMethod]
+ public void TestSetMaxAttempts()
+ {
+ var builder = new Producer.Builder();
+ builder.SetMaxAttempts(3);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetTransactionCheckerWithNull()
+ {
+ var builder = new Producer.Builder();
+ builder.SetTransactionChecker(null);
+ }
+
+ [TestMethod]
+ public void TestSetTransactionChecker()
+ {
+ var builder = new Producer.Builder();
+ builder.SetTransactionChecker(new TestTransactionChecker());
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public async Task TestBuildWithoutClientConfiguration()
+ {
+ var builder = new Producer.Builder();
+ await builder.Build();
+ }
+
+ [TestMethod]
+ public void TestBuild()
+ {
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:9876").Build();
+ var builder = new Producer.Builder();
+ builder.SetClientConfig(clientConfig).Build();
+ }
+
+ private class TestTransactionChecker : ITransactionChecker
+ {
+ public TransactionResolution Check(MessageView messageView)
+ {
+ return TransactionResolution.Commit;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
new file mode 100644
index 0000000..ce0cca1
--- /dev/null
+++ b/csharp/tests/ProducerTest.cs
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+using System.Collections.Concurrent;
+using System.Text;
+using System.Threading.Tasks;
+using Moq;
+
+namespace tests
+{
+ [TestClass]
+ public class ProducerTest
+ {
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public async Task TestSendBeforeStartup()
+ {
+ var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+ var publishingTopics = new ConcurrentDictionary<string, bool>();
+ publishingTopics.TryAdd("testTopic", true);
+ var producer = new Producer(clientConfig, publishingTopics, 1, null);
+ var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build();
+ await producer.Send(message);
+ }
+
+ [TestMethod]
+ public async Task TestSendWithTopic()
+ {
+ var producer = CreateTestClient();
+ producer.State = State.Running;
+ var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build();
+ var metadata = producer.Sign();
+ var sendResultEntry = new Proto.SendResultEntry
+ {
+ MessageId = "fakeMsgId",
+ Status = new Proto.Status
+ {
+ Code = Proto.Code.Ok
+ },
+ Offset = 1
+ };
+ var sendMessageResponse = new Proto.SendMessageResponse
+ {
+ Status = new Proto.Status
+ {
+ Code = Proto.Code.Ok
+ },
+ Entries = { sendResultEntry }
+ };
+ var sendMessageInvocation = new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(null,
+ sendMessageResponse, metadata);
+ var mockClientManager = new Mock<IClientManager>();
+ producer.SetClientManager(mockClientManager.Object);
+ mockClientManager.Setup(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(sendMessageInvocation));
+ await producer.Send(message);
+ mockClientManager.Verify(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public async Task TestSendFailureWithTopic()
+ {
+ var producer = CreateTestClient();
+ producer.State = State.Running;
+ var message = new Message.Builder().SetTopic("testTopic").SetBody(Encoding.UTF8.GetBytes("foobar")).Build();
+ var mockClientManager = new Mock<IClientManager>();
+ producer.SetClientManager(mockClientManager.Object);
+ var exception = new ArgumentException();
+ mockClientManager.Setup(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>())).Throws(exception);
+ await producer.Send(message);
+ var maxAttempts = producer.PublishingSettings.GetRetryPolicy().GetMaxAttempts();
+ mockClientManager.Verify(cm => cm.SendMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Exactly(maxAttempts));
+ }
+
+ private Producer CreateTestClient()
+ {
+ const string host0 = "127.0.0.1";
+ var mqs = new List<Proto.MessageQueue>();
+ var mq0 = new Proto.MessageQueue
+ {
+ Broker = new Proto.Broker
+ {
+ Name = "broker0",
+ Endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses =
+ {
+ new Proto.Address
+ {
+ Host = host0,
+ Port = 80
+ }
+ }
+ }
+ },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Topic = new Proto.Resource
+ {
+ ResourceNamespace = "foo-bar-namespace",
+ Name = "testTopic",
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ };
+ mqs.Add(mq0);
+ var topicRouteData = new TopicRouteData(mqs);
+ var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
+ var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+ var producer = new Producer(clientConfig, new ConcurrentDictionary<string, bool>(),
+ 1, null);
+ producer._publishingRouteDataCache.TryAdd("testTopic", publishingLoadBalancer);
+ return producer;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/PublishingLoadBalancerTest.cs b/csharp/tests/PublishingLoadBalancerTest.cs
index b86a095..6b4a74d 100644
--- a/csharp/tests/PublishingLoadBalancerTest.cs
+++ b/csharp/tests/PublishingLoadBalancerTest.cs
@@ -53,6 +53,7 @@
Permission = Proto.Permission.ReadWrite,
Topic = new Proto.Resource
{
+ ResourceNamespace = "foo-bar-namespace",
Name = "TestTopic",
}
};
@@ -78,6 +79,7 @@
Permission = Proto.Permission.ReadWrite,
Topic = new Proto.Resource
{
+ ResourceNamespace = "foo-bar-namespace",
Name = "TestTopic",
}
};
diff --git a/csharp/tests/PublishingMessageTest.cs b/csharp/tests/PublishingMessageTest.cs
index c12b908..02936a6 100644
--- a/csharp/tests/PublishingMessageTest.cs
+++ b/csharp/tests/PublishingMessageTest.cs
@@ -28,6 +28,7 @@
{
private const string ClientId = "fakeClientId";
private static readonly Endpoints Endpoints = new Endpoints("127.0.0.1:8081");
+ private const string Namespace = "fakeNamespace";
[TestMethod]
@@ -39,7 +40,7 @@
{
[topic] = true
};
- var settings = new PublishingSettings(ClientId, Endpoints,
+ var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3), TimeSpan.FromSeconds(3), topics);
var publishingMessage = new PublishingMessage(message, settings, false);
Assert.AreEqual(publishingMessage.MessageType, MessageType.Normal);
@@ -57,7 +58,7 @@
{
[topic] = true
};
- var settings = new PublishingSettings(ClientId, Endpoints,
+ var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3), TimeSpan.FromSeconds(3), topics);
var publishingMessage = new PublishingMessage(message, settings, false);
Assert.AreEqual(publishingMessage.MessageType, MessageType.Fifo);
@@ -75,7 +76,7 @@
{
[topic] = true
};
- var settings = new PublishingSettings(ClientId, Endpoints,
+ var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3),
TimeSpan.FromSeconds(3), topics);
var publishingMessage = new PublishingMessage(message, settings, false);
@@ -93,7 +94,7 @@
{
[topic] = true
};
- var settings = new PublishingSettings(ClientId, Endpoints,
+ var settings = new PublishingSettings(Namespace, ClientId, Endpoints,
ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(3),
TimeSpan.FromSeconds(3), topics);
var publishingMessage = new PublishingMessage(message, settings, true);
diff --git a/csharp/tests/PushConsumerBuilderTest.cs b/csharp/tests/PushConsumerBuilderTest.cs
new file mode 100644
index 0000000..ba33a1a
--- /dev/null
+++ b/csharp/tests/PushConsumerBuilderTest.cs
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+ [TestClass]
+ public class PushConsumerBuilderTest
+ {
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetClientConfigWithNull()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetClientConfig(null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetConsumerGroupWithNull()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetConsumerGroup(null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetConsumerGroupWithSpecialChar()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetConsumerGroup("#.testGroup#");
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestBuildWithoutExpressions()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetSubscriptionExpression(null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestBuildWithEmptyExpressions()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetSubscriptionExpression(new Dictionary<string, FilterExpression>());
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestBuildWithNullMessageListener()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetMessageListener(null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestNegativeMaxCacheMessageCount()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetMaxCacheMessageCount(-1);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestNegativeMaxCacheMessageSizeInBytes()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetMaxCacheMessageSizeInBytes(-1);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestNegativeConsumptionThreadCount()
+ {
+ var builder = new PushConsumer.Builder();
+ builder.SetMaxCacheMessageCount(-1);
+ }
+
+ [TestMethod]
+ public void TestBuild()
+ {
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:9876").Build();
+ var subscription = new Dictionary<string, FilterExpression>
+ {{ "fakeTopic", new FilterExpression("*") }};
+ var builder = new PushConsumer.Builder();
+ builder.SetClientConfig(clientConfig).SetSubscriptionExpression(subscription).SetConsumerGroup("testGroup")
+ .SetMessageListener(new TestMessageListener()).SetMaxCacheMessageCount(10)
+ .SetMaxCacheMessageSizeInBytes(10).SetConsumptionThreadCount(10).Build();
+ }
+
+ private class TestMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ // Handle the received message and return consume result.
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs
new file mode 100644
index 0000000..c8a383c
--- /dev/null
+++ b/csharp/tests/PushConsumerTest.cs
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class PushConsumerTest
+ {
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public async Task TestSubscribeBeforeStartup()
+ {
+ var pushConsumer = CreatePushConsumer();
+ await pushConsumer.Subscribe("testTopic", new FilterExpression("*"));
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public void TestUnsubscribeBeforeStartup()
+ {
+ var pushConsumer = CreatePushConsumer();
+ pushConsumer.Unsubscribe("testTopic");
+ }
+
+ [TestMethod]
+ public async Task TestQueryAssignment()
+ {
+ var (pushConsumer, mockClientManager, queryRouteResponse, metadata) = SetupMockConsumer();
+
+ var queryAssignmentResponse = CreateQueryAssignmentResponse();
+ var queryAssignmentInvocation =
+ new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(null,
+ queryAssignmentResponse, metadata);
+
+ SetupMockClientManager(mockClientManager, queryRouteResponse, queryAssignmentInvocation, metadata);
+ await pushConsumer.QueryAssignment("testTopic");
+ }
+
+ [TestMethod]
+ public async Task TestScanAssignments()
+ {
+ var (pushConsumer, mockClientManager, queryRouteResponse, metadata) = SetupMockConsumer();
+
+ var queryAssignmentResponse = CreateQueryAssignmentResponse(new Proto.Assignment
+ {
+ MessageQueue = queryRouteResponse.MessageQueues[0]
+ });
+ var queryAssignmentInvocation =
+ new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(null,
+ queryAssignmentResponse, metadata);
+
+ SetupMockClientManager(mockClientManager, queryRouteResponse, queryAssignmentInvocation, metadata);
+
+ pushConsumer.State = State.Running;
+ await pushConsumer.Subscribe("testTopic", new FilterExpression("*"));
+ pushConsumer.ScanAssignments();
+ }
+
+ [TestMethod]
+ public async Task TestScanAssignmentsWithoutResults()
+ {
+ var (pushConsumer, mockClientManager, queryRouteResponse, metadata) = SetupMockConsumer();
+
+ var queryAssignmentResponse = CreateQueryAssignmentResponse();
+ var queryAssignmentInvocation =
+ new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(null,
+ queryAssignmentResponse, metadata);
+
+ SetupMockClientManager(mockClientManager, queryRouteResponse, queryAssignmentInvocation, metadata);
+
+ pushConsumer.State = State.Running;
+ await pushConsumer.Subscribe("testTopic", new FilterExpression("*"));
+ pushConsumer.ScanAssignments();
+ }
+
+ private PushConsumer CreatePushConsumer()
+ {
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1")
+ .Build();
+ return new PushConsumer(clientConfig, "testGroup",
+ new ConcurrentDictionary<string, FilterExpression>(), new TestMessageListener(),
+ 10, 10, 1);
+ }
+
+ private class TestMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ return ConsumeResult.SUCCESS;
+ }
+ }
+
+ private class MockClientStreamWriter<T> : IClientStreamWriter<T>
+ {
+ public Task WriteAsync(T message)
+ {
+ return Task.CompletedTask;
+ }
+
+ public WriteOptions WriteOptions { get; set; }
+
+ public Task CompleteAsync()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ private class MockAsyncStreamReader<T> : IAsyncStreamReader<T>
+ {
+ public Task<bool> MoveNext(CancellationToken cancellationToken)
+ {
+ throw new NotImplementedException();
+ }
+
+ public T Current => throw new NotImplementedException();
+ }
+
+ private (PushConsumer, Mock<IClientManager>, Proto.QueryRouteResponse, Metadata) SetupMockConsumer()
+ {
+ var pushConsumer = CreatePushConsumer();
+ var metadata = pushConsumer.Sign();
+
+ var mq = new Proto.MessageQueue
+ {
+ Broker = new Proto.Broker
+ {
+ Name = "broker0",
+ Endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses = { new Proto.Address { Host = "127.0.0.1", Port = 8080 } }
+ }
+ },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Topic = new Proto.Resource
+ {
+ ResourceNamespace = "testNamespace",
+ Name = "testTopic",
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ };
+
+ var queryRouteResponse = new Proto.QueryRouteResponse
+ {
+ Status = new Proto.Status { Code = Proto.Code.Ok },
+ MessageQueues = { mq }
+ };
+
+ var mockClientManager = new Mock<IClientManager>();
+ pushConsumer.SetClientManager(mockClientManager.Object);
+ return (pushConsumer, mockClientManager, queryRouteResponse, metadata);
+ }
+
+ private Proto.QueryAssignmentResponse CreateQueryAssignmentResponse(params Proto.Assignment[] assignments)
+ {
+ return new Proto.QueryAssignmentResponse
+ {
+ Status = new Proto.Status { Code = Proto.Code.Ok },
+ Assignments = { assignments }
+ };
+ }
+
+ private void SetupMockClientManager(Mock<IClientManager> mockClientManager,
+ Proto.QueryRouteResponse queryRouteResponse,
+ RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse> queryAssignmentInvocation,
+ Metadata metadata)
+ {
+ var queryRouteInvocation = new RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>(null,
+ queryRouteResponse, metadata);
+
+ mockClientManager.Setup(cm =>
+ cm.QueryRoute(It.IsAny<Endpoints>(), It.IsAny<Proto.QueryRouteRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(queryRouteInvocation));
+
+ var mockCall = new AsyncDuplexStreamingCall<Proto.TelemetryCommand, Proto.TelemetryCommand>(
+ new MockClientStreamWriter<Proto.TelemetryCommand>(),
+ new MockAsyncStreamReader<Proto.TelemetryCommand>(),
+ null, null, null, null);
+
+ mockClientManager.Setup(cm =>
+ cm.QueryAssignment(It.IsAny<Endpoints>(), It.IsAny<Proto.QueryAssignmentRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(queryAssignmentInvocation));
+
+ mockClientManager.Setup(cm => cm.Telemetry(It.IsAny<Endpoints>())).Returns(mockCall);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/PushSubscriptionSettingsTest.cs b/csharp/tests/PushSubscriptionSettingsTest.cs
new file mode 100644
index 0000000..e998d0d
--- /dev/null
+++ b/csharp/tests/PushSubscriptionSettingsTest.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Castle.Core.Internal;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class PushSubscriptionSettingsTest
+ {
+ private const string Namespace = "testNamespace";
+ private const string GroupResource = "testConsumerGroup";
+ private const string ClientId = "testClientId";
+ private const string Endpoint = "127.0.0.1:8080";
+ private static readonly TimeSpan RequestTimeout = TimeSpan.FromSeconds(3);
+ private static readonly ConcurrentDictionary<string, FilterExpression> SubscriptionExpression =
+ new ConcurrentDictionary<string, FilterExpression>(new Dictionary<string, FilterExpression> { { "testTopic", new FilterExpression("*") } });
+
+ private PushSubscriptionSettings CreateSettings()
+ {
+ return new PushSubscriptionSettings(Namespace, ClientId, new Endpoints(Endpoint), GroupResource, RequestTimeout, SubscriptionExpression);
+ }
+
+ [TestMethod]
+ public void TestToProtobuf()
+ {
+ var pushSubscriptionSettings = CreateSettings();
+ var settings = pushSubscriptionSettings.ToProtobuf();
+
+ Assert.AreEqual(Proto.ClientType.PushConsumer, settings.ClientType);
+ Assert.AreEqual(Duration.FromTimeSpan(RequestTimeout), settings.RequestTimeout);
+ Assert.IsFalse(settings.Subscription.Subscriptions.Count == 0);
+
+ var subscription = settings.Subscription;
+ Assert.AreEqual(subscription.Group, new Proto.Resource
+ {
+ ResourceNamespace = Namespace,
+ Name = GroupResource
+ });
+
+ Assert.IsFalse(subscription.Fifo);
+
+ var subscriptionsList = subscription.Subscriptions;
+ Assert.AreEqual(1, subscriptionsList.Count);
+
+ var subscriptionEntry = subscriptionsList[0];
+ Assert.AreEqual(Proto.FilterType.Tag, subscriptionEntry.Expression.Type);
+ Assert.AreEqual(subscriptionEntry.Topic, new Proto.Resource
+ {
+ ResourceNamespace = Namespace,
+ Name = "testTopic"
+ });
+ }
+
+ [TestMethod]
+ public void TestSync()
+ {
+ var durations = new List<Duration>
+ {
+ Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
+ Duration.FromTimeSpan(TimeSpan.FromSeconds(2)),
+ Duration.FromTimeSpan(TimeSpan.FromSeconds(3))
+ };
+
+ var customizedBackoff = new Proto.CustomizedBackoff
+ {
+ Next = { durations }
+ };
+
+ var retryPolicy = new Proto.RetryPolicy
+ {
+ CustomizedBackoff = customizedBackoff,
+ MaxAttempts = 3
+ };
+
+ var subscription = new Proto.Subscription
+ {
+ Fifo = true,
+ ReceiveBatchSize = 96,
+ LongPollingTimeout = Duration.FromTimeSpan(TimeSpan.FromSeconds(60))
+ };
+
+ var settings = new Proto.Settings
+ {
+ Subscription = subscription,
+ BackoffPolicy = retryPolicy
+ };
+
+ var pushSubscriptionSettings = new PushSubscriptionSettings(
+ "fakeNamespace", ClientId, new Endpoints(Endpoint), GroupResource, RequestTimeout,
+ new ConcurrentDictionary<string, FilterExpression>(SubscriptionExpression));
+
+ pushSubscriptionSettings.Sync(settings);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/csharp/tests/ResourceTest.cs b/csharp/tests/ResourceTest.cs
new file mode 100644
index 0000000..006deb9
--- /dev/null
+++ b/csharp/tests/ResourceTest.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+ [TestClass]
+ public class ResourceTests
+ {
+ [TestMethod]
+ public void TestGetterAndSetter()
+ {
+ var resource = new Resource("foobar");
+ Assert.AreEqual("foobar", resource.Name);
+ Assert.AreEqual(string.Empty, resource.Namespace);
+
+ resource = new Resource("foo", "bar");
+ Assert.AreEqual("bar", resource.Name);
+ Assert.AreEqual("foo", resource.Namespace);
+ }
+
+ [TestMethod]
+ public void TestToProtobuf()
+ {
+ var resource = new Resource("foo", "bar");
+ var protobuf = resource.ToProtobuf();
+ Assert.AreEqual("foo", protobuf.ResourceNamespace);
+ Assert.AreEqual("bar", protobuf.Name);
+ }
+
+ [TestMethod]
+ public void TestEqual()
+ {
+ var resource0 = new Resource("foo", "bar");
+ var resource1 = new Resource("foo", "bar");
+ Assert.AreEqual(resource0, resource1);
+
+ var resource2 = new Resource("foo0", "bar");
+ Assert.AreNotEqual(resource0, resource2);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/SessionTest.cs b/csharp/tests/SessionTest.cs
new file mode 100644
index 0000000..ec53ef1
--- /dev/null
+++ b/csharp/tests/SessionTest.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class SessionTests
+ {
+ private static Client CreateTestClient()
+ {
+ var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+ return new Producer(clientConfig, new ConcurrentDictionary<string, bool>(), 1, null);
+ }
+
+ [TestMethod]
+ public async Task TestSyncSettings()
+ {
+ var testClient = CreateTestClient();
+ var endpoints = new Endpoints(testClient.GetClientConfig().Endpoints);
+
+ var mockStreamWriter = new Mock<IClientStreamWriter<Proto.TelemetryCommand>>();
+ var mockAsyncStreamReader = new Mock<IAsyncStreamReader<Proto.TelemetryCommand>>();
+ var mockClientManager = new Mock<IClientManager>();
+ var mockGrpcCall = new AsyncDuplexStreamingCall<Proto.TelemetryCommand, Proto.TelemetryCommand>(
+ mockStreamWriter.Object, mockAsyncStreamReader.Object, null, null, null, null);
+
+ mockClientManager.Setup(cm => cm.Telemetry(endpoints)).Returns(mockGrpcCall);
+ var session = new Session(endpoints, mockGrpcCall, testClient);
+
+ var settings = new Proto.Settings();
+ mockStreamWriter.Setup(m => m.WriteAsync(It.Is<Proto.TelemetryCommand>(tc => tc.Settings == settings)))
+ .Returns(Task.CompletedTask);
+ testClient.SetClientManager(mockClientManager.Object);
+
+ await session.SyncSettings(true);
+
+ mockStreamWriter.Verify(m => m.WriteAsync(It.IsAny<Proto.TelemetryCommand>()), Times.Once);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerBuilderTest.cs b/csharp/tests/SimpleConsumerBuilderTest.cs
new file mode 100644
index 0000000..1031f91
--- /dev/null
+++ b/csharp/tests/SimpleConsumerBuilderTest.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+ [TestClass]
+ public class SimpleConsumerBuilderTest
+ {
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetClientConfigurationWithNull()
+ {
+ var builder = new SimpleConsumer.Builder();
+ builder.SetClientConfig(null);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestSetConsumerGroupWithNull()
+ {
+ var builder = new SimpleConsumer.Builder();
+ builder.SetConsumerGroup(null);
+ }
+
+ [TestMethod]
+ public void TestSetAwaitDuration()
+ {
+ var builder = new SimpleConsumer.Builder();
+ builder.SetAwaitDuration(TimeSpan.FromSeconds(5));
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestBuildWithEmptyExpressions()
+ {
+ var builder = new SimpleConsumer.Builder();
+ builder.SetSubscriptionExpression(new Dictionary<string, FilterExpression>());
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestBuildWithoutExpressions()
+ {
+ var builder = new SimpleConsumer.Builder();
+ builder.SetSubscriptionExpression(null);
+ }
+
+ [TestMethod]
+ public void TestBuild()
+ {
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:9876").Build();
+ var subscription = new Dictionary<string, FilterExpression>
+ {{ "testTopic", new FilterExpression("*") }};
+ var builder = new SimpleConsumer.Builder();
+ builder.SetClientConfig(clientConfig).SetConsumerGroup("testGroup").
+ SetSubscriptionExpression(subscription).Build();
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerTest.cs b/csharp/tests/SimpleConsumerTest.cs
new file mode 100644
index 0000000..b8ea64b
--- /dev/null
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Org.Apache.Rocketmq.Error;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class SimpleConsumerTest
+ {
+ // Helper method to mock the client manager and assert exceptions
+ private async Task MockAndAssertAckException<TException>(SimpleConsumer consumer, MessageView messageView, Proto.Code code) where TException : Exception
+ {
+ var mockClientManager = new Mock<IClientManager>();
+ consumer.SetClientManager(mockClientManager.Object);
+
+ var metadata = consumer.Sign();
+ var response = new Proto.AckMessageResponse
+ {
+ Status = new Proto.Status { Code = code }
+ };
+ var invocation = new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(null, response, metadata);
+ mockClientManager.Setup(cm =>
+ cm.AckMessage(It.IsAny<Endpoints>(), It.IsAny<Proto.AckMessageRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(invocation));
+ try
+ {
+ await consumer.Ack(messageView);
+ }
+ catch (Exception e)
+ {
+ Assert.IsInstanceOfType(e, typeof(TException));
+ }
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public async Task TestReceiveWithoutStart()
+ {
+ var consumer = CreateSimpleConsumer();
+ await consumer.Receive(16, TimeSpan.FromSeconds(15));
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public async Task TestAckWithoutStart()
+ {
+ var consumer = CreateSimpleConsumer();
+ var messageView = MessageView.FromProtobuf(CreateMessage());
+ await consumer.Ack(messageView);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public async Task TestSubscribeWithoutStart()
+ {
+ var consumer = CreateSimpleConsumer();
+ await consumer.Subscribe("testTopic", new FilterExpression("*"));
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public void TestUnsubscribeWithoutStart()
+ {
+ var consumer = CreateSimpleConsumer();
+ consumer.Unsubscribe("testTopic");
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InternalErrorException))]
+ public async Task TestReceiveWithZeroMaxMessageNum()
+ {
+ var consumer = CreateSimpleConsumer();
+ consumer.State = State.Running;
+ await consumer.Receive(0, TimeSpan.FromSeconds(15));
+ }
+
+ [TestMethod]
+ public async Task TestAck()
+ {
+ var consumer = CreateSimpleConsumer();
+ consumer.State = State.Running;
+ var messageView = CreateMessageView();
+
+ await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.BadRequest);
+ await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.IllegalTopic);
+ await MockAndAssertAckException<BadRequestException>(consumer, messageView,
+ Proto.Code.IllegalConsumerGroup);
+ await MockAndAssertAckException<BadRequestException>(consumer, messageView,
+ Proto.Code.InvalidReceiptHandle);
+ await MockAndAssertAckException<BadRequestException>(consumer, messageView, Proto.Code.ClientIdRequired);
+ await MockAndAssertAckException<UnauthorizedException>(consumer, messageView, Proto.Code.Unauthorized);
+ await MockAndAssertAckException<ForbiddenException>(consumer, messageView, Proto.Code.Forbidden);
+ await MockAndAssertAckException<NotFoundException>(consumer, messageView, Proto.Code.NotFound);
+ await MockAndAssertAckException<NotFoundException>(consumer, messageView, Proto.Code.TopicNotFound);
+ await MockAndAssertAckException<TooManyRequestsException>(consumer, messageView,
+ Proto.Code.TooManyRequests);
+ await MockAndAssertAckException<InternalErrorException>(consumer, messageView, Proto.Code.InternalError);
+ await MockAndAssertAckException<InternalErrorException>(consumer, messageView,
+ Proto.Code.InternalServerError);
+ await MockAndAssertAckException<ProxyTimeoutException>(consumer, messageView, Proto.Code.ProxyTimeout);
+ await MockAndAssertAckException<UnsupportedException>(consumer, messageView, Proto.Code.Unsupported);
+ }
+
+ [TestMethod]
+ public async Task TestChangeInvisibleDuration()
+ {
+ var consumer = CreateSimpleConsumer();
+ consumer.State = State.Running;
+ var messageView = CreateMessageView();
+ var invisibleDuration = TimeSpan.FromSeconds(3);
+
+ var mockClientManager = new Mock<IClientManager>();
+ consumer.SetClientManager(mockClientManager.Object);
+ var metadata = consumer.Sign();
+
+ var response = new Proto.ChangeInvisibleDurationResponse
+ {
+ Status = new Proto.Status { Code = Proto.Code.Ok }
+ };
+ var invocation =
+ new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(null,
+ response, metadata);
+ mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.ChangeInvisibleDurationRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(invocation));
+ await consumer.ChangeInvisibleDuration(messageView, invisibleDuration);
+
+ await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+ invisibleDuration, Proto.Code.BadRequest);
+ await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+ invisibleDuration, Proto.Code.IllegalTopic);
+ await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+ invisibleDuration, Proto.Code.IllegalConsumerGroup);
+ await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+ invisibleDuration, Proto.Code.IllegalInvisibleTime);
+ await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+ invisibleDuration, Proto.Code.InvalidReceiptHandle);
+ await MockAndAssertChangeInvisibleDurationException<BadRequestException>(consumer, messageView,
+ invisibleDuration, Proto.Code.ClientIdRequired);
+ await MockAndAssertChangeInvisibleDurationException<UnauthorizedException>(consumer, messageView,
+ invisibleDuration, Proto.Code.Unauthorized);
+ await MockAndAssertChangeInvisibleDurationException<NotFoundException>(consumer, messageView,
+ invisibleDuration, Proto.Code.NotFound);
+ await MockAndAssertChangeInvisibleDurationException<NotFoundException>(consumer, messageView,
+ invisibleDuration, Proto.Code.TopicNotFound);
+ await MockAndAssertChangeInvisibleDurationException<TooManyRequestsException>(consumer, messageView,
+ invisibleDuration, Proto.Code.TooManyRequests);
+ await MockAndAssertChangeInvisibleDurationException<InternalErrorException>(consumer, messageView,
+ invisibleDuration, Proto.Code.InternalError);
+ await MockAndAssertChangeInvisibleDurationException<InternalErrorException>(consumer, messageView,
+ invisibleDuration, Proto.Code.InternalServerError);
+ await MockAndAssertChangeInvisibleDurationException<ProxyTimeoutException>(consumer, messageView,
+ invisibleDuration, Proto.Code.ProxyTimeout);
+ await MockAndAssertChangeInvisibleDurationException<UnsupportedException>(consumer, messageView,
+ invisibleDuration, Proto.Code.Unsupported);
+ }
+
+ private async Task MockAndAssertChangeInvisibleDurationException<TException>(SimpleConsumer consumer,
+ MessageView messageView, TimeSpan invisibleDuration, Proto.Code code) where TException : Exception
+ {
+ var mockClientManager = new Mock<IClientManager>();
+ consumer.SetClientManager(mockClientManager.Object);
+
+ var metadata = consumer.Sign();
+ var response = new Proto.ChangeInvisibleDurationResponse
+ {
+ Status = new Proto.Status { Code = code }
+ };
+ var invocation =
+ new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(null,
+ response, metadata);
+ mockClientManager.Setup(cm => cm.ChangeInvisibleDuration(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.ChangeInvisibleDurationRequest>(), It.IsAny<TimeSpan>()))
+ .Returns(Task.FromResult(invocation));
+ try
+ {
+ await consumer.ChangeInvisibleDuration(messageView, invisibleDuration);
+ }
+ catch (Exception e)
+ {
+ Assert.IsInstanceOfType(e, typeof(TException));
+ }
+ }
+
+ private SimpleConsumer CreateSimpleConsumer()
+ {
+ var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+ var subscription = new Dictionary<string, FilterExpression> { { "testTopic", new FilterExpression("*") } };
+ var consumer =
+ new SimpleConsumer(clientConfig, "testConsumerGroup", TimeSpan.FromSeconds(15), subscription);
+ return consumer;
+ }
+
+ private Proto.Message CreateMessage()
+ {
+ var digest = new Proto.Digest { Type = Proto.DigestType.Crc32, Checksum = "9EF61F95" };
+ var systemProperties = new Proto.SystemProperties
+ {
+ MessageType = Proto.MessageType.Normal,
+ MessageId = MessageIdGenerator.GetInstance().Next(),
+ BornHost = "127.0.0.1",
+ BodyDigest = digest,
+ BornTimestamp = new Timestamp()
+ };
+ var body = ByteString.CopyFrom("foobar", Encoding.UTF8);
+ var message = new Proto.Message
+ {
+ SystemProperties = systemProperties,
+ Topic = new Proto.Resource { Name = "testTopic" },
+ Body = body
+ };
+ return message;
+ }
+
+ private MessageView CreateMessageView()
+ {
+ var message = CreateMessage();
+ var messageQueue = new MessageQueue(new Proto.MessageQueue
+ {
+ Broker = new Proto.Broker
+ {
+ Name = "broker0",
+ Endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses =
+ {
+ new Proto.Address
+ {
+ Host = "127.0.0.1",
+ Port = 8080
+ }
+ }
+ }
+ },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Topic = new Proto.Resource { ResourceNamespace = "testNamespace", Name = "testTopic" }
+ });
+ return MessageView.FromProtobuf(message, messageQueue);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/SimpleSubscriptionSettingsTest.cs b/csharp/tests/SimpleSubscriptionSettingsTest.cs
new file mode 100644
index 0000000..46cd8aa
--- /dev/null
+++ b/csharp/tests/SimpleSubscriptionSettingsTest.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Rocketmq.V2;
+using Castle.Core.Internal;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+using Endpoints = Org.Apache.Rocketmq.Endpoints;
+using FilterExpression = Org.Apache.Rocketmq.FilterExpression;
+
+namespace tests
+{
+ [TestClass]
+ public class SimpleSubscriptionSettingsTest
+ {
+ private const string TestNamespace = "testNamespace";
+ private const string GroupResource = "testConsumerGroup";
+ private const string ClientId = "testClientId";
+ private const string TestTopic = "testTopic";
+ private static readonly TimeSpan RequestTimeout = TimeSpan.FromSeconds(3);
+ private static readonly TimeSpan LongPollingTimeout = TimeSpan.FromSeconds(15);
+ private SimpleSubscriptionSettings _simpleSubscriptionSettings;
+
+ [TestInitialize]
+ public void Setup()
+ {
+ var subscriptionExpression = new ConcurrentDictionary<string, FilterExpression>(
+ new Dictionary<string, FilterExpression> { { TestTopic, new FilterExpression("*") } });
+ _simpleSubscriptionSettings = new SimpleSubscriptionSettings(
+ TestNamespace,
+ ClientId,
+ new Endpoints("127.0.0.1:9876"),
+ GroupResource,
+ RequestTimeout,
+ LongPollingTimeout,
+ subscriptionExpression
+ );
+ }
+
+ [TestMethod]
+ public void TestToProtobuf()
+ {
+ var settings = _simpleSubscriptionSettings.ToProtobuf();
+
+ Assert.AreEqual(Proto.ClientType.SimpleConsumer, settings.ClientType);
+ Assert.AreEqual(Duration.FromTimeSpan(RequestTimeout), settings.RequestTimeout);
+ Assert.IsFalse(settings.Subscription.Subscriptions.Count == 0);
+
+ var subscription = settings.Subscription;
+
+ Assert.AreEqual(subscription.Group, new Proto.Resource
+ {
+ ResourceNamespace = TestNamespace,
+ Name = GroupResource
+ });
+ Assert.IsFalse(subscription.Fifo);
+ Assert.AreEqual(Duration.FromTimeSpan(LongPollingTimeout), subscription.LongPollingTimeout);
+
+ var subscriptionsList = subscription.Subscriptions;
+ Assert.AreEqual(1, subscriptionsList.Count);
+
+ var subscriptionEntry = subscriptionsList[0];
+ Assert.AreEqual(FilterType.Tag, subscriptionEntry.Expression.Type);
+ Assert.AreEqual(subscriptionEntry.Topic, new Proto.Resource
+ {
+ ResourceNamespace = TestNamespace,
+ Name = TestTopic
+ });
+ }
+
+ [TestMethod]
+ public void TestSync()
+ {
+ var subscription = new Proto.Subscription
+ {
+ Fifo = true
+ };
+
+ var settings = new Proto.Settings
+ {
+ Subscription = subscription
+ };
+
+ _simpleSubscriptionSettings.Sync(settings);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/StatusCheckerTest.cs b/csharp/tests/StatusCheckerTest.cs
new file mode 100644
index 0000000..b64ccc7
--- /dev/null
+++ b/csharp/tests/StatusCheckerTest.cs
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Org.Apache.Rocketmq.Error;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class StatusCheckerTests
+ {
+ [TestMethod]
+ public void TestCheckStatusOk()
+ {
+ var status = new Proto.Status { Code = Proto.Code.Ok, Message = "OK" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Exception exception = null;
+ try
+ {
+ StatusChecker.Check(status, request, requestId);
+ }
+ catch (Exception ex)
+ {
+ exception = ex;
+ }
+
+ Assert.IsNull(exception, "Expected no exception to be thrown, but got: " + exception);
+ }
+
+ [TestMethod]
+ public void TestCheckStatusMultipleResults()
+ {
+ var status = new Proto.Status { Code = Proto.Code.MultipleResults, Message = "Multiple Results" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Exception exception = null;
+ try
+ {
+ StatusChecker.Check(status, request, requestId);
+ }
+ catch (Exception ex)
+ {
+ exception = ex;
+ }
+
+ Assert.IsNull(exception, "Expected no exception to be thrown, but got: " + exception);
+ }
+
+ [TestMethod]
+ public void TestCheckStatusBadRequest()
+ {
+ var status = new Proto.Status { Code = Proto.Code.BadRequest, Message = "Bad Request" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<BadRequestException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusUnauthorized()
+ {
+ var status = new Proto.Status { Code = Proto.Code.Unauthorized, Message = "Unauthorized" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<UnauthorizedException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusPaymentRequired()
+ {
+ var status = new Proto.Status { Code = Proto.Code.PaymentRequired, Message = "Payment Required" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<PaymentRequiredException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusForbidden()
+ {
+ var status = new Proto.Status { Code = Proto.Code.Forbidden, Message = "Forbidden" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<ForbiddenException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusMessageNotFoundForNonReceiveRequest()
+ {
+ var status = new Proto.Status { Code = Proto.Code.MessageNotFound, Message = "Message Not Found" };
+ var request = new Proto.SendMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<NotFoundException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusNotFound()
+ {
+ var status = new Proto.Status { Code = Proto.Code.NotFound, Message = "Not Found" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<NotFoundException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusPayloadTooLarge()
+ {
+ var status = new Proto.Status { Code = Proto.Code.PayloadTooLarge, Message = "Payload Too Large" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<PayloadTooLargeException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusTooManyRequests()
+ {
+ var status = new Proto.Status { Code = Proto.Code.TooManyRequests, Message = "Too Many Requests" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<TooManyRequestsException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusRequestHeaderFieldsTooLarge()
+ {
+ var status = new Proto.Status { Code = Proto.Code.RequestHeaderFieldsTooLarge, Message = "Request Header Fields Too Large" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<RequestHeaderFieldsTooLargeException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusInternalError()
+ {
+ var status = new Proto.Status { Code = Proto.Code.InternalError, Message = "Internal Error" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<InternalErrorException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusProxyTimeout()
+ {
+ var status = new Proto.Status { Code = Proto.Code.ProxyTimeout, Message = "Proxy Timeout" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<ProxyTimeoutException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusUnsupported()
+ {
+ var status = new Proto.Status { Code = Proto.Code.Unsupported, Message = "Unsupported" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<UnsupportedException>(() => StatusChecker.Check(status, request, requestId));
+ }
+
+ [TestMethod]
+ public void TestCheckStatusUnrecognized()
+ {
+ var status = new Proto.Status { Code = (Proto.Code)999, Message = "Unrecognized" };
+ var request = new Proto.ReceiveMessageRequest();
+ var requestId = "requestId";
+
+ Assert.ThrowsException<UnsupportedException>(() => StatusChecker.Check(status, request, requestId));
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/TransactionTest.cs b/csharp/tests/TransactionTest.cs
new file mode 100644
index 0000000..231acd2
--- /dev/null
+++ b/csharp/tests/TransactionTest.cs
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ [TestClass]
+ public class TransactionTest
+ {
+ private const string FakeTag = "fakeTag";
+ private const string FakeTopic = "fakeTopic";
+ private const string FakeMsgKey = "fakeMsgKey";
+ private const string BrokerName = "broker0";
+ private const string Host = "127.0.0.1";
+ private const int Port = 8080;
+ private Producer _producer;
+ private byte[] _bytes;
+
+ [TestInitialize]
+ public void SetUp()
+ {
+ _producer = CreateTestClient();
+ _bytes = Encoding.UTF8.GetBytes("fakeBytes");
+ }
+
+ [TestMethod]
+ public void TestTryAddMessage()
+ {
+ var transaction = new Transaction(_producer);
+ var message = CreateMessage();
+ var publishingMessage = transaction.TryAddMessage(message);
+ Assert.AreEqual(MessageType.Transaction, publishingMessage.MessageType);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestTryAddExceededMessages()
+ {
+ var transaction = new Transaction(_producer);
+ var message = CreateMessage();
+ transaction.TryAddMessage(message);
+ transaction.TryAddMessage(message);
+ }
+
+ [TestMethod]
+ public void TestTryAddReceipt()
+ {
+ var transaction = new Transaction(_producer);
+ var message = CreateMessage();
+ var publishingMessage = transaction.TryAddMessage(message);
+ var mq0 = CreateMessageQueue();
+
+ var sendReceipt = CreateSendReceipt(mq0);
+ transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestTryAddReceiptNotContained()
+ {
+ var transaction = new Transaction(_producer);
+ var message = CreateMessage();
+ var publishingMessage = new PublishingMessage(message, new PublishingSettings("fakeNamespace",
+ "fakeClientId", new Endpoints("fakeEndpoints"), new Mock<IRetryPolicy>().Object,
+ TimeSpan.FromSeconds(10), new ConcurrentDictionary<string, bool>()), true);
+ var mq0 = CreateMessageQueue();
+
+ var sendReceipt = CreateSendReceipt(mq0);
+ transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public async Task TestCommitWithNoReceipts()
+ {
+ var transaction = new Transaction(_producer);
+ await transaction.Commit();
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(InvalidOperationException))]
+ public async Task TestRollbackWithNoReceipts()
+ {
+ var transaction = new Transaction(_producer);
+ await transaction.Rollback();
+ }
+
+ [TestMethod]
+ public async Task TestCommit()
+ {
+ var transaction = new Transaction(_producer);
+ var message = CreateMessage();
+ var publishingMessage = transaction.TryAddMessage(message);
+ var mq0 = CreateMessageQueue();
+
+ var sendReceipt = CreateSendReceipt(mq0);
+ transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+
+ var mockClientManager = new Mock<IClientManager>();
+ _producer.SetClientManager(mockClientManager.Object);
+
+ SetupCommitOrRollback(mockClientManager, true);
+
+ await transaction.Commit();
+ }
+
+ [TestMethod]
+ public async Task TestRollback()
+ {
+ var transaction = new Transaction(_producer);
+ var message = CreateMessage();
+ var publishingMessage = transaction.TryAddMessage(message);
+ var mq0 = CreateMessageQueue();
+
+ var sendReceipt = CreateSendReceipt(mq0);
+ transaction.TryAddReceipt(publishingMessage, sendReceipt.First());
+
+ var mockClientManager = new Mock<IClientManager>();
+ _producer.SetClientManager(mockClientManager.Object);
+
+ SetupCommitOrRollback(mockClientManager, false);
+
+ await transaction.Rollback();
+ }
+
+ private Producer CreateTestClient()
+ {
+ var clientConfig = new ClientConfig.Builder().SetEndpoints("127.0.0.1:9876").Build();
+ return new Producer(clientConfig, new ConcurrentDictionary<string, bool>(),
+ 1, null);
+ }
+
+ private Message CreateMessage()
+ {
+ return new Message.Builder()
+ .SetTopic(FakeTopic)
+ .SetBody(_bytes)
+ .SetTag(FakeTag)
+ .SetKeys(FakeMsgKey)
+ .Build();
+ }
+
+ private Proto.MessageQueue CreateMessageQueue()
+ {
+ return new Proto.MessageQueue
+ {
+ Broker = new Proto.Broker
+ {
+ Name = BrokerName,
+ Endpoints = new Proto.Endpoints
+ {
+ Scheme = Proto.AddressScheme.Ipv4,
+ Addresses = { new Proto.Address { Host = Host, Port = Port } }
+ }
+ },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Topic = new Proto.Resource { ResourceNamespace = "foo-bar-namespace", Name = "TestTopic" }
+ };
+ }
+
+ private IEnumerable<SendReceipt> CreateSendReceipt(Proto.MessageQueue mq0)
+ {
+ var metadata = _producer.Sign();
+ var sendResultEntry = new Proto.SendResultEntry
+ {
+ MessageId = "fakeMsgId",
+ TransactionId = "fakeTxId",
+ Status = new Proto.Status { Code = Proto.Code.Ok },
+ Offset = 1
+ };
+ var sendMessageResponse = new Proto.SendMessageResponse
+ {
+ Status = new Proto.Status { Code = Proto.Code.Ok },
+ Entries = { sendResultEntry }
+ };
+ var invocation = new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(null, sendMessageResponse, metadata);
+ return SendReceipt.ProcessSendMessageResponse(new MessageQueue(mq0), invocation);
+ }
+
+ private void SetupCommitOrRollback(Mock<IClientManager> mockClientManager, bool commit)
+ {
+ var endTransactionMetadata = _producer.Sign();
+ var endTransactionResponse = new Proto.EndTransactionResponse
+ {
+ Status = new Proto.Status { Code = Proto.Code.Ok }
+ };
+ var endTransactionInvocation = new RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>(null,
+ endTransactionResponse, endTransactionMetadata);
+ mockClientManager.Setup(cm => cm.EndTransaction(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.EndTransactionRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(endTransactionInvocation));
+
+ _producer.State = State.Running;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/tests.csproj b/csharp/tests/tests.csproj
index c16faa0..6cb51b3 100644
--- a/csharp/tests/tests.csproj
+++ b/csharp/tests/tests.csproj
@@ -7,6 +7,8 @@
</PropertyGroup>
<ItemGroup>
+ <PackageReference Include="Contrib.Grpc.Core.M1" Version="2.46.7" />
+ <PackageReference Include="Grpc.Core" Version="2.46.6" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="Moq" Version="4.20.70" />
diff --git a/style/codespell/ignore_words.txt b/style/codespell/ignore_words.txt
index 583d907..f997894 100644
--- a/style/codespell/ignore_words.txt
+++ b/style/codespell/ignore_words.txt
@@ -1,4 +1,6 @@
# guava
errorprone
# rust keyword
-crate
\ No newline at end of file
+crate
+# csharp keyword
+atleast