JAMES-3977 Improve IMAP FETCH concurrency control
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
index d90a147..6619064 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
@@ -79,7 +79,6 @@
 public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> {
     static class FetchSubscriber implements Subscriber<FetchResponse> {
         private final AtomicReference<Subscription> subscription = new AtomicReference<>();
-        private final AtomicBoolean requested = new AtomicBoolean(false);
         private final Sinks.One<Void> sink = Sinks.one();
         private final ImapSession imapSession;
         private final Responder responder;
@@ -97,21 +96,23 @@
 
         @Override
         public void onNext(FetchResponse fetchResponse) {
-            requested.getAndSet(false);
+            AtomicBoolean mustRequestOne = new AtomicBoolean(true);
             responder.respond(fetchResponse);
-            if (imapSession.backpressureNeeded(this::requestOne)) {
+            Runnable requestOne = () -> {
+                if (mustRequestOne.getAndSet(false)) {
+                    requestOne();
+                }
+            };
+            if (imapSession.backpressureNeeded(requestOne)) {
                 LOGGER.debug("Applying backpressure as we encounter a slow reader");
             } else {
-                requestOne();
+                requestOne.run();
             }
         }
 
         private void requestOne() {
-            boolean alreadyRequested = requested.getAndSet(true);
-            if (!alreadyRequested) {
-                Optional.ofNullable(subscription.get())
-                    .ifPresent(s -> s.request(1));
-            }
+            Optional.ofNullable(subscription.get())
+                .ifPresent(s -> s.request(1));
         }
 
         @Override