[FIX] cancel ongoing processing for inactive channels
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 4e9e598..5f35d3a 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -1,447 +1,445 @@
-/****************************************************************

- * Licensed to the Apache Software Foundation (ASF) under one   *

- * or more contributor license agreements.  See the NOTICE file *

- * distributed with this work for additional information        *

- * regarding copyright ownership.  The ASF licenses this file   *

- * to you under the Apache License, Version 2.0 (the            *

- * "License"); you may not use this file except in compliance   *

- * with the License.  You may obtain a copy of the License at   *

- *                                                              *

- *   http://www.apache.org/licenses/LICENSE-2.0                 *

- *                                                              *

- * Unless required by applicable law or agreed to in writing,   *

- * software distributed under the License is distributed on an  *

- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *

- * KIND, either express or implied.  See the License for the    *

- * specific language governing permissions and limitations      *

- * under the License.                                           *

- ****************************************************************/

-package org.apache.james.imapserver.netty;

-

-import static org.apache.james.imapserver.netty.IMAPServer.AuthenticationConfiguration;

-

-import java.io.Closeable;

-import java.io.IOException;

-import java.net.InetSocketAddress;

-import java.net.SocketException;

-import java.nio.channels.ClosedChannelException;

-import java.time.Duration;

-import java.util.NoSuchElementException;

-import java.util.Optional;

-import java.util.Set;

-

-import javax.net.ssl.SSLHandshakeException;

-

-import org.apache.james.imap.api.ConnectionCheck;

-import org.apache.james.imap.api.ImapConstants;

-import org.apache.james.imap.api.ImapMessage;

-import org.apache.james.imap.api.ImapSessionState;

-import org.apache.james.imap.api.display.HumanReadableText;

-import org.apache.james.imap.api.message.response.StatusResponse;

-import org.apache.james.imap.api.process.ImapProcessor;

-import org.apache.james.imap.api.process.ImapSession;

-import org.apache.james.imap.api.process.ImapSession.SessionId;

-import org.apache.james.imap.encode.ImapEncoder;

-import org.apache.james.imap.encode.base.ImapResponseComposerImpl;

-import org.apache.james.imap.main.ResponseEncoder;

-import org.apache.james.imap.message.request.AbstractImapRequest;

-import org.apache.james.imap.message.response.ImmutableStatusResponse;

-import org.apache.james.metrics.api.Metric;

-import org.apache.james.protocols.netty.Encryption;

-import org.apache.james.util.MDCBuilder;

-import org.apache.james.util.ReactorUtils;

-import org.slf4j.Logger;

-import org.slf4j.LoggerFactory;

-

-import com.github.fge.lambdas.Throwing;

-

-import io.netty.buffer.Unpooled;

-import io.netty.channel.Channel;

-import io.netty.channel.ChannelFutureListener;

-import io.netty.channel.ChannelHandler;

-import io.netty.channel.ChannelHandlerContext;

-import io.netty.channel.ChannelInboundHandlerAdapter;

-import io.netty.handler.codec.DecoderException;

-import io.netty.handler.codec.TooLongFrameException;

-import io.netty.util.Attribute;

-import reactor.core.Disposable;

-import reactor.core.publisher.Flux;

-import reactor.core.publisher.Mono;

-

-/**

- * {@link ChannelInboundHandlerAdapter} which handles IMAP

- */

-@ChannelHandler.Sharable

