[RABBITMQ] Reactify some RabbitMQ related health checks
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
index ceb3c8b..8478561 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
@@ -34,7 +34,6 @@
import reactor.core.publisher.Mono;
public class RabbitEventBusConsumerHealthCheck implements HealthCheck {
- public static final ComponentName COMPONENT_NAME = new ComponentName("EventbusConsumersHealthCheck");
public static final String COMPONENT = "EventbusConsumers";
private final RabbitMQEventBus eventBus;
@@ -57,7 +56,6 @@
public Mono<Result> check() {
return connectionPool.getResilientConnection()
.map(Throwing.function(connection -> {
-
try (Channel channel = connection.createChannel()) {
return check(channel);
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
index c3b3a44..e66fc7a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
@@ -27,10 +27,10 @@
import org.apache.james.core.healthcheck.ComponentName;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.core.healthcheck.Result;
-import org.apache.james.util.ReactorUtils;
import com.github.fge.lambdas.Throwing;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -58,14 +58,13 @@
@Override
public Mono<Result> check() {
return connectionPool.getResilientConnection()
- .map(Throwing.function(connection -> {
- try (Channel channel = connection.createChannel()) {
- return check(channel);
- }
- })).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+ .flatMap(connection -> Mono.using(connection::createChannel,
+ channel -> check(connection, channel),
+ Throwing.consumer(Channel::close)))
+ .subscribeOn(Schedulers.boundedElastic());
}
- private Result check(Channel channel) {
+ private Mono<Result> check(Connection connection, Channel channel) {
boolean queueWithoutConsumers = queueFactory.listCreatedMailQueues()
.stream()
.map(org.apache.james.queue.api.MailQueueName::asString)
@@ -74,14 +73,10 @@
.anyMatch(Throwing.predicate(queue -> channel.consumerCount(queue) == 0));
if (queueWithoutConsumers) {
- reconnectionHandlers.forEach(r -> connectionPool.getResilientConnection()
- .flatMap(c -> Mono.from(r.handleReconnection(c)))
- .subscribeOn(Schedulers.boundedElastic())
- .block());
-
- return Result.degraded(COMPONENT, "No consumers");
+ return Mono.fromRunnable(() -> reconnectionHandlers.forEach(r -> r.handleReconnection(connection)))
+ .thenReturn(Result.degraded(COMPONENT, "No consumers"));
} else {
- return Result.healthy(COMPONENT);
+ return Mono.just(Result.healthy(COMPONENT));
}
}
}