JAMES-3977 Improve IMAP FETCH concurrency control
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
index d90a147..6619064 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
@@ -79,7 +79,6 @@
public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> {
static class FetchSubscriber implements Subscriber<FetchResponse> {
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
- private final AtomicBoolean requested = new AtomicBoolean(false);
private final Sinks.One<Void> sink = Sinks.one();
private final ImapSession imapSession;
private final Responder responder;
@@ -97,21 +96,23 @@
@Override
public void onNext(FetchResponse fetchResponse) {
- requested.getAndSet(false);
+ AtomicBoolean mustRequestOne = new AtomicBoolean(true);
responder.respond(fetchResponse);
- if (imapSession.backpressureNeeded(this::requestOne)) {
+ Runnable requestOne = () -> {
+ if (mustRequestOne.getAndSet(false)) {
+ requestOne();
+ }
+ };
+ if (imapSession.backpressureNeeded(requestOne)) {
LOGGER.debug("Applying backpressure as we encounter a slow reader");
} else {
- requestOne();
+ requestOne.run();
}
}
private void requestOne() {
- boolean alreadyRequested = requested.getAndSet(true);
- if (!alreadyRequested) {
- Optional.ofNullable(subscription.get())
- .ifPresent(s -> s.request(1));
- }
+ Optional.ofNullable(subscription.get())
+ .ifPresent(s -> s.request(1));
}
@Override