[REFACTORING] Dequeuer should not aggressively enforce a scheduler
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 0241c46..d3f0e66 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -42,7 +42,6 @@
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.Receiver;
@@ -105,21 +104,20 @@
     }
 
     Flux<? extends MailQueue.MailQueueItem> deQueue() {
-        return flux.flatMapSequential(response -> loadItem(response).subscribeOn(Schedulers.elastic()))
-            .concatMap(item -> filterIfDeleted(item).subscribeOn(Schedulers.elastic()));
+        return flux.flatMapSequential(this::loadItem)
+            .concatMap(this::filterIfDeleted);
     }
 
     private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
         return mailQueueView.isPresent(item.getEnqueueId())
-            .flatMap(isPresent -> keepWhenPresent(item, isPresent));
-    }
-
-    private Mono<? extends RabbitMQMailQueueItem> keepWhenPresent(RabbitMQMailQueueItem item, Boolean isPresent) {
-        if (isPresent) {
-            return Mono.just(item);
-        }
-        item.done(true);
-        return Mono.empty();
+            .handle((isPresent, sink) -> {
+                if (isPresent) {
+                    sink.next(item);
+                } else {
+                    item.done(true);
+                    sink.complete();
+                }
+            });
     }
 
     private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {