| /*
|
| * Copyright 1999-2011 Alibaba Group.
|
| *
|
| * Licensed under the Apache License, Version 2.0 (the "License");
|
| * you may not use this file except in compliance with the License.
|
| * You may obtain a copy of the License at
|
| *
|
| * http://www.apache.org/licenses/LICENSE-2.0
|
| *
|
| * Unless required by applicable law or agreed to in writing, software
|
| * distributed under the License is distributed on an "AS IS" BASIS,
|
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| * See the License for the specific language governing permissions and
|
| * limitations under the License.
|
| */
|
| package com.alibaba.dubbo.remoting.transport.netty; |
| |
| import java.util.concurrent.Executors;
|
| import java.util.concurrent.TimeUnit;
|
|
|
| import org.jboss.netty.bootstrap.ClientBootstrap;
|
| import org.jboss.netty.channel.Channel;
|
| import org.jboss.netty.channel.ChannelFactory;
|
| import org.jboss.netty.channel.ChannelFuture;
|
| import org.jboss.netty.channel.ChannelPipeline;
|
| import org.jboss.netty.channel.ChannelPipelineFactory;
|
| import org.jboss.netty.channel.Channels;
|
| import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
|
|
|
| import com.alibaba.dubbo.common.Constants;
|
| import com.alibaba.dubbo.common.URL;
|
| import com.alibaba.dubbo.common.Version;
|
| import com.alibaba.dubbo.common.logger.Logger;
|
| import com.alibaba.dubbo.common.logger.LoggerFactory;
|
| import com.alibaba.dubbo.common.utils.NamedThreadFactory;
|
| import com.alibaba.dubbo.common.utils.NetUtils;
|
| import com.alibaba.dubbo.remoting.ChannelHandler;
|
| import com.alibaba.dubbo.remoting.RemotingException;
|
| import com.alibaba.dubbo.remoting.transport.AbstractClient;
|
| import com.alibaba.dubbo.remoting.transport.handler.ChannelHandlers;
|
| |
| /** |
| * NettyClient. |
| * |
| * @author qian.lei |
| * @author william.liangf |
| */ |
| public class NettyClient extends AbstractClient {
|
|
|
| private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); |
| |
| // 因ChannelFactory的关闭有DirectMemory泄露,采用静态化规避 |
| // https://issues.jboss.org/browse/NETTY-424 |
| private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)), |
| Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)), |
| Constants.DEFAULT_IO_THREADS); |
| private ClientBootstrap bootstrap; |
| |
| private volatile Channel channel; // volatile, please copy reference to use |
| |
| public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{ |
| super(url, wrapChannelHandler(url, handler)); |
| } |
| protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){ |
| url = url.addParameter(Constants.THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME) |
| .addParameter(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); |
| return ChannelHandlers.wrap(handler, url); |
| } |
| @Override |
| protected void doOpen() throws Throwable { |
| bootstrap = new ClientBootstrap(channelFactory); |
| // config |
| // @see org.jboss.netty.channel.socket.SocketChannelConfig |
| bootstrap.setOption("keepAlive", true); |
| bootstrap.setOption("tcpNoDelay", true); |
| bootstrap.setOption("connectTimeoutMillis", getTimeout()); |
| final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); |
| bootstrap.setPipelineFactory(new ChannelPipelineFactory() { |
| public ChannelPipeline getPipeline() { |
| NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); |
| ChannelPipeline pipeline = Channels.pipeline(); |
| pipeline.addLast("decoder", adapter.getDecoder()); |
| pipeline.addLast("encoder", adapter.getEncoder()); |
| pipeline.addLast("handler", nettyHandler); |
| return pipeline; |
| } |
| }); |
| } |
| |
| protected void doConnect() throws Throwable {
|
| long start = System.currentTimeMillis(); |
| ChannelFuture future = bootstrap.connect(getConnectAddress());
|
| try{
|
| boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
|
|
|
| if (ret && future.isSuccess()) {
|
| Channel newChannel = future.getChannel();
|
| newChannel.setInterestOps(Channel.OP_READ_WRITE);
|
| try {
|
| // 关闭旧的连接
|
| Channel oldChannel = NettyClient.this.channel; // copy reference
|
| if (oldChannel != null) {
|
| try {
|
| if (logger.isInfoEnabled()) {
|
| logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
|
| }
|
| oldChannel.close();
|
| } finally {
|
| NettyChannel.removeChannelIfDisconnected(oldChannel);
|
| }
|
| }
|
| } finally {
|
| if (NettyClient.this.isClosed()) {
|
| try {
|
| if (logger.isInfoEnabled()) {
|
| logger.info("Close new netty channel " + newChannel + ", because the client closed.");
|
| }
|
| newChannel.close();
|
| } finally {
|
| NettyClient.this.channel = null;
|
| NettyChannel.removeChannelIfDisconnected(newChannel);
|
| }
|
| } else {
|
| NettyClient.this.channel = newChannel;
|
| }
|
| }
|
| } else if (future.getCause() != null) {
|
| throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
|
| } else {
|
| throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " client-side timeout "
|
| + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
|
| + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
|
| + Version.getVersion());
|
| }
|
| }finally{
|
| if (! isConnected()) {
|
| future.cancel();
|
| }
|
| }
|
| } |
| |
| @Override |
| protected void doDisConnect() throws Throwable { |
| try { |
| NettyChannel.removeChannelIfDisconnected(channel); |
| } catch (Throwable t) { |
| logger.warn(t.getMessage()); |
| } |
| } |
| |
| @Override |
| protected void doClose() throws Throwable { |
| /*try { |
| bootstrap.releaseExternalResources(); |
| } catch (Throwable t) { |
| logger.warn(t.getMessage()); |
| }*/ |
| } |
| |
| @Override |
| protected com.alibaba.dubbo.remoting.Channel getChannel() { |
| Channel c = channel; |
| if (c == null || ! c.isConnected()) |
| return null; |
| return NettyChannel.getOrAddChannel(c, getUrl(), this); |
| } |
| |
| } |