blob: c364e80498ad3f4efb83574388d699af5b2ab386 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.protocols.netty;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.james.protocols.api.ProtocolServer;
import org.apache.james.util.concurrent.NamedThreadFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
/**
* Abstract base class for Servers which want to use async io
*/
public abstract class AbstractAsyncServer implements ProtocolServer {
public static final int DEFAULT_IO_WORKER_COUNT = Runtime.getRuntime().availableProcessors() * 2;
public static final int DEFAULT_BOSS_WORKER_COUNT = 2;
private volatile int backlog = 250;
private volatile int timeout = 120;
private Optional<EventLoopGroup> bossGroup;
private EventLoopGroup workerGroup;
private volatile boolean started;
private final ChannelGroup channels = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private volatile int ioWorker = DEFAULT_IO_WORKER_COUNT;
private volatile Optional<Integer> bossWorker = Optional.of(DEFAULT_BOSS_WORKER_COUNT);
private List<InetSocketAddress> addresses = new ArrayList<>();
protected String jmxName;
private boolean gracefulShutdown = true;
private boolean useEpoll = false;
protected WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
public synchronized void setListenAddresses(InetSocketAddress... addresses) {
if (started) {
throw new IllegalStateException("Can only be set when the server is not running");
}
this.addresses = ImmutableList.copyOf(addresses);
}
public void setGracefulShutdown(boolean gracefulShutdown) {
this.gracefulShutdown = gracefulShutdown;
}
public void setUseEpoll(boolean useEpoll) {
this.useEpoll = useEpoll;
}
public void setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
this.writeBufferWaterMark = writeBufferWaterMark;
}
/**
* Set the IO-worker thread count to use.
*
* IO threads are used for receiving and framing IMAP messages.
*
* Default is nCores * 2
*/
public void setIoWorkerCount(int ioWorker) {
if (started) {
throw new IllegalStateException("Can only be set when the server is not running");
}
this.ioWorker = ioWorker;
}
/**
* Set the Boss-worker thread count to use.
*
* Boss threads are responsible of accepting new connections.
*
* Default is to not use boss threads and let the IO threads hand over this responsibility.
*/
public void setBossWorkerCount(Optional<Integer> bossWorker) {
if (started) {
throw new IllegalStateException("Can only be set when the server is not running");
}
this.bossWorker = bossWorker;
}
@Override
public synchronized void bind() throws Exception {
if (started) {
throw new IllegalStateException("Server running already");
}
if (addresses.isEmpty()) {
throw new RuntimeException("Please specify at least on socketaddress to which the server should get bound!");
}
ServerBootstrap bootstrap = new ServerBootstrap();
if (useEpoll) {
bootstrap.channel(EpollServerSocketChannel.class);
bossGroup = bossWorker.map(count -> new EpollEventLoopGroup(count, NamedThreadFactory.withName(jmxName + "-boss")));
workerGroup = new EpollEventLoopGroup(ioWorker, NamedThreadFactory.withName(jmxName + "-io"));
} else {
bootstrap.channel(NioServerSocketChannel.class);
bossGroup = bossWorker.map(count -> new NioEventLoopGroup(count, NamedThreadFactory.withName(jmxName + "-boss")));
workerGroup = new NioEventLoopGroup(ioWorker, NamedThreadFactory.withName(jmxName + "-io"));
}
bossGroup.<Runnable>map(boss -> () -> bootstrap.group(boss, workerGroup))
.orElse(() -> bootstrap.group(workerGroup))
.run();
ChannelInitializer<SocketChannel> factory = createPipelineFactory();
// Configure the pipeline factory.
bootstrap.childHandler(factory);
configureBootstrap(bootstrap);
for (InetSocketAddress address : addresses) {
Channel channel = bootstrap.bind(address).sync().channel();
channels.add(channel);
}
started = true;
}
/**
* Configure the bootstrap before it get bound
*/
protected void configureBootstrap(ServerBootstrap bootstrap) {
// Bind and start to accept incoming connections.
bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
}
@Override
public synchronized void unbind() {
if (!started) {
return;
}
List<Future<?>> futures = new ArrayList<>();
bossGroup.ifPresent(boss -> futures.add(boss.shutdownGracefully()));
if (workerGroup != null) {
futures.add(workerGroup.shutdownGracefully());
}
futures.add(channels.close());
if (gracefulShutdown) {
futures.forEach(Throwing.<Future<?>>consumer(Future::await).sneakyThrow());
}
started = false;
}
@Override
public synchronized List<InetSocketAddress> getListenAddresses() {
return channels.stream()
.map(channel -> (InetSocketAddress) channel.localAddress())
.collect(ImmutableList.toImmutableList());
}
/**
* Create ChannelPipelineFactory to use by this Server implementation
*/
protected abstract ChannelInitializer<SocketChannel> createPipelineFactory();
/**
* Set the read/write timeout for the server. This will throw a {@link IllegalStateException} if the
* server is running.
*/
public void setTimeout(int timeout) {
if (started) {
throw new IllegalStateException("Can only be set when the server is not running");
}
this.timeout = timeout;
}
/**
* Set the Backlog for the socket. This will throw a {@link IllegalStateException} if the server is running.
*/
public void setBacklog(int backlog) {
if (started) {
throw new IllegalStateException("Can only be set when the server is not running");
}
this.backlog = backlog;
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public int getTimeout() {
return timeout;
}
@Override
public boolean isBound() {
return started;
}
}