-public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter implements NettyConstants {

-    private static final Logger LOGGER = LoggerFactory.getLogger(ImapChannelUpstreamHandler.class);

-    public static final String MDC_KEY = "bound_MDC";

-

-    public static class ImapChannelUpstreamHandlerBuilder {

-        private String hello;

-        private Encryption secure;

-        private boolean compress;

-        private ImapProcessor processor;

-        private ImapEncoder encoder;

-        private IMAPServer.AuthenticationConfiguration authenticationConfiguration;

-        private ImapMetrics imapMetrics;

-        private boolean ignoreIDLEUponProcessing;

-        private Duration heartbeatInterval;

-        private ReactiveThrottler reactiveThrottler;

-        private Set<ConnectionCheck> connectionChecks;

-        private boolean proxyRequired;

-

-        public ImapChannelUpstreamHandlerBuilder reactiveThrottler(ReactiveThrottler reactiveThrottler) {

-            this.reactiveThrottler = reactiveThrottler;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder hello(String hello) {

-            this.hello = hello;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder secure(Encryption secure) {

-            this.secure = secure;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder compress(boolean compress) {

-            this.compress = compress;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder processor(ImapProcessor processor) {

-            this.processor = processor;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder encoder(ImapEncoder encoder) {

-            this.encoder = encoder;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder authenticationConfiguration(IMAPServer.AuthenticationConfiguration authenticationConfiguration) {

-            this.authenticationConfiguration = authenticationConfiguration;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder connectionChecks(Set<ConnectionCheck> connectionChecks) {

-            this.connectionChecks = connectionChecks;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder imapMetrics(ImapMetrics imapMetrics) {

-            this.imapMetrics = imapMetrics;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder ignoreIDLEUponProcessing(boolean ignoreIDLEUponProcessing) {

-            this.ignoreIDLEUponProcessing = ignoreIDLEUponProcessing;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder heartbeatInterval(Duration heartbeatInterval) {

-            this.heartbeatInterval = heartbeatInterval;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandlerBuilder proxyRequired(boolean proxyRequired) {

-            this.proxyRequired = proxyRequired;

-            return this;

-        }

-

-        public ImapChannelUpstreamHandler build() {

-            return new ImapChannelUpstreamHandler(hello, processor, encoder, compress, secure, imapMetrics, authenticationConfiguration, ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), reactiveThrottler, connectionChecks, proxyRequired);

-        }

-    }

-

-    public static ImapChannelUpstreamHandlerBuilder builder() {

-        return new ImapChannelUpstreamHandlerBuilder();

-    }

-

-    private final String hello;

-    private final Encryption secure;

-    private final boolean compress;

-    private final ImapProcessor processor;

-    private final ImapEncoder encoder;

-    private final ImapHeartbeatHandler heartbeatHandler;

-    private final AuthenticationConfiguration authenticationConfiguration;

-    private final Metric imapConnectionsMetric;

-    private final Metric imapCommandsMetric;

-    private final boolean ignoreIDLEUponProcessing;

-    private final ReactiveThrottler reactiveThrottler;

-    private final Set<ConnectionCheck> connectionChecks;

-    private final boolean proxyRequired;

-

-    public ImapChannelUpstreamHandler(String hello, ImapProcessor processor, ImapEncoder encoder, boolean compress,

-                                      Encryption secure, ImapMetrics imapMetrics, AuthenticationConfiguration authenticationConfiguration,

-                                      boolean ignoreIDLEUponProcessing, int heartbeatIntervalSeconds, ReactiveThrottler reactiveThrottler,

-                                      Set<ConnectionCheck> connectionChecks, boolean proxyRequired) {

-        this.hello = hello;

-        this.processor = processor;

-        this.encoder = encoder;

-        this.secure = secure;

-        this.compress = compress;

-        this.authenticationConfiguration = authenticationConfiguration;

-        this.imapConnectionsMetric = imapMetrics.getConnectionsMetric();

-        this.imapCommandsMetric = imapMetrics.getCommandsMetric();

-        this.ignoreIDLEUponProcessing = ignoreIDLEUponProcessing;

-        this.heartbeatHandler = new ImapHeartbeatHandler(heartbeatIntervalSeconds, heartbeatIntervalSeconds, heartbeatIntervalSeconds);

-        this.reactiveThrottler = reactiveThrottler;

-        this.connectionChecks = connectionChecks;

-        this.proxyRequired = proxyRequired;

-    }

-

-    @Override

-    public void channelActive(ChannelHandlerContext ctx) throws Exception {

-        SessionId sessionId = SessionId.generate();

-        ImapSession imapsession = new NettyImapSession(ctx.channel(), secure, compress, authenticationConfiguration.isSSLRequired(),

-            authenticationConfiguration.isPlainAuthEnabled(), sessionId,

-            authenticationConfiguration.getOidcSASLConfiguration());

-        ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).set(imapsession);

-        ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).set(new Linearalizer());

-        MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx)

-            .addToContext(MDCBuilder.SESSION_ID, sessionId.asString());

-        imapsession.setAttribute(MDC_KEY, boundMDC);

-

-        performConnectionCheck(imapsession.getRemoteAddress());

-

-        try (Closeable closeable = mdc(imapsession).build()) {

-            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();

-            LOGGER.info("Connection established from {}", address.getAddress().getHostAddress());

-            imapConnectionsMetric.increment();

-

-            ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());

-            ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);

-            // write hello to client

-            response.untagged().message("OK").message(hello).end();

-            response.flush();

-            super.channelActive(ctx);

-        }

-    }

-

-    @Override

-    public void channelWritabilityChanged(ChannelHandlerContext ctx) {

-        if (ctx.channel().isWritable()) {

-            Optional.ofNullable(ctx.channel().attr(BACKPRESSURE_CALLBACK).get())

-                .ifPresent(Runnable::run);

-        }

-    }

-

-    private void performConnectionCheck(InetSocketAddress clientIp) {

-        if (!connectionChecks.isEmpty() && !proxyRequired) {

-            Flux.fromIterable(connectionChecks)

-                .concatMap(connectionCheck -> connectionCheck.validate(clientIp))

-                .then()

-                .block();

-        }

-    }

-

-    private MDCBuilder mdc(ChannelHandlerContext ctx) {

-        ImapSession maybeSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();

-

-        return mdc(maybeSession);

-    }

-

-    private MDCBuilder mdc(ImapSession imapSession) {

-        return Optional.ofNullable(imapSession)

-            .map(session -> {

-                MDCBuilder boundMDC = (MDCBuilder) session.getAttribute(MDC_KEY);

-

-                return IMAPMDCContext.from(session)

-                    .addToContext(boundMDC);

-            })

-            .orElseGet(MDCBuilder::create);

-    }

-

-    @Override

-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

-        // remove the stored attribute for the channel to free up resources

-        // See JAMES-1195

-        ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null);

-        try (Closeable closeable = mdc(imapSession).build()) {

-            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();

-            LOGGER.info("Connection closed for {}", address.getAddress().getHostAddress());

-

-            Disposable disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null);

-

-            Optional.ofNullable(imapSession)

-                .map(ImapSession::logout)

-                .orElse(Mono.empty())

-                .doFinally(Throwing.consumer(signal -> {

-                    imapConnectionsMetric.decrement();

-                    super.channelInactive(ctx);

-                }))

-                .subscribe(any -> {

-

-                }, ctx::fireExceptionCaught);

-            Optional.ofNullable(disposableAttribute).ifPresent(Disposable::dispose);

-        }

-    }

-

-    @Override

-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

-        try (Closeable closeable = mdc(ctx).build()) {

-            if (cause instanceof SocketException) {

-                LOGGER.info("Socket exception encountered: {}", cause.getMessage());

-            } else if (isSslHandshkeException(cause)) {

-                LOGGER.info("SSH handshake rejected {}", cause.getMessage());

-            } else if (!(cause instanceof ClosedChannelException)) {

-                LOGGER.warn("Error while processing imap request", cause);

-            }

-

-            if (cause instanceof TooLongFrameException) {

-

-                // Max line length exceeded

-                // See RFC 2683 section 3.2.1

-                //

-                // "For its part, a server should allow for a command line of at

-                // least

-                // 8000 octets. This provides plenty of leeway for accepting

-                // reasonable

-                // length commands from clients. The server should send a BAD

-                // response

-                // to a command that does not end within the server's maximum

-                // accepted

-                // command length."

-                //

-                // See also JAMES-1190

-                ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());

-                ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);

-                response.untaggedResponse(ImapConstants.BAD + " failed. Maximum command line length exceeded");

-                response.flush();

-

-            } else if (cause instanceof ReactiveThrottler.RejectedException) {

-                manageRejectedException(ctx, (ReactiveThrottler.RejectedException) cause);

-            } else {

-                manageUnknownError(ctx);

-            }

-        }

-    }

