blob: 3e3e26d04363425f623aba05f2e5e3c20d4877e2 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.imapserver.netty;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.imap.api.ImapMessage;
import org.apache.james.imap.api.ImapSessionState;
import org.apache.james.imap.api.process.ImapSession;
import org.apache.james.imap.decode.DecodingException;
import org.apache.james.imap.decode.ImapDecoder;
import org.apache.james.imap.decode.ImapRequestLineReader;
import org.apache.james.protocols.netty.LineHandlerAware;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ByteToMessageDecoder;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
/**
* {@link ByteToMessageDecoder} which will decode via and {@link ImapDecoder} instance
*/
public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements NettyConstants, LineHandlerAware {
@VisibleForTesting
static final String NEEDED_DATA = "NEEDED_DATA";
private static final boolean RETRY = true;
private static final String SINK = "SINK";
private static final String SUBSCRIPTION = "SUBSCRIPTION";
private final ImapDecoder decoder;
private final int inMemorySizeLimit;
private final int literalSizeLimit;
private final Deque<ChannelInboundHandlerAdapter> behaviourOverrides = new ConcurrentLinkedDeque<>();
private final int maxFrameLength;
private final AtomicBoolean framingEnabled = new AtomicBoolean(true);
public ImapRequestFrameDecoder(ImapDecoder decoder, int inMemorySizeLimit, int literalSizeLimit, int maxFrameLength) {
this.decoder = decoder;
this.inMemorySizeLimit = inMemorySizeLimit;
this.literalSizeLimit = literalSizeLimit;
this.maxFrameLength = maxFrameLength;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).set(new HashMap<>());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Object subscription = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get()
.get(SUBSCRIPTION);
if (subscription instanceof Disposable) {
((Disposable) subscription).dispose();
}
super.channelInactive(ctx);
}
@Override
@SuppressWarnings("unchecked")
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
ChannelInboundHandlerAdapter override = behaviourOverrides.peekFirst();
if (override != null) {
override.channelRead(ctx, in);
return;
}
int readerIndex = in.readerIndex();
Map<String, Object> attachment = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get();
Pair<ImapRequestLineReader, Integer> readerAndSize = obtainReader(ctx, in, attachment, readerIndex);
if (readerAndSize == null) {
return;
}
parseImapMessage(ctx, in, attachment, readerAndSize, readerIndex)
.ifPresent(out::add);
}
private Optional<ImapMessage> parseImapMessage(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, Pair<ImapRequestLineReader, Integer> readerAndSize, int readerIndex) throws DecodingException {
ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
// check if the session was removed before to prevent a harmless NPE. See JAMES-1312
// Also check if the session was logged out if so there is not need to try to decode it. See JAMES-1341
if (session != null && session.getState() != ImapSessionState.LOGOUT) {
try {
ImapMessage message = decoder.decode(readerAndSize.getLeft(), session);
// if size is != -1 the case was a literal. if thats the case we
// should not consume the line
// See JAMES-1199
if (readerAndSize.getRight() == -1) {
readerAndSize.getLeft().consumeLine();
}
enableFraming(ctx);
attachment.clear();
return Optional.of(message);
} catch (NettyImapRequestLineReader.NotEnoughDataException e) {
// this exception was thrown because we don't have enough data yet
requestMoreData(ctx, in, attachment, e.getNeededSize(), readerIndex);
}
} else {
// The session was null so may be the case because the channel was already closed but there were still bytes in the buffer.
// We now try to disconnect the client if still connected
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
return Optional.empty();
}
private void requestMoreData(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int neededData, int readerIndex) {
// store the needed data size for later usage
attachment.put(NEEDED_DATA, neededData);
// SwitchableDelimiterBasedFrameDecoder added further to JAMES-1436.
disableFraming(ctx);
in.readerIndex(readerIndex);
}
private Pair<ImapRequestLineReader, Integer> obtainReader(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int readerIndex) throws IOException {
boolean retry = false;
ImapRequestLineReader reader;
// check if we failed before and if we already know how much data we
// need to sucess next run
int size = -1;
final Object rawSize = attachment.get(NEEDED_DATA);
if (rawSize != null) {
retry = true;
size = (Integer) rawSize;
// now see if the buffer hold enough data to process.
if (size != NettyImapRequestLineReader.NotEnoughDataException.UNKNOWN_SIZE && size > in.readableBytes()) {
// check if we have a inMemorySize limit and if so if the
// expected size will fit into it
if (inMemorySizeLimit > 0 && inMemorySizeLimit < size) {
// ok seems like it will not fit in the memory limit so we
// need to store it in a temporary file
uploadToAFile(ctx, in, attachment, size, readerIndex);
return null;
} else {
in.resetReaderIndex();
return null;
}
} else {
reader = new NettyImapRequestLineReader(ctx.channel(), in, retry, literalSizeLimit);
}
} else {
reader = new NettyImapRequestLineReader(ctx.channel(), in, retry, literalSizeLimit);
}
return Pair.of(reader, size);
}
private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size, int readerIndex) throws IOException {
Sinks.Many<byte[]> sink;
// check if we have created a temporary file already or if
// we need to create a new one
if (attachment.containsKey(SINK)) {
sink = (Sinks.Many<byte[]>) attachment.get(SINK);
} else {
sink = Sinks.many().unicast().onBackpressureBuffer();
attachment.put(SINK, sink);
FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size,
(file, written) -> {
ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), file, RETRY);
try {
parseImapMessage(ctx, null, attachment, Pair.of(reader, size), readerIndex)
.ifPresent(message -> {
ctx.fireChannelRead(message);
// Remove ongoing subscription: now on lifecycle of the message will be managed by ImapChannelUpstreamHandler.
// Not doing this causes IDLEd IMAP connections to clear IMAP append literal while they are processed.
attachment.remove(SUBSCRIPTION);
});
} catch (DecodingException e) {
ctx.fireExceptionCaught(e);
}
});
Disposable subscribe = sink.asFlux()
.publishOn(Schedulers.boundedElastic())
.subscribe(fileChunkConsumer,
e -> {
fileChunkConsumer.discard();
ctx.fireExceptionCaught(e);
},
() -> {
});
attachment.put(SUBSCRIPTION, (Disposable) () -> {
// Clear the file if the connection is reset while buffering the litteral.
subscribe.dispose();
fileChunkConsumer.discard();
});
}
int readableBytes = in.readableBytes();
byte[] bytes = new byte[readableBytes];
in.readBytes(bytes);
sink.emitNext(bytes, FAIL_FAST);
}
static class FileChunkConsumer implements Consumer<byte[]> {
private final int size;
private final AtomicInteger written = new AtomicInteger(0);
private final BiConsumer<File, Integer> callback;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private OutputStream outputStream;
private File f;
FileChunkConsumer(int size, BiConsumer<File, Integer> callback) {
this.size = size;
this.callback = callback;
}
@Override
public void accept(byte[] next) {
if (!initialized.get()) {
initialize();
}
writeChunk(next);
// Check if all needed data was streamed to the file.
if (isComplete()) {
finalizeDataTransfer();
}
}
private void initialize() {
try {
f = Files.createTempFile("imap-literal", ".tmp").toFile();
outputStream = new FileOutputStream(f, true);
initialized.set(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void writeChunk(byte[] next) {
try {
int amount = Math.min(next.length, size - written.get());
outputStream.write(next, 0, amount);
written.addAndGet(amount);
} catch (Exception e) {
try {
outputStream.close();
} catch (IOException ignored) {
//ignore exception during close
}
throw new RuntimeException(e);
}
}
private boolean isComplete() {
return written.get() == size;
}
private void finalizeDataTransfer() {
try {
outputStream.close();
} catch (IOException ignored) {
//ignore exception during close
}
callback.accept(f, written.get());
}
void discard() {
Mono.fromRunnable(Throwing.runnable(() -> {
if (outputStream != null) {
outputStream.close();
}
if (f != null) {
Files.delete(f.toPath());
}
})).subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
public void disableFraming(ChannelHandlerContext ctx) {
if (framingEnabled.getAndSet(false)) {
ctx.channel().pipeline().remove(FRAMER);
}
}
public void enableFraming(ChannelHandlerContext ctx) {
if (!framingEnabled.getAndSet(true)) {
ctx.channel().pipeline().addBefore(REQUEST_DECODER, FRAMER,
new SwitchableLineBasedFrameDecoder(ctx.channel().pipeline(), maxFrameLength, false));
}
}
@Override
public void pushLineHandler(ChannelInboundHandlerAdapter lineHandlerUpstreamHandler) {
behaviourOverrides.addFirst(lineHandlerUpstreamHandler);
}
@Override
public void popLineHandler() {
if (!behaviourOverrides.isEmpty()) {
behaviourOverrides.removeFirst();
}
}
}