JAMES-3117 SimpleConnectionPool::getOpenConnection was blocking on RabbitMQHealthCheck parallel thread
Subscribe directly SimpleConnectionPool::getOpenConnection on(ReactorUtils.BLOCKING_CALL_WRAPPER) to better avoid blocking call in other places.
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index ea3e60b..06c7261 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -32,6 +32,7 @@
import jakarta.inject.Inject;
import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,33 +120,36 @@
}
public Mono<Connection> getResilientConnection() {
- return Mono.defer(this::getOpenConnection)
+ return getOpenConnection()
.retryWhen(Retry.backoff(configuration.getNumRetries(), configuration.getInitialDelay()).scheduler(Schedulers.boundedElastic()));
}
private Mono<Connection> getOpenConnection() {
- Connection previous = connectionReference.get();
- Connection current = Optional.ofNullable(previous)
- .filter(Connection::isOpen)
- .orElseGet(connectionFactory::create);
- boolean updated = connectionReference.compareAndSet(previous, current);
- if (updated) {
- if (previous != null && previous != current) {
- LOGGER.warn("Replacing current RabbitMQ connection...");
- return Flux.fromIterable(reconnectionHandlers)
- .concatMap(handler -> handler.handleReconnection(current))
- .then()
- .thenReturn(current);
- }
- return Mono.just(current);
- } else {
- try {
- current.close();
- } catch (IOException e) {
- //error below
- }
- return Mono.error(new RuntimeException("unable to create and register a new Connection"));
- }
+ return Mono.defer(() -> {
+ Connection previous = connectionReference.get();
+ Connection current = Optional.ofNullable(previous)
+ .filter(Connection::isOpen)
+ .orElseGet(connectionFactory::create);
+ boolean updated = connectionReference.compareAndSet(previous, current);
+ if (updated) {
+ if (previous != null && previous != current) {
+ LOGGER.warn("Replacing current RabbitMQ connection...");
+ return Flux.fromIterable(reconnectionHandlers)
+ .concatMap(handler -> handler.handleReconnection(current))
+ .then()
+ .thenReturn(current);
+ }
+ return Mono.just(current);
+ } else {
+ try {
+ current.close();
+ } catch (IOException e) {
+ //error below
+ }
+ return Mono.error(new RuntimeException("unable to create and register a new Connection"));
+ }
+ })
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
}
public Mono<Boolean> tryConnection() {