JAMES-3117 SimpleConnectionPool::getOpenConnection was blocking on RabbitMQHealthCheck parallel thread

Subscribe directly SimpleConnectionPool::getOpenConnection on(ReactorUtils.BLOCKING_CALL_WRAPPER) to better avoid blocking call in other places.
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index ea3e60b..06c7261 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -32,6 +32,7 @@
 import jakarta.inject.Inject;
 
 import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.util.ReactorUtils;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,33 +120,36 @@
     }
 
     public Mono<Connection> getResilientConnection() {
-        return Mono.defer(this::getOpenConnection)
+        return getOpenConnection()
             .retryWhen(Retry.backoff(configuration.getNumRetries(), configuration.getInitialDelay()).scheduler(Schedulers.boundedElastic()));
     }
 
     private Mono<Connection> getOpenConnection() {
-        Connection previous = connectionReference.get();
-        Connection current = Optional.ofNullable(previous)
-            .filter(Connection::isOpen)
-            .orElseGet(connectionFactory::create);
-        boolean updated = connectionReference.compareAndSet(previous, current);
-        if (updated) {
-            if (previous != null && previous != current) {
-                LOGGER.warn("Replacing current RabbitMQ connection...");
-                return Flux.fromIterable(reconnectionHandlers)
-                    .concatMap(handler -> handler.handleReconnection(current))
-                    .then()
-                    .thenReturn(current);
-            }
-            return Mono.just(current);
-        } else {
-            try {
-                current.close();
-            } catch (IOException e) {
-                //error below
-            }
-            return Mono.error(new RuntimeException("unable to create and register a new Connection"));
-        }
+        return Mono.defer(() -> {
+                Connection previous = connectionReference.get();
+                Connection current = Optional.ofNullable(previous)
+                    .filter(Connection::isOpen)
+                    .orElseGet(connectionFactory::create);
+                boolean updated = connectionReference.compareAndSet(previous, current);
+                if (updated) {
+                    if (previous != null && previous != current) {
+                        LOGGER.warn("Replacing current RabbitMQ connection...");
+                        return Flux.fromIterable(reconnectionHandlers)
+                            .concatMap(handler -> handler.handleReconnection(current))
+                            .then()
+                            .thenReturn(current);
+                    }
+                    return Mono.just(current);
+                } else {
+                    try {
+                        current.close();
+                    } catch (IOException e) {
+                        //error below
+                    }
+                    return Mono.error(new RuntimeException("unable to create and register a new Connection"));
+                }
+            })
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
     }
 
     public Mono<Boolean> tryConnection() {