[RABBITMQ] Reactify some RabbitMQ related health checks
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
index ceb3c8b..8478561 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
@@ -34,7 +34,6 @@
 import reactor.core.publisher.Mono;
 
 public class RabbitEventBusConsumerHealthCheck implements HealthCheck {
-    public static final ComponentName COMPONENT_NAME = new ComponentName("EventbusConsumersHealthCheck");
     public static final String COMPONENT = "EventbusConsumers";
 
     private final RabbitMQEventBus eventBus;
@@ -57,7 +56,6 @@
     public Mono<Result> check() {
         return connectionPool.getResilientConnection()
             .map(Throwing.function(connection -> {
-
                 try (Channel channel = connection.createChannel()) {
                     return check(channel);
                 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
index c3b3a44..e66fc7a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
@@ -27,10 +27,10 @@
 import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
-import org.apache.james.util.ReactorUtils;
 
 import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -58,14 +58,13 @@
     @Override
     public Mono<Result> check() {
         return connectionPool.getResilientConnection()
-            .map(Throwing.function(connection -> {
-                try (Channel channel = connection.createChannel()) {
-                    return check(channel);
-                }
-            })).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+            .flatMap(connection -> Mono.using(connection::createChannel,
+                channel -> check(connection, channel),
+                Throwing.consumer(Channel::close)))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
-    private Result check(Channel channel) {
+    private Mono<Result> check(Connection connection, Channel channel) {
         boolean queueWithoutConsumers = queueFactory.listCreatedMailQueues()
             .stream()
             .map(org.apache.james.queue.api.MailQueueName::asString)
@@ -74,14 +73,13 @@
             .anyMatch(Throwing.predicate(queue -> channel.consumerCount(queue) == 0));
 
         if (queueWithoutConsumers) {
-            reconnectionHandlers.forEach(r -> connectionPool.getResilientConnection()
-                .flatMap(c -> Mono.from(r.handleReconnection(c)))
-                .subscribeOn(Schedulers.boundedElastic())
-                .block());
-
-            return Result.degraded(COMPONENT, "No consumers");
+            // handleReconnection only builds a Publisher; nothing runs until it is subscribed,
+            // so chain each handler into the returned Mono instead of discarding its result.
+            return reactor.core.publisher.Flux.fromIterable(reconnectionHandlers)
+                .concatMap(r -> r.handleReconnection(connection))
+                .then(Mono.just(Result.degraded(COMPONENT, "No consumers")));
         } else {
-            return Result.healthy(COMPONENT);
+            return Mono.just(Result.healthy(COMPONENT));
         }
     }
 }