JAMES-3139 Configuration for SimpleConnectionPool retries
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index b6125f2..6941009 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -38,13 +38,51 @@
import reactor.util.retry.Retry;
public class SimpleConnectionPool implements AutoCloseable {
+ public static class Configuration {
+ @FunctionalInterface
+ public interface RequiresRetries {
+ RequiresInitialDelay retries(int retries);
+ }
+
+ @FunctionalInterface
+ public interface RequiresInitialDelay {
+ Configuration initialDelay(Duration minBorrowDelay);
+ }
+
+ public static final Configuration DEFAULT = builder()
+ .retries(10)
+ .initialDelay(Duration.ofMillis(100));
+
+ public static RequiresRetries builder() {
+ return retries -> initialDelay -> new Configuration(retries, initialDelay);
+ }
+
+ private final int numRetries;
+ private final Duration initialDelay;
+
+ public Configuration(int numRetries, Duration initialDelay) {
+ this.numRetries = numRetries;
+ this.initialDelay = initialDelay;
+ }
+
+ public int getNumRetries() {
+ return numRetries;
+ }
+
+ public Duration getInitialDelay() {
+ return initialDelay;
+ }
+ }
+
private final AtomicReference<Connection> connectionReference;
private final RabbitMQConnectionFactory connectionFactory;
+ private final Configuration configuration;
@Inject
@VisibleForTesting
- public SimpleConnectionPool(RabbitMQConnectionFactory factory) {
+ public SimpleConnectionPool(RabbitMQConnectionFactory factory, Configuration configuration) {
this.connectionFactory = factory;
+ this.configuration = configuration;
this.connectionReference = new AtomicReference<>();
}
@@ -57,14 +95,8 @@
}
public Mono<Connection> getResilientConnection() {
- int numRetries = 10;
- Duration initialDelay = Duration.ofMillis(100);
- return getResilientConnection(numRetries, initialDelay);
- }
-
- public Mono<Connection> getResilientConnection(int numRetries, Duration initialDelay) {
return Mono.defer(this::getOpenConnection)
- .retryWhen(Retry.backoff(numRetries, initialDelay).scheduler(Schedulers.elastic()));
+ .retryWhen(Retry.backoff(configuration.getNumRetries(), configuration.getInitialDelay()).scheduler(Schedulers.elastic()));
}
private Mono<Connection> getOpenConnection() {
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index d5a3769..85fde71 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -139,8 +139,11 @@
dockerRestartPolicy.beforeEach(rabbitMQ);
RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
- connectionPool = new SimpleConnectionPool(connectionFactory);
- channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(2, Duration.ofMillis(5)),
+ connectionPool = new SimpleConnectionPool(connectionFactory,
+ SimpleConnectionPool.Configuration.builder()
+ .retries(2)
+ .initialDelay(Duration.ofMillis(5)));
+ channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
ReactorRabbitMQChannelPool.Configuration.builder()
.retries(2)
.minBorrowDelay(Duration.ofMillis(5))
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index 7923b37..b994c98 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -20,6 +20,7 @@
package org.apache.james.mpt.imapmailbox.rabbitmq.host;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
@@ -73,7 +74,10 @@
public void beforeTest() throws Exception {
super.beforeTest();
- connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory());
+ connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory(),
+ SimpleConnectionPool.Configuration.builder()
+ .retries(2)
+ .initialDelay(Duration.ofMillis(5)));
reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
ReactorRabbitMQChannelPool.Configuration.DEFAULT);
reactorRabbitMQChannelPool.start();
diff --git a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
index 8f42fb6..18ae72e 100644
--- a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
@@ -92,8 +92,6 @@
Multibinder.newSetBinder(binder(), StartUpCheck.class).addBinding().to(CassandraMailQueueViewStartUpCheck.class);
Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
-
- bind(ReactorRabbitMQChannelPool.Configuration.class).toInstance(ReactorRabbitMQChannelPool.Configuration.DEFAULT);
}
@Provides
@@ -177,4 +175,16 @@
public ReceiverProvider provideRabbitMQReceiver(SimpleConnectionPool simpleConnectionPool) {
return () -> RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(simpleConnectionPool.getResilientConnection()));
}
+
+ @Provides
+ @Singleton
+ public SimpleConnectionPool.Configuration provideConnectionPoolConfiguration() {
+ return SimpleConnectionPool.Configuration.DEFAULT;
+ }
+
+ @Provides
+ @Singleton
+ public ReactorRabbitMQChannelPool.Configuration provideChannelPoolConfiguration() {
+ return ReactorRabbitMQChannelPool.Configuration.DEFAULT;
+ }
}