| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.alibaba.dubbo.remoting.transport.netty4; |
| |
| import com.alibaba.dubbo.common.Constants; |
| import com.alibaba.dubbo.common.URL; |
| import com.alibaba.dubbo.common.Version; |
| import com.alibaba.dubbo.common.logger.Logger; |
| import com.alibaba.dubbo.common.logger.LoggerFactory; |
| import com.alibaba.dubbo.common.utils.NetUtils; |
| import com.alibaba.dubbo.remoting.ChannelHandler; |
| import com.alibaba.dubbo.remoting.RemotingException; |
| import com.alibaba.dubbo.remoting.transport.AbstractClient; |
| |
| import io.netty.bootstrap.Bootstrap; |
| import io.netty.buffer.PooledByteBufAllocator; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.nio.NioEventLoopGroup; |
| import io.netty.channel.socket.nio.NioSocketChannel; |
| import io.netty.util.concurrent.DefaultThreadFactory; |
| |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * NettyClient. |
| */ |
| public class NettyClient extends AbstractClient { |
| |
| private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); |
| |
| private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true)); |
| |
| private Bootstrap bootstrap; |
| |
| private volatile Channel channel; // volatile, please copy reference to use |
| |
| public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { |
| super(url, wrapChannelHandler(url, handler)); |
| } |
| |
| @Override |
| protected void doOpen() throws Throwable { |
| final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); |
| bootstrap = new Bootstrap(); |
| bootstrap.group(nioEventLoopGroup) |
| .option(ChannelOption.SO_KEEPALIVE, true) |
| .option(ChannelOption.TCP_NODELAY, true) |
| .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) |
| //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) |
| .channel(NioSocketChannel.class); |
| |
| if (getTimeout() < 3000) { |
| bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); |
| } else { |
| bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()); |
| } |
| |
| bootstrap.handler(new ChannelInitializer() { |
| |
| @Override |
| protected void initChannel(Channel ch) throws Exception { |
| NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); |
| ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug |
| .addLast("decoder", adapter.getDecoder()) |
| .addLast("encoder", adapter.getEncoder()) |
| .addLast("handler", nettyClientHandler); |
| } |
| }); |
| } |
| |
| @Override |
| protected void doConnect() throws Throwable { |
| long start = System.currentTimeMillis(); |
| ChannelFuture future = bootstrap.connect(getConnectAddress()); |
| try { |
| boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS); |
| |
| if (ret && future.isSuccess()) { |
| Channel newChannel = future.channel(); |
| try { |
| // Close old channel |
| Channel oldChannel = NettyClient.this.channel; // copy reference |
| if (oldChannel != null) { |
| try { |
| if (logger.isInfoEnabled()) { |
| logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); |
| } |
| oldChannel.close(); |
| } finally { |
| NettyChannel.removeChannelIfDisconnected(oldChannel); |
| } |
| } |
| } finally { |
| if (NettyClient.this.isClosed()) { |
| try { |
| if (logger.isInfoEnabled()) { |
| logger.info("Close new netty channel " + newChannel + ", because the client closed."); |
| } |
| newChannel.close(); |
| } finally { |
| NettyClient.this.channel = null; |
| NettyChannel.removeChannelIfDisconnected(newChannel); |
| } |
| } else { |
| NettyClient.this.channel = newChannel; |
| } |
| } |
| } else if (future.cause() != null) { |
| throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " |
| + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause()); |
| } else { |
| throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " |
| + getRemoteAddress() + " client-side timeout " |
| + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " |
| + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()); |
| } |
| } finally { |
| if (!isConnected()) { |
| //future.cancel(true); |
| } |
| } |
| } |
| |
| @Override |
| protected void doDisConnect() throws Throwable { |
| try { |
| NettyChannel.removeChannelIfDisconnected(channel); |
| } catch (Throwable t) { |
| logger.warn(t.getMessage()); |
| } |
| } |
| |
| @Override |
| protected void doClose() throws Throwable { |
| //can't shutdown nioEventLoopGroup |
| //nioEventLoopGroup.shutdownGracefully(); |
| } |
| |
| @Override |
| protected com.alibaba.dubbo.remoting.Channel getChannel() { |
| Channel c = channel; |
| if (c == null || !c.isActive()) |
| return null; |
| return NettyChannel.getOrAddChannel(c, getUrl(), this); |
| } |
| |
| } |