-

-    private boolean isSslHandshkeException(Throwable cause) {

-        return cause instanceof DecoderException

-            && cause.getCause() instanceof SSLHandshakeException;

-    }

-

-    private void manageRejectedException(ChannelHandlerContext ctx, ReactiveThrottler.RejectedException cause) throws IOException {

-        if (cause.getImapMessage() instanceof AbstractImapRequest) {

-            AbstractImapRequest req = (AbstractImapRequest) cause.getImapMessage();

-            ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());

-            ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);

-            new ResponseEncoder(encoder, response)

-                .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(),

-                    new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null));

-            response.flush();

-        } else {

-            manageUnknownError(ctx);

-        }

-    }

-

-    private void manageUnknownError(ChannelHandlerContext ctx) {

-        // logout on error not sure if that is the best way to handle it

-        final ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();

-

-        Optional.ofNullable(ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null))

-            .ifPresent(Disposable::dispose);

-

-        Optional.ofNullable(imapSession)

-            .map(ImapSession::logout)

-            .orElse(Mono.empty())

-            .doFinally(Throwing.consumer(signal -> {

-                // Make sure we close the channel after all the buffers were flushed out

-                Channel channel = ctx.channel();

-                if (channel.isActive()) {

-                    channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);

-                }

-                super.channelInactive(ctx);

-            }))

