// blob: 73bccc8f91e963a0a36cc258971311ffbd3c40b4 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.imapserver.netty;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration2.HierarchicalConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.configuration2.tree.ImmutableNode;
import org.apache.james.imap.api.ConnectionCheck;
import org.apache.james.imap.api.ImapConfiguration;
import org.apache.james.imap.api.ImapConstants;
import org.apache.james.imap.api.process.ImapProcessor;
import org.apache.james.imap.decode.ImapDecoder;
import org.apache.james.imap.encode.ImapEncoder;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.protocols.api.OidcSASLConfiguration;
import org.apache.james.protocols.lib.netty.AbstractConfigurableAsyncServer;
import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
import org.apache.james.protocols.netty.ChannelHandlerFactory;
import org.apache.james.protocols.netty.ConnectionLimitUpstreamHandler;
import org.apache.james.protocols.netty.ConnectionPerIpLimitUpstreamHandler;
import org.apache.james.protocols.netty.Encryption;
import org.apache.james.protocols.netty.HandlerConstants;
import org.apache.james.util.Size;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Netty-based NIO IMAP server.
 *
 * <p>Settings are read in {@link #doConfigure(HierarchicalConfiguration)}, the
 * per-connection Netty pipeline is assembled by {@link #createPipelineFactory()},
 * and IMAP traffic is handed to the handler built by {@link #createCoreHandler()}.
 */
public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapConstants, IMAPServerMBean, NettyConstants {
// Class-wide logger; in this file it is used to report an invalid idle time unit.
private static final Logger LOG = LoggerFactory.getLogger(IMAPServer.class);
/**
 * Immutable holder for the authentication-related settings of the IMAP server:
 * whether SSL is required, whether plain authentication is enabled, and the
 * optional OIDC SASL configuration.
 */
public static class AuthenticationConfiguration {
    private static final boolean PLAIN_AUTH_DISALLOWED_DEFAULT = true;
    private static final boolean PLAIN_AUTH_ENABLED_DEFAULT = true;
    private static final String OIDC_PATH = "auth.oidc";

    private final boolean isSSLRequired;
    private final boolean plainAuthEnabled;
    private final Optional<OidcSASLConfiguration> oidcSASLConfiguration;

    /**
     * Builds a configuration without any OIDC settings.
     *
     * @param isSSLRequired value later returned by {@link #isSSLRequired()}
     * @param plainAuthEnabled value later returned by {@link #isPlainAuthEnabled()}
     */
    public AuthenticationConfiguration(boolean isSSLRequired, boolean plainAuthEnabled) {
        this.oidcSASLConfiguration = Optional.empty();
        this.plainAuthEnabled = plainAuthEnabled;
        this.isSSLRequired = isSSLRequired;
    }

    /**
     * Builds a configuration carrying OIDC settings.
     *
     * @param isSSLRequired value later returned by {@link #isSSLRequired()}
     * @param plainAuthEnabled value later returned by {@link #isPlainAuthEnabled()}
     * @param oidcSASLConfiguration OIDC settings; must not be {@code null}
     *        (it is wrapped with {@link Optional#of(Object)})
     */
    public AuthenticationConfiguration(boolean isSSLRequired, boolean plainAuthEnabled, OidcSASLConfiguration oidcSASLConfiguration) {
        this.oidcSASLConfiguration = Optional.of(oidcSASLConfiguration);
        this.plainAuthEnabled = plainAuthEnabled;
        this.isSSLRequired = isSSLRequired;
    }

    /**
     * Reads the authentication settings from the server configuration.
     *
     * <p>{@code auth.requireSSL} defaults to the value of the {@code plainAuthDisallowed}
     * key (itself defaulting to {@code true}); {@code auth.plainAuthEnabled} defaults to
     * {@code true}. The OIDC part is only parsed when an {@code auth.oidc} node exists.
     *
     * @param configuration the server configuration node
     * @return the parsed authentication configuration
     * @throws ConfigurationException when the OIDC section cannot be turned into an
     *         {@link OidcSASLConfiguration}
     */
    public static AuthenticationConfiguration parse(HierarchicalConfiguration<ImmutableNode> configuration) throws ConfigurationException {
        boolean sslRequired = configuration.getBoolean("auth.requireSSL", requireSslDefault(configuration));
        boolean plainAuth = configuration.getBoolean("auth.plainAuthEnabled", PLAIN_AUTH_ENABLED_DEFAULT);

        boolean oidcDeclared = !configuration.immutableConfigurationsAt(OIDC_PATH).isEmpty();
        if (!oidcDeclared) {
            return new AuthenticationConfiguration(sslRequired, plainAuth);
        }

        try {
            return new AuthenticationConfiguration(
                sslRequired,
                plainAuth,
                OidcSASLConfiguration.parse(configuration.configurationAt(OIDC_PATH)));
        } catch (MalformedURLException | NullPointerException | URISyntaxException exception) {
            throw new ConfigurationException("Failed to retrieve oauth component", exception);
        }
    }

    /** Default for {@code auth.requireSSL}: the value of the {@code plainAuthDisallowed} key. */
    private static boolean requireSslDefault(HierarchicalConfiguration<ImmutableNode> configuration) {
        return configuration.getBoolean("plainAuthDisallowed", PLAIN_AUTH_DISALLOWED_DEFAULT);
    }

    /** @return whether SSL is required, as configured */
    public boolean isSSLRequired() {
        return isSSLRequired;
    }

    /** @return whether plain authentication is enabled, as configured */
    public boolean isPlainAuthEnabled() {
        return plainAuthEnabled;
    }

    /** @return the OIDC SASL configuration, empty when none was supplied */
    public Optional<OidcSASLConfiguration> getOidcSASLConfiguration() {
        return oidcSASLConfiguration;
    }
}
// Prefix of the greeting assembled in doConfigure. VERSION is not declared in this
// file; presumably it comes from one of the implemented constant interfaces.
private static final String SOFTWARE_TYPE = "JAMES " + VERSION + " Server ";
// Default for the "idleTimeIntervalUnit" key; must be the name of a TimeUnit constant.
private static final String DEFAULT_TIME_UNIT = "SECONDS";
// Separator between entries of the "disabledCaps" configuration value.
private static final String CAPABILITY_SEPARATOR = "|";
// Default for the "maxLineLength" key.
public static final int DEFAULT_MAX_LINE_LENGTH = 65536; // Use a big default
// Default for the "inMemorySizeLimit" key.
public static final Size DEFAULT_IN_MEMORY_SIZE_LIMIT = Size.of(10L, Size.Unit.M); // Use 10MB as default
// Default for the "timeout" key; doConfigure also rejects any configured value below it.
public static final int DEFAULT_TIMEOUT = 30 * 60; // default timeout is 30 minutes
// Default for the "literalSizeLimit" key; getImapConfiguration only advertises an
// append limit when the value is strictly positive, so 0 means "no append limit".
public static final int DEFAULT_LITERAL_SIZE_LIMIT = 0;
// Collaborators injected through the constructor.
private final ImapProcessor processor;
private final ImapEncoder encoder;
private final ImapDecoder decoder;
private final ImapMetrics imapMetrics;
private final GaugeRegistry gaugeRegistry;
private final Set<ConnectionCheck> connectionChecks;
// Everything below is assigned by doConfigure.
// Greeting text handed to the core handler.
private String hello;
// Value of the "compress" key (default false).
private boolean compress;
// Value of the "maxLineLength" key; used by the frame decoder factory and the request decoder.
private int maxLineLength;
// "inMemorySizeLimit" converted to bytes; passed to the request frame decoder.
private int inMemorySizeLimit;
// Value of the "timeout" key; passed to the ImapIdleStateHandler of each channel.
private int timeout;
// "literalSizeLimit" converted to bytes; passed to the request frame decoder.
private int literalSizeLimit;
private AuthenticationConfiguration authenticationConfiguration;
// Optional limit handlers; when present they are added to every channel pipeline.
private Optional<ConnectionLimitUpstreamHandler> connectionLimitUpstreamHandler = Optional.empty();
private Optional<ConnectionPerIpLimitUpstreamHandler> connectionPerIpLimitUpstreamHandler = Optional.empty();
// Value of the "ignoreIDLEUponProcessing" key (default true).
private boolean ignoreIDLEUponProcessing;
// Idle time interval taken from the parsed ImapConfiguration.
private Duration heartbeatInterval;
// Throttler built from the configured concurrent-request and queue-size limits.
private ReactiveThrottler reactiveThrottler;
/**
 * Creates an unconfigured IMAP server; {@link #doConfigure(HierarchicalConfiguration)}
 * must supply the remaining settings.
 *
 * @param decoder decoder handed to the request frame decoder of each channel
 * @param encoder encoder handed to the core IMAP handler
 * @param processor processor handed to the core IMAP handler and configured in {@code doConfigure}
 * @param imapMetrics metrics handed to the core IMAP handler
 * @param gaugeRegistry registry handed to the {@code ReactiveThrottler}
 * @param connectionChecks checks handed to the proxy message handler and the core IMAP handler
 */
public IMAPServer(ImapDecoder decoder, ImapEncoder encoder, ImapProcessor processor, ImapMetrics imapMetrics, GaugeRegistry gaugeRegistry, Set<ConnectionCheck> connectionChecks) {
    // Assigned in parameter order; the assignments are independent of each other.
    this.decoder = decoder;
    this.encoder = encoder;
    this.processor = processor;
    this.imapMetrics = imapMetrics;
    this.gaugeRegistry = gaugeRegistry;
    this.connectionChecks = connectionChecks;
}
/**
 * Reads the IMAP-specific settings after the parent class has processed the
 * generic ones, then configures the IMAP processor.
 *
 * @param configuration the server configuration node
 * @throws ConfigurationException when the configured timeout is below
 *         {@link #DEFAULT_TIMEOUT}, or when the authentication section is invalid
 */
@Override
public void doConfigure(HierarchicalConfiguration<ImmutableNode> configuration) throws ConfigurationException {
    super.doConfigure(configuration);

    // Greeting and basic protocol options.
    hello = SOFTWARE_TYPE + getHelloName() + " is ready.";
    compress = configuration.getBoolean("compress", false);
    maxLineLength = configuration.getInt("maxLineLength", DEFAULT_MAX_LINE_LENGTH);

    // Size limits; toIntExact fails fast on values that do not fit in an int.
    Size inMemoryLimit = Optional.ofNullable(configuration.getString("inMemorySizeLimit", null))
        .map(Size::parse)
        .orElse(DEFAULT_IN_MEMORY_SIZE_LIMIT);
    inMemorySizeLimit = Math.toIntExact(inMemoryLimit.asBytes());
    literalSizeLimit = parseLiteralSizeLimit(configuration);

    // The timeout may be raised above the default but never lowered below it.
    timeout = configuration.getInt("timeout", DEFAULT_TIMEOUT);
    boolean timeoutTooShort = timeout < DEFAULT_TIMEOUT;
    if (timeoutTooShort) {
        throw new ConfigurationException("Minimum timeout of 30 minutes required. See rfc2060 5.4 for details");
    }

    authenticationConfiguration = AuthenticationConfiguration.parse(configuration);
    connectionLimitUpstreamHandler = ConnectionLimitUpstreamHandler.forCount(connectionLimit);
    connectionPerIpLimitUpstreamHandler = ConnectionPerIpLimitUpstreamHandler.forCount(connPerIP);
    ignoreIDLEUponProcessing = configuration.getBoolean("ignoreIDLEUponProcessing", true);

    // Settings shared with the IMAP processor.
    ImapConfiguration imapSettings = getImapConfiguration(configuration);
    heartbeatInterval = imapSettings.idleTimeIntervalAsDuration();
    reactiveThrottler = new ReactiveThrottler(gaugeRegistry, imapSettings.getConcurrentRequests(), imapSettings.getMaxQueueSize());
    processor.configure(imapSettings);
}
/**
 * Reads the {@code literalSizeLimit} key and converts it to a byte count.
 *
 * @param configuration the server configuration node
 * @return the configured limit in bytes, or {@link #DEFAULT_LITERAL_SIZE_LIMIT}
 *         when the key is absent
 */
private static Integer parseLiteralSizeLimit(HierarchicalConfiguration<ImmutableNode> configuration) {
    Optional<String> rawLimit = Optional.ofNullable(configuration.getString("literalSizeLimit", null));
    Optional<Size> parsedLimit = rawLimit.map(Size::parse);
    // toIntExact rejects sizes that do not fit in an int instead of truncating them.
    return parsedLimit
        .map(size -> Math.toIntExact(size.asBytes()))
        .orElse(DEFAULT_LITERAL_SIZE_LIMIT);
}
/**
 * Builds the {@link ImapConfiguration} from the server configuration node.
 *
 * <p>Keys read: {@code enableIdle}, {@code idleTimeInterval}, {@code idleTimeIntervalUnit},
 * {@code disabledCaps}, {@code literalSizeLimit}, {@code maxQueueSize},
 * {@code concurrentRequests}, {@code provisionDefaultMailboxes} and {@code customProperties}.
 * Missing keys fall back to the defaults declared on {@link ImapConfiguration} or in this class.
 *
 * @param configuration the server configuration node
 * @return the IMAP configuration built from it
 */
@VisibleForTesting static ImapConfiguration getImapConfiguration(HierarchicalConfiguration<ImmutableNode> configuration) {
    // Splitting "" yields a single empty string, and "A | B" yields padded entries:
    // trim every entry and drop the empty ones so that only real capability names remain.
    ImmutableSet<String> disabledCaps = ImmutableSet.copyOf(Splitter.on(CAPABILITY_SEPARATOR)
        .trimResults()
        .omitEmptyStrings()
        .split(configuration.getString("disabledCaps", "")));
    return ImapConfiguration.builder()
        .enableIdle(configuration.getBoolean("enableIdle", ImapConfiguration.DEFAULT_ENABLE_IDLE))
        .idleTimeInterval(configuration.getLong("idleTimeInterval", ImapConfiguration.DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS))
        .idleTimeIntervalUnit(getTimeIntervalUnit(configuration.getString("idleTimeIntervalUnit", DEFAULT_TIME_UNIT)))
        .disabledCaps(disabledCaps)
        // An append limit is only set when the literal size limit is strictly positive.
        .appendLimit(Optional.of(parseLiteralSizeLimit(configuration)).filter(i -> i > 0))
        .maxQueueSize(configuration.getInteger("maxQueueSize", ImapConfiguration.DEFAULT_QUEUE_SIZE))
        .concurrentRequests(configuration.getInteger("concurrentRequests", ImapConfiguration.DEFAULT_CONCURRENT_REQUESTS))
        .isProvisionDefaultMailboxes(configuration.getBoolean("provisionDefaultMailboxes", ImapConfiguration.DEFAULT_PROVISION_DEFAULT_MAILBOXES))
        .withCustomProperties(configuration.getProperties("customProperties"))
        .build();
}
/**
 * Resolves the configured idle time unit.
 *
 * <p>The name is matched against the {@link TimeUnit} constants ignoring case and
 * surrounding whitespace, so {@code "minutes"} and {@code " MINUTES "} both resolve to
 * {@link TimeUnit#MINUTES}. A {@code null} or unknown name is logged and replaced by
 * {@link ImapConfiguration#DEFAULT_HEARTBEAT_INTERVAL_UNIT}.
 *
 * @param timeIntervalUnit the configured unit name, may be {@code null}
 * @return the matching unit, or the default heartbeat unit when none matches
 */
private static TimeUnit getTimeIntervalUnit(String timeIntervalUnit) {
    String candidate = timeIntervalUnit == null ? "" : timeIntervalUnit.trim();
    for (TimeUnit unit : TimeUnit.values()) {
        // equalsIgnoreCase is locale-independent, unlike toUpperCase() without a Locale.
        if (unit.name().equalsIgnoreCase(candidate)) {
            return unit;
        }
    }
    LOG.info("Time interval unit is not valid {}, the default {} value should be used", timeIntervalUnit, ImapConfiguration.DEFAULT_HEARTBEAT_INTERVAL_UNIT);
    return ImapConfiguration.DEFAULT_HEARTBEAT_INTERVAL_UNIT;
}
/**
 * @return 143, the standard IMAP port
 */
@Override
public int getDefaultPort() {
return 143;
}
/**
 * @return the fixed, human-readable service name {@code "IMAP Service"}
 */
@Override
public String getServiceType() {
return "IMAP Service";
}
/**
 * Creates the factory that assembles the Netty pipeline of every accepted connection.
 *
 * <p>Handlers are appended in this order: idle timeout, optional connection limits,
 * optional HAProxy decoding, line framer, chunked writer, IMAP request decoder and
 * the core IMAP handler. When encryption is configured and is not STARTTLS, the SSL
 * handler is inserted right after the proxy information handler when a proxy is
 * required, or at the very front of the pipeline otherwise.
 *
 * @return the pipeline factory for this server
 */
@Override
protected AbstractChannelPipelineFactory createPipelineFactory() {
    return new AbstractChannelPipelineFactory(getFrameHandlerFactory(), getExecutorGroup()) {
        @Override
        protected ChannelInboundHandlerAdapter createHandler() {
            return createCoreHandler();
        }

        @Override
        public void initChannel(Channel channel) {
            final String proxyInfoHandlerName = "proxyInformationHandler";
            ChannelPipeline channelPipeline = channel.pipeline();
            channel.config().setWriteBufferWaterMark(writeBufferWaterMark);

            channelPipeline.addLast(TIMEOUT_HANDLER, new ImapIdleStateHandler(timeout));
            connectionLimitUpstreamHandler.ifPresent(limiter -> channelPipeline.addLast(HandlerConstants.CONNECTION_LIMIT_HANDLER, limiter));
            connectionPerIpLimitUpstreamHandler.ifPresent(limiter -> channelPipeline.addLast(HandlerConstants.CONNECTION_LIMIT_PER_IP_HANDLER, limiter));

            if (proxyRequired) {
                channelPipeline.addLast(HandlerConstants.PROXY_HANDLER, new HAProxyMessageDecoder());
                channelPipeline.addLast(proxyInfoHandlerName, new HAProxyMessageHandler(connectionChecks));
            }

            // Line framer limiting the maximum line length; it keeps the CRLF delimiter
            // and can be switched at runtime, see JAMES-1436.
            channelPipeline.addLast(FRAMER, getFrameHandlerFactory().create(channelPipeline));

            // Implicit TLS only: with STARTTLS the SSL handler is not installed here.
            Encryption tls = getEncryption();
            boolean implicitTls = tls != null && !tls.isStartTLS();
            if (implicitTls && proxyRequired) {
                channelPipeline.addAfter(proxyInfoHandlerName, SSL_HANDLER, tls.sslHandler());
            } else if (implicitTls) {
                channelPipeline.addFirst(SSL_HANDLER, tls.sslHandler());
            }

            channelPipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
            channelPipeline.addLast(REQUEST_DECODER,
                new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit, maxLineLength));
            channelPipeline.addLast(CORE_HANDLER, createCoreHandler());
        }
    };
}
/**
 * @return the fixed default JMX name {@code "imapserver"}
 */
@Override
protected String getDefaultJMXName() {
return "imapserver";
}
/**
 * Builds the handler that processes IMAP traffic, wiring in the injected
 * collaborators and the values read by {@code doConfigure}.
 *
 * @return a newly built {@code ImapChannelUpstreamHandler}
 */
@Override
protected ChannelInboundHandlerAdapter createCoreHandler() {
    Encryption tls = getEncryption();
    return ImapChannelUpstreamHandler.builder()
        // Protocol collaborators.
        .processor(processor)
        .encoder(encoder)
        .imapMetrics(imapMetrics)
        .reactiveThrottler(reactiveThrottler)
        // Session behaviour.
        .hello(hello)
        .compress(compress)
        .heartbeatInterval(heartbeatInterval)
        .ignoreIDLEUponProcessing(ignoreIDLEUponProcessing)
        // Security and connection handling.
        .authenticationConfiguration(authenticationConfiguration)
        .secure(tls)
        .connectionChecks(connectionChecks)
        .proxyRequired(proxyRequired)
        .build();
}
/**
 * Creates the factory of the line framer, bounded by the configured maximum line length.
 *
 * @return a new {@code SwitchableLineBasedFrameDecoderFactory}
 */
@Override
protected ChannelHandlerFactory createFrameHandlerFactory() {
    ChannelHandlerFactory lineFramerFactory = new SwitchableLineBasedFrameDecoderFactory(maxLineLength);
    return lineFramerFactory;
}
/**
 * Exposes the connection checks this server was constructed with.
 *
 * @return the same set instance that was passed to the constructor (no copy is made)
 */
public Set<ConnectionCheck> getConnectionChecks() {
    return connectionChecks;
}
}