| /**************************************************************** |
| * Licensed to the Apache Software Foundation (ASF) under one * |
| * or more contributor license agreements. See the NOTICE file * |
| * distributed with this work for additional information * |
| * regarding copyright ownership. The ASF licenses this file * |
| * to you under the Apache License, Version 2.0 (the * |
| * "License"); you may not use this file except in compliance * |
| * with the License. You may obtain a copy of the License at * |
| * * |
| * http://www.apache.org/licenses/LICENSE-2.0 * |
| * * |
| * Unless required by applicable law or agreed to in writing, * |
| * software distributed under the License is distributed on an * |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * |
| * KIND, either express or implied. See the License for the * |
| * specific language governing permissions and limitations * |
| * under the License. * |
| ****************************************************************/ |
| package org.apache.james.imapserver.netty; |
| |
| import static org.apache.james.imapserver.netty.IMAPServer.AuthenticationConfiguration; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.SocketException; |
| import java.nio.channels.ClosedChannelException; |
| import java.time.Duration; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import javax.net.ssl.SSLHandshakeException; |
| |
| import org.apache.james.core.Username; |
| import org.apache.james.imap.api.ConnectionCheck; |
| import org.apache.james.imap.api.ImapConstants; |
| import org.apache.james.imap.api.ImapMessage; |
| import org.apache.james.imap.api.ImapSessionState; |
| import org.apache.james.imap.api.display.HumanReadableText; |
| import org.apache.james.imap.api.message.response.StatusResponse; |
| import org.apache.james.imap.api.process.ImapProcessor; |
| import org.apache.james.imap.api.process.ImapSession; |
| import org.apache.james.imap.api.process.ImapSession.SessionId; |
| import org.apache.james.imap.encode.ImapEncoder; |
| import org.apache.james.imap.encode.base.ImapResponseComposerImpl; |
| import org.apache.james.imap.main.ResponseEncoder; |
| import org.apache.james.imap.message.request.AbstractImapRequest; |
| import org.apache.james.imap.message.response.ImmutableStatusResponse; |
| import org.apache.james.metrics.api.Metric; |
| import org.apache.james.protocols.netty.Encryption; |
| import org.apache.james.util.MDCBuilder; |
| import org.apache.james.util.ReactorUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.github.fge.lambdas.Throwing; |
| |
| import io.netty.buffer.Unpooled; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelHandler; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.ChannelInboundHandlerAdapter; |
| import io.netty.handler.codec.DecoderException; |
| import io.netty.handler.codec.TooLongFrameException; |
| import io.netty.handler.ssl.NotSslRecordException; |
| import io.netty.util.Attribute; |
| import reactor.core.Disposable; |
| import reactor.core.publisher.Flux; |
| import reactor.core.publisher.Mono; |
| |
| /** |
| * {@link ChannelInboundHandlerAdapter} which handles IMAP |
| */ |
| @ChannelHandler.Sharable |
| public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter implements NettyConstants { |
| private static final Logger LOGGER = LoggerFactory.getLogger(ImapChannelUpstreamHandler.class); |
| public static final String MDC_KEY = "bound_MDC"; |
| |
| public static class ImapChannelUpstreamHandlerBuilder { |
| private String hello; |
| private Encryption secure; |
| private boolean compress; |
| private ImapProcessor processor; |
| private ImapEncoder encoder; |
| private IMAPServer.AuthenticationConfiguration authenticationConfiguration; |
| private ImapMetrics imapMetrics; |
| private boolean ignoreIDLEUponProcessing; |
| private Duration heartbeatInterval; |
| private ReactiveThrottler reactiveThrottler; |
| private Set<ConnectionCheck> connectionChecks; |
| private boolean proxyRequired; |
| |
| public ImapChannelUpstreamHandlerBuilder reactiveThrottler(ReactiveThrottler reactiveThrottler) { |
| this.reactiveThrottler = reactiveThrottler; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder hello(String hello) { |
| this.hello = hello; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder secure(Encryption secure) { |
| this.secure = secure; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder compress(boolean compress) { |
| this.compress = compress; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder processor(ImapProcessor processor) { |
| this.processor = processor; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder encoder(ImapEncoder encoder) { |
| this.encoder = encoder; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder authenticationConfiguration(IMAPServer.AuthenticationConfiguration authenticationConfiguration) { |
| this.authenticationConfiguration = authenticationConfiguration; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder connectionChecks(Set<ConnectionCheck> connectionChecks) { |
| this.connectionChecks = connectionChecks; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder imapMetrics(ImapMetrics imapMetrics) { |
| this.imapMetrics = imapMetrics; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder ignoreIDLEUponProcessing(boolean ignoreIDLEUponProcessing) { |
| this.ignoreIDLEUponProcessing = ignoreIDLEUponProcessing; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder heartbeatInterval(Duration heartbeatInterval) { |
| this.heartbeatInterval = heartbeatInterval; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandlerBuilder proxyRequired(boolean proxyRequired) { |
| this.proxyRequired = proxyRequired; |
| return this; |
| } |
| |
| public ImapChannelUpstreamHandler build() { |
| return new ImapChannelUpstreamHandler(hello, processor, encoder, compress, secure, imapMetrics, authenticationConfiguration, ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), reactiveThrottler, connectionChecks, proxyRequired); |
| } |
| } |
| |
| static class ImapLinerarizer { |
| private final AtomicBoolean isExecutingRequest = new AtomicBoolean(false); |
| private final ConcurrentLinkedQueue<Object> throttled = new ConcurrentLinkedQueue<>(); |
| } |
| |
| public static ImapChannelUpstreamHandlerBuilder builder() { |
| return new ImapChannelUpstreamHandlerBuilder(); |
| } |
| |
| private final String hello; |
| private final Encryption secure; |
| private final boolean compress; |
| private final ImapProcessor processor; |
| private final ImapEncoder encoder; |
| private final ImapHeartbeatHandler heartbeatHandler; |
| private final AuthenticationConfiguration authenticationConfiguration; |
| private final Metric imapConnectionsMetric; |
| private final Metric imapCommandsMetric; |
| private final boolean ignoreIDLEUponProcessing; |
| private final ReactiveThrottler reactiveThrottler; |
| private final Set<ConnectionCheck> connectionChecks; |
| private final boolean proxyRequired; |
| |
| public ImapChannelUpstreamHandler(String hello, ImapProcessor processor, ImapEncoder encoder, boolean compress, |
| Encryption secure, ImapMetrics imapMetrics, AuthenticationConfiguration authenticationConfiguration, |
| boolean ignoreIDLEUponProcessing, int heartbeatIntervalSeconds, ReactiveThrottler reactiveThrottler, |
| Set<ConnectionCheck> connectionChecks, boolean proxyRequired) { |
| this.hello = hello; |
| this.processor = processor; |
| this.encoder = encoder; |
| this.secure = secure; |
| this.compress = compress; |
| this.authenticationConfiguration = authenticationConfiguration; |
| this.imapConnectionsMetric = imapMetrics.getConnectionsMetric(); |
| this.imapCommandsMetric = imapMetrics.getCommandsMetric(); |
| this.ignoreIDLEUponProcessing = ignoreIDLEUponProcessing; |
| this.heartbeatHandler = new ImapHeartbeatHandler(heartbeatIntervalSeconds, heartbeatIntervalSeconds, heartbeatIntervalSeconds); |
| this.reactiveThrottler = reactiveThrottler; |
| this.connectionChecks = connectionChecks; |
| this.proxyRequired = proxyRequired; |
| } |
| |
| @Override |
| public void channelActive(ChannelHandlerContext ctx) throws Exception { |
| SessionId sessionId = SessionId.generate(); |
| ImapSession imapsession = new NettyImapSession(ctx.channel(), secure, compress, authenticationConfiguration.isSSLRequired(), |
| authenticationConfiguration.isPlainAuthEnabled(), sessionId, |
| authenticationConfiguration.getOidcSASLConfiguration()); |
| ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).set(imapsession); |
| ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).set(new ImapLinerarizer()); |
| MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx) |
| .addToContext(MDCBuilder.SESSION_ID, sessionId.asString()); |
| imapsession.setAttribute(MDC_KEY, boundMDC); |
| |
| performConnectionCheck(imapsession.getRemoteAddress()); |
| |
| try (Closeable closeable = mdc(imapsession).build()) { |
| InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); |
| LOGGER.info("Connection established from {}", address.getAddress().getHostAddress()); |
| imapConnectionsMetric.increment(); |
| |
| ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); |
| ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); |
| // write hello to client |
| response.untagged().message("OK").message(hello).end(); |
| response.flush(); |
| super.channelActive(ctx); |
| } |
| } |
| |
| @Override |
| public void channelWritabilityChanged(ChannelHandlerContext ctx) { |
| if (ctx.channel().isWritable()) { |
| Optional.ofNullable(ctx.channel().attr(BACKPRESSURE_CALLBACK).get()) |
| .ifPresent(Runnable::run); |
| } |
| } |
| |
| private void performConnectionCheck(InetSocketAddress clientIp) { |
| if (!connectionChecks.isEmpty() && !proxyRequired) { |
| Flux.fromIterable(connectionChecks) |
| .concatMap(connectionCheck -> connectionCheck.validate(clientIp)) |
| .then() |
| .block(); |
| } |
| } |
| |
| private MDCBuilder mdc(ChannelHandlerContext ctx) { |
| ImapSession maybeSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); |
| |
| return mdc(maybeSession); |
| } |
| |
| private MDCBuilder mdc(ImapSession imapSession) { |
| return Optional.ofNullable(imapSession) |
| .map(session -> { |
| MDCBuilder boundMDC = (MDCBuilder) session.getAttribute(MDC_KEY); |
| |
| return IMAPMDCContext.from(session) |
| .addToContext(boundMDC); |
| }) |
| .orElseGet(MDCBuilder::create); |
| } |
| |
| @Override |
| public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
| // remove the stored attribute for the channel to free up resources |
| // See JAMES-1195 |
| ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null); |
| try (Closeable closeable = mdc(imapSession).build()) { |
| InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); |
| LOGGER.info("Connection closed for {} and user {}", address.getAddress().getHostAddress(), retrieveUsername(imapSession)); |
| |
| Optional.ofNullable(imapSession).ifPresent(ImapSession::cancelOngoingProcessing); |
| Optional.ofNullable(imapSession) |
| .map(ImapSession::logout) |
| .orElse(Mono.empty()) |
| .doFinally(Throwing.consumer(signal -> { |
| imapConnectionsMetric.decrement(); |
| super.channelInactive(ctx); |
| })) |
| .subscribe(any -> { |
| |
| }, ctx::fireExceptionCaught); |
| } |
| } |
| |
| private static String retrieveUsername(ImapSession imapSession) { |
| return Optional.ofNullable(imapSession) |
| .flatMap(session -> Optional.ofNullable(session.getUserName())) |
| .map(Username::asString) |
| .orElse(""); |
| } |
| |
| @Override |
| public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
| ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null); |
| String username = retrieveUsername(imapSession); |
| try (Closeable closeable = mdc(ctx).build()) { |
| if (cause instanceof SocketException) { |
| LOGGER.info("Socket exception encountered for user {}: {}", username, cause.getMessage()); |
| } else if (isSslHandshkeException(cause)) { |
| LOGGER.info("SSH handshake rejected {}", cause.getMessage()); |
| } else if (isNotSslRecordException(cause)) { |
| LOGGER.info("Not an SSL record {}", cause.getMessage()); |
| } else if (!(cause instanceof ClosedChannelException)) { |
| LOGGER.warn("Error while processing imap request", cause); |
| } |
| |
| if (cause instanceof TooLongFrameException) { |
| |
| // Max line length exceeded |
| // See RFC 2683 section 3.2.1 |
| // |
| // "For its part, a server should allow for a command line of at |
| // least |
| // 8000 octets. This provides plenty of leeway for accepting |
| // reasonable |
| // length commands from clients. The server should send a BAD |
| // response |
| // to a command that does not end within the server's maximum |
| // accepted |
| // command length." |
| // |
| // See also JAMES-1190 |
| ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); |
| ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); |
| response.untaggedResponse(ImapConstants.BAD + " failed. Maximum command line length exceeded"); |
| response.flush(); |
| |
| } else if (cause instanceof ReactiveThrottler.RejectedException) { |
| manageRejectedException(ctx, (ReactiveThrottler.RejectedException) cause); |
| } else { |
| manageUnknownError(ctx); |
| } |
| } |
| } |
| |
| private boolean isSslHandshkeException(Throwable cause) { |
| return cause instanceof DecoderException |
| && cause.getCause() instanceof SSLHandshakeException; |
| } |
| |
| private boolean isNotSslRecordException(Throwable cause) { |
| return cause instanceof DecoderException && |
| cause.getCause() instanceof NotSslRecordException; |
| } |
| |
| private void manageRejectedException(ChannelHandlerContext ctx, ReactiveThrottler.RejectedException cause) throws IOException { |
| if (cause.getImapMessage() instanceof AbstractImapRequest) { |
| AbstractImapRequest req = (AbstractImapRequest) cause.getImapMessage(); |
| ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); |
| ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); |
| new ResponseEncoder(encoder, response) |
| .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(), |
| new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null)); |
| response.flush(); |
| } else { |
| manageUnknownError(ctx); |
| } |
| } |
| |
| private void manageUnknownError(ChannelHandlerContext ctx) { |
| // logout on error not sure if that is the best way to handle it |
| ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); |
| Optional.ofNullable(imapSession).ifPresent(ImapSession::cancelOngoingProcessing); |
| |
| Optional.ofNullable(imapSession) |
| .map(ImapSession::logout) |
| .orElse(Mono.empty()) |
| .doFinally(Throwing.consumer(signal -> { |
| // Make sure we close the channel after all the buffers were flushed out |
| Channel channel = ctx.channel(); |
| if (channel.isActive()) { |
| channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); |
| } |
| super.channelInactive(ctx); |
| })) |
| .subscribe(any -> { |
| |
| }, e -> { |
| LOGGER.error("Exception while handling errors for channel {} and user {}", ctx.channel(), |
| Optional.ofNullable(imapSession).map(u -> u.getUserName().asString()).orElse(""), e); |
| Channel channel = ctx.channel(); |
| if (channel.isActive()) { |
| channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); |
| } |
| }); |
| } |
| |
| @Override |
| public void channelRead(ChannelHandlerContext ctx, Object msg) { |
| imapCommandsMetric.increment(); |
| ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); |
| Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY); |
| |
| ImapLinerarizer linearalizer = ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).get(); |
| synchronized (linearalizer) { |
| if (linearalizer.isExecutingRequest.get()) { |
| linearalizer.throttled.add(msg); |
| return; |
| } |
| linearalizer.isExecutingRequest.set(true); |
| } |
| |
| ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); |
| ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); |
| writer.setFlushCallback(response::flush); |
| ImapMessage message = (ImapMessage) msg; |
| |
| beforeIDLEUponProcessing(ctx); |
| ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response); |
| Disposable disposable = reactiveThrottler.throttle(processor.processReactive(message, responseEncoder, session) |
| .doOnEach(Throwing.consumer(signal -> { |
| if (session.getState() == ImapSessionState.LOGOUT) { |
| // Make sure we close the channel after all the buffers were flushed out |
| Channel channel = ctx.channel(); |
| if (channel.isActive()) { |
| channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); |
| } |
| } |
| if (signal.isOnComplete()) { |
| IOException failure = responseEncoder.getFailure(); |
| if (failure != null) { |
| try (Closeable mdc = ReactorUtils.retrieveMDCBuilder(signal).build()) { |
| LOGGER.info(failure.getMessage()); |
| LOGGER.debug("Failed to write {}", message, failure); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| ctx.fireExceptionCaught(failure); |
| } |
| } |
| Object waitingMessage; |
| synchronized (linearalizer) { |
| linearalizer.isExecutingRequest.set(false); |
| waitingMessage = linearalizer.throttled.poll(); |
| } |
| if (signal.isOnComplete() || signal.isOnError()) { |
| afterIDLEUponProcessing(ctx); |
| } |
| if (signal.hasError()) { |
| ctx.fireExceptionCaught(signal.getThrowable()); |
| } |
| disposableAttribute.set(null); |
| response.flush(); |
| ctx.fireChannelReadComplete(); |
| if (signal.isOnComplete() || signal.isOnError()) { |
| if (waitingMessage != null && signal.isOnComplete()) { |
| channelRead(ctx, waitingMessage); |
| } |
| } |
| })) |
| .contextWrite(ReactorUtils.context("imap", mdc(session))), message) |
| // Manage throttling errors |
| .doOnError(ctx::fireExceptionCaught) |
| .doFinally(Throwing.consumer(any -> { |
| if (message instanceof Closeable) { |
| ((Closeable) message).close(); |
| } |
| })) |
| .subscribe(); |
| disposableAttribute.set(disposable); |
| } |
| |
| private void beforeIDLEUponProcessing(ChannelHandlerContext ctx) { |
| if (!ignoreIDLEUponProcessing) { |
| try { |
| ctx.pipeline().addBefore(NettyConstants.CORE_HANDLER, NettyConstants.HEARTBEAT_HANDLER, heartbeatHandler); |
| } catch (IllegalArgumentException e) { |
| LOGGER.info("heartbeat handler is already part of this pipeline", e); |
| } |
| } |
| } |
| |
| private void afterIDLEUponProcessing(ChannelHandlerContext ctx) { |
| if (!ignoreIDLEUponProcessing) { |
| try { |
| ctx.pipeline().remove(NettyConstants.HEARTBEAT_HANDLER); |
| } catch (NoSuchElementException e) { |
| LOGGER.info("Heartbeat handler was concurrently removed"); |
| } |
| } |
| } |
| } |