-            .subscribe(any -> {

-

-            }, e -> {

-                LOGGER.error("Exception while handling errors for channel {}", ctx.channel(), e);

-                Channel channel = ctx.channel();

-                if (channel.isActive()) {

-                    channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);

-                }

-            });

-    }

-

-    @Override

-    public void channelRead(ChannelHandlerContext ctx, Object msg) {

-        imapCommandsMetric.increment();

-        ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();

-        Linearalizer linearalizer = ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).get();

-        Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);

-        ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());

-        ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);

-        writer.setFlushCallback(response::flush);

-        ImapMessage message = (ImapMessage) msg;

-

-        beforeIDLEUponProcessing(ctx);

-        ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response);

-        Disposable disposable = reactiveThrottler.throttle(

-            linearalizer.execute(processor.processReactive(message, responseEncoder, session))

-                .doOnEach(Throwing.consumer(signal -> {

-                    if (session.getState() == ImapSessionState.LOGOUT) {

-                        // Make sure we close the channel after all the buffers were flushed out

-                        Channel channel = ctx.channel();

-                        if (channel.isActive()) {

-                            channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);

-                        }

-                    }

-                    if (signal.isOnComplete()) {

-                        IOException failure = responseEncoder.getFailure();

-                        if (failure != null) {

-                            try (Closeable mdc = ReactorUtils.retrieveMDCBuilder(signal).build()) {

-                                LOGGER.info(failure.getMessage());

-                                LOGGER.debug("Failed to write {}", message, failure);

-                            } catch (IOException e) {

-                                throw new RuntimeException(e);

-                            }

-

-                            ctx.fireExceptionCaught(failure);

-                        }

-                    }

-                    if (signal.isOnComplete() || signal.isOnError()) {

-                        afterIDLEUponProcessing(ctx);

-                    }

-                    if (signal.hasError()) {

-                        ctx.fireExceptionCaught(signal.getThrowable());

-                    }

-                    disposableAttribute.set(null);

-                    response.flush();

-                    ctx.fireChannelReadComplete();

-                }))

