[REFACTORING] Dequeuer should not aggressively enforce a scheduler
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 0241c46..d3f0e66 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -42,7 +42,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
@@ -105,21 +104,20 @@
}
Flux<? extends MailQueue.MailQueueItem> deQueue() {
- return flux.flatMapSequential(response -> loadItem(response).subscribeOn(Schedulers.elastic()))
- .concatMap(item -> filterIfDeleted(item).subscribeOn(Schedulers.elastic()));
+ return flux.flatMapSequential(this::loadItem)
+ .concatMap(this::filterIfDeleted);
}
private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
return mailQueueView.isPresent(item.getEnqueueId())
- .flatMap(isPresent -> keepWhenPresent(item, isPresent));
- }
-
- private Mono<? extends RabbitMQMailQueueItem> keepWhenPresent(RabbitMQMailQueueItem item, Boolean isPresent) {
- if (isPresent) {
- return Mono.just(item);
- }
- item.done(true);
- return Mono.empty();
+ .handle((isPresent, sink) -> {
+ if (isPresent) {
+ sink.next(item);
+ } else {
+ item.done(true);
+ sink.complete();
+ }
+ });
}
private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {