Implement ExponentialBackoffRetryPolicy
diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs
index 9ad5fbb..edb356a 100644
--- a/csharp/examples/ProducerDelayMessageExample.cs
+++ b/csharp/examples/ProducerDelayMessageExample.cs
@@ -34,7 +34,7 @@
const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+ const string endpoints = "foobar.com:8080";
var clientConfig = new ClientConfig(endpoints)
{
CredentialsProvider = credentialsProvider
diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs
index 87f953c..bfca32f 100644
--- a/csharp/examples/ProducerFifoMessageExample.cs
+++ b/csharp/examples/ProducerFifoMessageExample.cs
@@ -30,12 +30,12 @@
internal static async Task QuickStart()
{
- const string accessKey = "5jFk0wK7OU6Uq395";
- const string secretKey = "V1u8z19URHs4o6RQ";
+ const string accessKey = "yourAccessKey";
+ const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+ const string endpoints = "foobar.com:8080";
var clientConfig = new ClientConfig(endpoints)
{
CredentialsProvider = credentialsProvider
@@ -43,7 +43,7 @@
// In most case, you don't need to create too many producers, single pattern is recommended.
var producer = new Producer(clientConfig);
- const string topic = "lingchu_fifo_topic";
+ const string topic = "yourFifoTopic";
producer.SetTopics(topic);
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch
// the topic route before message publishing.
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index 3274ade..09e4dff 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -30,12 +30,12 @@
internal static async Task QuickStart()
{
- const string accessKey = "5jFk0wK7OU6Uq395";
- const string secretKey = "V1u8z19URHs4o6RQ";
+ const string accessKey = "yourAccessKey";
+ const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+ const string endpoints = "foobar.com:8080";
var clientConfig = new ClientConfig(endpoints)
{
CredentialsProvider = credentialsProvider
@@ -43,7 +43,7 @@
// In most case, you don't need to create too many producers, single pattern is recommended.
var producer = new Producer(clientConfig);
- const string topic = "lingchu_normal_topic";
+ const string topic = "yourNormalTopic";
producer.SetTopics(topic);
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch
// the topic route before message publishing.
diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs
index 8b33172..10b6114 100644
--- a/csharp/examples/ProducerTransactionMessageExample.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -41,7 +41,7 @@
const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+ const string endpoints = "foobar.com:8080";
var clientConfig = new ClientConfig(endpoints)
{
CredentialsProvider = credentialsProvider
@@ -49,7 +49,7 @@
// In most case, you don't need to create too many producers, single pattern is recommended.
var producer = new Producer(clientConfig);
- const string topic = "lingchu_transactional_topic";
+ const string topic = "yourTransactionTopic";
producer.SetTopics(topic);
producer.SetTransactionChecker(new TransactionChecker());
diff --git a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
index 094c260..ddc4d28 100644
--- a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
+++ b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
@@ -1,5 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
-using Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
using Google.Protobuf.WellKnownTypes;
namespace Org.Apache.Rocketmq
@@ -28,9 +45,28 @@
public double BackoffMultiplier { get; }
+ public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy)
+ {
+ if (retryPolicy.StrategyCase != Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff)
+ {
+ throw new InvalidOperationException("Strategy must be exponential backoff");
+ }
+
+ return InheritBackoff(retryPolicy.ExponentialBackoff);
+ }
+
+ private IRetryPolicy InheritBackoff(Proto.ExponentialBackoff retryPolicy)
+ {
+ return new ExponentialBackoffRetryPolicy(_maxAttempts, retryPolicy.Initial.ToTimeSpan(),
+ retryPolicy.Max.ToTimeSpan(), retryPolicy.Multiplier);
+ }
+
public TimeSpan GetNextAttemptDelay(int attempt)
{
- return TimeSpan.Zero;
+ var delayMillis = Math.Min(
+ InitialBackoff.TotalMilliseconds * Math.Pow(BackoffMultiplier, 1.0 * (attempt - 1)),
+ MaxBackoff.TotalMilliseconds);
+ return delayMillis < 0 ? TimeSpan.Zero : TimeSpan.FromMilliseconds(delayMillis);
}
public static ExponentialBackoffRetryPolicy ImmediatelyRetryPolicy(int maxAttempts)
@@ -40,7 +76,7 @@
public global::Apache.Rocketmq.V2.RetryPolicy ToProtobuf()
{
- var exponentialBackoff = new ExponentialBackoff
+ var exponentialBackoff = new Proto.ExponentialBackoff
{
Multiplier = (float)BackoffMultiplier,
Max = Duration.FromTimeSpan(MaxBackoff),
diff --git a/csharp/rocketmq-client-csharp/IRetryPolicy.cs b/csharp/rocketmq-client-csharp/IRetryPolicy.cs
index c006b1b..86d280e 100644
--- a/csharp/rocketmq-client-csharp/IRetryPolicy.cs
+++ b/csharp/rocketmq-client-csharp/IRetryPolicy.cs
@@ -17,6 +17,7 @@
using System;
using Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -43,5 +44,7 @@
/// </summary>
/// <returns></returns>
RetryPolicy ToProtobuf();
+
+ IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy);
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs
index b9f8f45..17830a0 100644
--- a/csharp/rocketmq-client-csharp/PublishingSettings.cs
+++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs
@@ -19,18 +19,21 @@
using System.Collections.Concurrent;
using System.Linq;
using Google.Protobuf.WellKnownTypes;
+using NLog;
using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class PublishingSettings : Settings
{
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
private volatile int _maxBodySizeBytes = 4 * 1024 * 1024;
private volatile bool _validateMessageType = true;
public PublishingSettings(string clientId, Endpoints endpoints, ExponentialBackoffRetryPolicy retryPolicy,
- TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, endpoints,
- retryPolicy, requestTimeout)
+ TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer,
+ endpoints, retryPolicy, requestTimeout)
{
Topics = topics;
}
@@ -49,7 +52,16 @@
public override void Sync(Proto::Settings settings)
{
- // TODO
+ if (Proto.Settings.PubSubOneofCase.Publishing != settings.PubSubCase)
+ {
+ Logger.Error($"[Bug] Issued settings does not match with the client type, clientId={ClientId}, " +
+ $"pubSubCase={settings.PubSubCase}, clientType={ClientType}");
+ return;
+ }
+
+ RetryPolicy = RetryPolicy.InheritBackoff(settings.BackoffPolicy);
+ _validateMessageType = settings.Publishing.ValidateMessageType;
+ _maxBodySizeBytes = settings.Publishing.MaxBodySize;
}
public override Proto.Settings ToProtobuf()
diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
index a6a409d..c83cca7 100644
--- a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
+++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
@@ -44,7 +44,11 @@
public override void Sync(Proto::Settings settings)
{
- // TODO
+ if (Proto.Settings.PubSubOneofCase.Subscription != settings.PubSubCase)
+ {
+ Logger.Error($"[Bug] Issued settings doesn't match with the client type, clientId={ClientId}, " +
+ $"pubSubCase={settings.PubSubCase}, clientType={ClientType}");
+ }
}
public override Proto.Settings ToProtobuf()