| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.remoting.transport.quic; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.logger.Logger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.utils.ExecutorUtil; |
| import org.apache.dubbo.common.utils.NetUtils; |
| import org.apache.dubbo.remoting.Channel; |
| import org.apache.dubbo.remoting.ChannelHandler; |
| import org.apache.dubbo.remoting.RemotingException; |
| import org.apache.dubbo.remoting.RemotingServer; |
| import org.apache.dubbo.remoting.transport.AbstractServer; |
| import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers; |
| import org.apache.dubbo.remoting.utils.UrlUtils; |
| |
| import io.netty.bootstrap.Bootstrap; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.socket.nio.NioDatagramChannel; |
| import io.netty.handler.ssl.util.SelfSignedCertificate; |
| import io.netty.handler.timeout.IdleStateHandler; |
| import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; |
| import io.netty.incubator.codec.quic.QuicServerCodecBuilder; |
| import io.netty.incubator.codec.quic.QuicSslContext; |
| import io.netty.incubator.codec.quic.QuicSslContextBuilder; |
| import io.netty.incubator.codec.quic.QuicStreamChannel; |
| |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| |
| /** |
| * NettyServer. |
| */ |
| public class QuicNettyServer extends AbstractServer implements RemotingServer { |
| |
| private static final Logger logger = LoggerFactory.getLogger(QuicNettyServer.class); |
| private static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler"; |
| /** |
| * the cache for alive worker channel. |
| * <ip:port, dubbo channel> |
| */ |
| private Map<String, Channel> channels; |
| /** |
| * netty server bootstrap. |
| */ |
| private Bootstrap bootstrap; |
| /** |
| * the boss channel that receive connections and dispatch these to worker channel. |
| */ |
| private io.netty.channel.Channel channel; |
| |
| private EventLoopGroup bossGroup; |
| private EventLoopGroup workerGroup; |
| |
| public QuicNettyServer(URL url, ChannelHandler handler) throws RemotingException { |
| // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. |
| // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler |
| super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); |
| } |
| |
| /** |
| * Init and start netty server |
| * |
| * @throws Throwable |
| */ |
| @Override |
| protected void doOpen() throws Throwable { |
| |
| bossGroup = QuicNettyEventLoopFactory.eventLoopGroup(1, "QuicNettyServerBoss"); |
| |
| final QuicNettyServerHandler nettyServerHandler = new QuicNettyServerHandler(getUrl(), this); |
| channels = nettyServerHandler.getChannels(); |
| |
| |
| SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); |
| QuicSslContext context = QuicSslContextBuilder.forServer( |
| selfSignedCertificate.privateKey(), null, selfSignedCertificate.certificate()) |
| .applicationProtocols("http/0.9").build(); |
| |
| |
| bootstrap = new Bootstrap(); |
| io.netty.channel.ChannelHandler codec = new QuicServerCodecBuilder() |
| .sslContext(context) |
| .maxIdleTimeout(5000, TimeUnit.MILLISECONDS) |
| .initialMaxData(10000000) |
| .initialMaxStreamDataBidirectionalLocal(1000000) |
| .initialMaxStreamDataBidirectionalRemote(1000000) |
| .initialMaxStreamsBidirectional(100) |
| .initialMaxStreamsUnidirectional(100) |
| .tokenHandler(InsecureQuicTokenHandler.INSTANCE) |
| .streamHandler(new ChannelInitializer<QuicStreamChannel>() { |
| @Override |
| protected void initChannel(QuicStreamChannel ch) { |
| int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); |
| NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), QuicNettyServer.this); |
| ch.pipeline() |
| .addLast("decoder", adapter.getDecoder()) |
| .addLast("encoder", adapter.getEncoder()) |
| .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS)) |
| .addLast("handler", nettyServerHandler); |
| } |
| }).build(); |
| |
| InetSocketAddress address = getBindAddress(); |
| logger.info("bind address:" + address); |
| ChannelFuture channelFuture = bootstrap.group(bossGroup) |
| .channel(NioDatagramChannel.class) |
| .handler(codec) |
| .bind(address); |
| channelFuture.addListener((ChannelFutureListener) channelFuture1 -> logger.info("bind finish:" + channelFuture1)); |
| } |
| |
| @Override |
| protected void doClose() throws Throwable { |
| try { |
| if (channel != null) { |
| // unbind. |
| channel.close(); |
| } |
| } catch (Throwable e) { |
| logger.warn(e.getMessage(), e); |
| } |
| try { |
| Collection<org.apache.dubbo.remoting.Channel> channels = getChannels(); |
| if (channels != null && channels.size() > 0) { |
| for (org.apache.dubbo.remoting.Channel channel : channels) { |
| try { |
| channel.close(); |
| } catch (Throwable e) { |
| logger.warn(e.getMessage(), e); |
| } |
| } |
| } |
| } catch (Throwable e) { |
| logger.warn(e.getMessage(), e); |
| } |
| try { |
| if (bootstrap != null) { |
| bossGroup.shutdownGracefully().syncUninterruptibly(); |
| workerGroup.shutdownGracefully().syncUninterruptibly(); |
| } |
| } catch (Throwable e) { |
| logger.warn(e.getMessage(), e); |
| } |
| try { |
| if (channels != null) { |
| channels.clear(); |
| } |
| } catch (Throwable e) { |
| logger.warn(e.getMessage(), e); |
| } |
| } |
| |
| @Override |
| protected int getChannelsSize() { |
| return channels == null ? 0 : channels.size(); |
| } |
| |
| @Override |
| public Collection<Channel> getChannels() { |
| Collection<Channel> chs = new ArrayList<>(this.channels.size()); |
| // pick channels from NettyServerHandler ( needless to check connectivity ) |
| chs.addAll(this.channels.values()); |
| return chs; |
| } |
| |
| @Override |
| public Channel getChannel(InetSocketAddress remoteAddress) { |
| return channels.get(NetUtils.toAddressString(remoteAddress)); |
| } |
| |
| @Override |
| public boolean canHandleIdle() { |
| return true; |
| } |
| |
| @Override |
| public boolean isBound() { |
| return channel.isActive(); |
| } |
| |
| } |