-                .contextWrite(ReactorUtils.context("imap", mdc(session))), message)

-            // Manage throttling errors

-            .doOnError(ctx::fireExceptionCaught)

-            .doFinally(Throwing.consumer(any -> {

-                if (message instanceof Closeable) {

-                    ((Closeable) message).close();

-                }

-            }))

-            .subscribe();

-        disposableAttribute.set(disposable);

-    }

-

-    private void beforeIDLEUponProcessing(ChannelHandlerContext ctx) {

-        if (!ignoreIDLEUponProcessing) {

-            try {

-                ctx.pipeline().addBefore(NettyConstants.CORE_HANDLER, NettyConstants.HEARTBEAT_HANDLER, heartbeatHandler);

-            } catch (IllegalArgumentException e) {

-                LOGGER.info("heartbeat handler is already part of this pipeline", e);

-            }

-        }

-    }

-

-    private void afterIDLEUponProcessing(ChannelHandlerContext ctx) {

-        if (!ignoreIDLEUponProcessing) {

-            try {

-                ctx.pipeline().remove(NettyConstants.HEARTBEAT_HANDLER);

-            } catch (NoSuchElementException e) {

-                LOGGER.info("Heartbeat handler was concurrently removed");

-            }

-        }

-    }

-}

+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import static org.apache.james.imapserver.netty.IMAPServer.AuthenticationConfiguration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.time.Duration;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import org.apache.james.imap.api.ConnectionCheck;
+import org.apache.james.imap.api.ImapConstants;
+import org.apache.james.imap.api.ImapMessage;
+import org.apache.james.imap.api.ImapSessionState;
+import org.apache.james.imap.api.display.HumanReadableText;
+import org.apache.james.imap.api.message.response.StatusResponse;
+import org.apache.james.imap.api.process.ImapProcessor;
+import org.apache.james.imap.api.process.ImapSession;
+import org.apache.james.imap.api.process.ImapSession.SessionId;
+import org.apache.james.imap.encode.ImapEncoder;
+import org.apache.james.imap.encode.base.ImapResponseComposerImpl;
+import org.apache.james.imap.main.ResponseEncoder;
+import org.apache.james.imap.message.request.AbstractImapRequest;
+import org.apache.james.imap.message.response.ImmutableStatusResponse;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.protocols.netty.Encryption;
+import org.apache.james.util.MDCBuilder;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.DecoderException;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.util.Attribute;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * {@link ChannelInboundHandlerAdapter} which handles IMAP
+ */
+@ChannelHandler.Sharable
+public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter implements NettyConstants {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ImapChannelUpstreamHandler.class);
+    public static final String MDC_KEY = "bound_MDC";
+
+    public static class ImapChannelUpstreamHandlerBuilder {
+        private String hello;
+        private Encryption secure;
+        private boolean compress;
+        private ImapProcessor processor;
+        private ImapEncoder encoder;
+        private IMAPServer.AuthenticationConfiguration authenticationConfiguration;
+        private ImapMetrics imapMetrics;
+        private boolean ignoreIDLEUponProcessing;
+        private Duration heartbeatInterval;
+        private ReactiveThrottler reactiveThrottler;
+        private Set<ConnectionCheck> connectionChecks;
+        private boolean proxyRequired;
+
+        public ImapChannelUpstreamHandlerBuilder reactiveThrottler(ReactiveThrottler reactiveThrottler) {
+            this.reactiveThrottler = reactiveThrottler;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder hello(String hello) {
+            this.hello = hello;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder secure(Encryption secure) {
+            this.secure = secure;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder compress(boolean compress) {
+            this.compress = compress;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder processor(ImapProcessor processor) {
+            this.processor = processor;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder encoder(ImapEncoder encoder) {
+            this.encoder = encoder;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder authenticationConfiguration(IMAPServer.AuthenticationConfiguration authenticationConfiguration) {
+            this.authenticationConfiguration = authenticationConfiguration;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder connectionChecks(Set<ConnectionCheck> connectionChecks) {
+            this.connectionChecks = connectionChecks;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder imapMetrics(ImapMetrics imapMetrics) {
+            this.imapMetrics = imapMetrics;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder ignoreIDLEUponProcessing(boolean ignoreIDLEUponProcessing) {
+            this.ignoreIDLEUponProcessing = ignoreIDLEUponProcessing;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder heartbeatInterval(Duration heartbeatInterval) {
+            this.heartbeatInterval = heartbeatInterval;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandlerBuilder proxyRequired(boolean proxyRequired) {
+            this.proxyRequired = proxyRequired;
+            return this;
+        }
+
+        public ImapChannelUpstreamHandler build() {
+            return new ImapChannelUpstreamHandler(hello, processor, encoder, compress, secure, imapMetrics, authenticationConfiguration, ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), reactiveThrottler, connectionChecks, proxyRequired);
+        }
+    }
+
+    public static ImapChannelUpstreamHandlerBuilder builder() {
+        return new ImapChannelUpstreamHandlerBuilder();
+    }
+
+    private final String hello;
+    private final Encryption secure;
+    private final boolean compress;
+    private final ImapProcessor processor;
+    private final ImapEncoder encoder;
+    private final ImapHeartbeatHandler heartbeatHandler;
+    private final AuthenticationConfiguration authenticationConfiguration;
+    private final Metric imapConnectionsMetric;
+    private final Metric imapCommandsMetric;
+    private final boolean ignoreIDLEUponProcessing;
+    private final ReactiveThrottler reactiveThrottler;
+    private final Set<ConnectionCheck> connectionChecks;
+    private final boolean proxyRequired;
+
+    public ImapChannelUpstreamHandler(String hello, ImapProcessor processor, ImapEncoder encoder, boolean compress,
+                                      Encryption secure, ImapMetrics imapMetrics, AuthenticationConfiguration authenticationConfiguration,
+                                      boolean ignoreIDLEUponProcessing, int heartbeatIntervalSeconds, ReactiveThrottler reactiveThrottler,
+                                      Set<ConnectionCheck> connectionChecks, boolean proxyRequired) {
+        this.hello = hello;
+        this.processor = processor;
+        this.encoder = encoder;
+        this.secure = secure;
+        this.compress = compress;
+        this.authenticationConfiguration = authenticationConfiguration;
+        this.imapConnectionsMetric = imapMetrics.getConnectionsMetric();
+        this.imapCommandsMetric = imapMetrics.getCommandsMetric();
+        this.ignoreIDLEUponProcessing = ignoreIDLEUponProcessing;
+        this.heartbeatHandler = new ImapHeartbeatHandler(heartbeatIntervalSeconds, heartbeatIntervalSeconds, heartbeatIntervalSeconds);
+        this.reactiveThrottler = reactiveThrottler;
+        this.connectionChecks = connectionChecks;
+        this.proxyRequired = proxyRequired;
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        SessionId sessionId = SessionId.generate();
+        ImapSession imapsession = new NettyImapSession(ctx.channel(), secure, compress, authenticationConfiguration.isSSLRequired(),
+            authenticationConfiguration.isPlainAuthEnabled(), sessionId,
+            authenticationConfiguration.getOidcSASLConfiguration());
+        ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).set(imapsession);
+        ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).set(new Linearalizer());
+        MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx)
+            .addToContext(MDCBuilder.SESSION_ID, sessionId.asString());
+        imapsession.setAttribute(MDC_KEY, boundMDC);
+
+        performConnectionCheck(imapsession.getRemoteAddress());
+
+        try (Closeable closeable = mdc(imapsession).build()) {
+            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
+            LOGGER.info("Connection established from {}", address.getAddress().getHostAddress());
+            imapConnectionsMetric.increment();
+
+            ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+            ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
+            // write hello to client
+            response.untagged().message("OK").message(hello).end();
+            response.flush();
+            super.channelActive(ctx);
+        }
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
+        if (ctx.channel().isWritable()) {
+            Optional.ofNullable(ctx.channel().attr(BACKPRESSURE_CALLBACK).get())
+                .ifPresent(Runnable::run);
+        }
+    }
+
+    private void performConnectionCheck(InetSocketAddress clientIp) {
+        if (!connectionChecks.isEmpty() && !proxyRequired) {
+            Flux.fromIterable(connectionChecks)
+                .concatMap(connectionCheck -> connectionCheck.validate(clientIp))
+                .then()
+                .block();
+        }
+    }
+
+    private MDCBuilder mdc(ChannelHandlerContext ctx) {
+        ImapSession maybeSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+
+        return mdc(maybeSession);
+    }
+
+    private MDCBuilder mdc(ImapSession imapSession) {
+        return Optional.ofNullable(imapSession)
+            .map(session -> {
+                MDCBuilder boundMDC = (MDCBuilder) session.getAttribute(MDC_KEY);
+
+                return IMAPMDCContext.from(session)
+                    .addToContext(boundMDC);
+            })
+            .orElseGet(MDCBuilder::create);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // remove the stored attribute for the channel to free up resources
+        // See JAMES-1195
+        ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null);
+        try (Closeable closeable = mdc(imapSession).build()) {
+            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
+            LOGGER.info("Connection closed for {}", address.getAddress().getHostAddress());
+
+            Optional.ofNullable(imapSession).ifPresent(ImapSession::cancelOngoingProcessing);
+            Optional.ofNullable(imapSession)
+                .map(ImapSession::logout)
+                .orElse(Mono.empty())
+                .doFinally(Throwing.consumer(signal -> {
+                    imapConnectionsMetric.decrement();
+                    super.channelInactive(ctx);
+                }))
+                .subscribe(any -> {
+
+                }, ctx::fireExceptionCaught);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        try (Closeable closeable = mdc(ctx).build()) {
+            if (cause instanceof SocketException) {
+                LOGGER.info("Socket exception encountered: {}", cause.getMessage());
+            } else if (isSslHandshkeException(cause)) {
+                LOGGER.info("SSH handshake rejected {}", cause.getMessage());
+            } else if (!(cause instanceof ClosedChannelException)) {
+                LOGGER.warn("Error while processing imap request", cause);
+            }
+
+            if (cause instanceof TooLongFrameException) {
+
+                // Max line length exceeded
+                // See RFC 2683 section 3.2.1
+                //
+                // "For its part, a server should allow for a command line of at
+                // least
+                // 8000 octets. This provides plenty of leeway for accepting
+                // reasonable
+                // length commands from clients. The server should send a BAD
+                // response
+                // to a command that does not end within the server's maximum
+                // accepted
+                // command length."
+                //
+                // See also JAMES-1190
+                ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+                ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
+                response.untaggedResponse(ImapConstants.BAD + " failed. Maximum command line length exceeded");
+                response.flush();
+
+            } else if (cause instanceof ReactiveThrottler.RejectedException) {
+                manageRejectedException(ctx, (ReactiveThrottler.RejectedException) cause);
+            } else {
+                manageUnknownError(ctx);
+            }
+        }
+    }
+
+    private boolean isSslHandshkeException(Throwable cause) {
+        return cause instanceof DecoderException
+            && cause.getCause() instanceof SSLHandshakeException;
+    }
+
+    private void manageRejectedException(ChannelHandlerContext ctx, ReactiveThrottler.RejectedException cause) throws IOException {
+        if (cause.getImapMessage() instanceof AbstractImapRequest) {
+            AbstractImapRequest req = (AbstractImapRequest) cause.getImapMessage();
+            ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+            ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
+            new ResponseEncoder(encoder, response)
+                .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(),
+                    new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null));
+            response.flush();
+        } else {
+            manageUnknownError(ctx);
+        }
+    }
+
+    private void manageUnknownError(ChannelHandlerContext ctx) {
+        // logout on error not sure if that is the best way to handle it
+        final ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+
+        Optional.ofNullable(ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null))
+            .ifPresent(Disposable::dispose);
+
+        Optional.ofNullable(imapSession)
+            .map(ImapSession::logout)
+            .orElse(Mono.empty())
+            .doFinally(Throwing.consumer(signal -> {
+                // Make sure we close the channel after all the buffers were flushed out
+                Channel channel = ctx.channel();
+                if (channel.isActive()) {
+                    channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+                }
+                super.channelInactive(ctx);
+            }))
+            .subscribe(any -> {
+
+            }, e -> {
+                LOGGER.error("Exception while handling errors for channel {}", ctx.channel(), e);
+                Channel channel = ctx.channel();
+                if (channel.isActive()) {
+                    channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+                }
+            });
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        imapCommandsMetric.increment();
+        ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+        Linearalizer linearalizer = ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).get();
+        Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
+        ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+        ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer);
+        writer.setFlushCallback(response::flush);
+        ImapMessage message = (ImapMessage) msg;
+
+        beforeIDLEUponProcessing(ctx);
+        ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response);
+        Disposable disposable = reactiveThrottler.throttle(
+            linearalizer.execute(processor.processReactive(message, responseEncoder, session))
+                .doOnEach(Throwing.consumer(signal -> {
+                    if (session.getState() == ImapSessionState.LOGOUT) {
+                        // Make sure we close the channel after all the buffers were flushed out
+                        Channel channel = ctx.channel();
+                        if (channel.isActive()) {
+                            channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+                        }
+                    }
+                    if (signal.isOnComplete()) {
+                        IOException failure = responseEncoder.getFailure();
+                        if (failure != null) {
+                            try (Closeable mdc = ReactorUtils.retrieveMDCBuilder(signal).build()) {
+                                LOGGER.info(failure.getMessage());
+                                LOGGER.debug("Failed to write {}", message, failure);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+
+                            ctx.fireExceptionCaught(failure);
+                        }
+                    }
+                    if (signal.isOnComplete() || signal.isOnError()) {
+                        afterIDLEUponProcessing(ctx);
+                    }
+                    if (signal.hasError()) {
+                        ctx.fireExceptionCaught(signal.getThrowable());
+                    }
+                    disposableAttribute.set(null);
+                    response.flush();
+                    ctx.fireChannelReadComplete();
+                }))
+                .contextWrite(ReactorUtils.context("imap", mdc(session))), message)
+            // Manage throttling errors
+            .doOnError(ctx::fireExceptionCaught)
+            .doFinally(Throwing.consumer(any -> {
+                if (message instanceof Closeable) {
+                    ((Closeable) message).close();
+                }
+            }))
+            .subscribe();
+        disposableAttribute.set(disposable);
+    }
+
+    private void beforeIDLEUponProcessing(ChannelHandlerContext ctx) {
+        if (!ignoreIDLEUponProcessing) {
+            try {
+                ctx.pipeline().addBefore(NettyConstants.CORE_HANDLER, NettyConstants.HEARTBEAT_HANDLER, heartbeatHandler);
+            } catch (IllegalArgumentException e) {
+                LOGGER.info("heartbeat handler is already part of this pipeline", e);
+            }
+        }
+    }
+
+    private void afterIDLEUponProcessing(ChannelHandlerContext ctx) {
+        if (!ignoreIDLEUponProcessing) {
+            try {
+                ctx.pipeline().remove(NettyConstants.HEARTBEAT_HANDLER);
+            } catch (NoSuchElementException e) {
+                LOGGER.info("Heartbeat handler was concurrently removed");
+            }
+        }
+    }
+}