| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.remoting.transport.netty4; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.Version; |
| import org.apache.dubbo.common.config.ConfigurationUtils; |
| import org.apache.dubbo.common.constants.CommonConstants; |
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.resource.GlobalResourceInitializer; |
| import org.apache.dubbo.common.utils.NetUtils; |
| import org.apache.dubbo.common.utils.StringUtils; |
| import org.apache.dubbo.remoting.ChannelHandler; |
| import org.apache.dubbo.remoting.Constants; |
| import org.apache.dubbo.remoting.RemotingException; |
| import org.apache.dubbo.remoting.api.SslClientTlsHandler; |
| import org.apache.dubbo.remoting.transport.AbstractClient; |
| import org.apache.dubbo.remoting.utils.UrlUtils; |
| |
| import io.netty.bootstrap.Bootstrap; |
| import io.netty.buffer.PooledByteBufAllocator; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.socket.SocketChannel; |
| import io.netty.handler.proxy.Socks5ProxyHandler; |
| import io.netty.handler.timeout.IdleStateHandler; |
| import io.netty.util.concurrent.EventExecutorGroup; |
| |
| import java.net.InetSocketAddress; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.dubbo.common.constants.CommonConstants.SSL_ENABLED_KEY; |
| import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT; |
| import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER; |
| import static org.apache.dubbo.remoting.Constants.DEFAULT_CONNECT_TIMEOUT; |
| import static org.apache.dubbo.remoting.api.NettyEventLoopFactory.eventLoopGroup; |
| import static org.apache.dubbo.remoting.api.NettyEventLoopFactory.socketChannelClass; |
| |
| /** |
| * NettyClient. |
| */ |
| public class NettyClient extends AbstractClient { |
| |
| private static final String SOCKS_PROXY_HOST = "socksProxyHost"; |
| |
| private static final String SOCKS_PROXY_PORT = "socksProxyPort"; |
| |
| private static final String DEFAULT_SOCKS_PROXY_PORT = "1080"; |
| |
| private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyClient.class); |
| |
| /** |
| * netty client bootstrap |
| */ |
| private static final GlobalResourceInitializer<EventLoopGroup> EVENT_LOOP_GROUP = new GlobalResourceInitializer<>(() -> |
| eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker"), |
| EventExecutorGroup::shutdownGracefully); |
| |
| private Bootstrap bootstrap; |
| |
| /** |
| * current channel. Each successful invocation of {@link NettyClient#doConnect()} will |
| * replace this with new channel and close old channel. |
| * <b>volatile, please copy reference to use.</b> |
| */ |
| private volatile Channel channel; |
| |
| /** |
| * The constructor of NettyClient. |
| * It wil init and start netty. |
| */ |
| public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { |
| // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. |
| // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler |
| super(url, wrapChannelHandler(url, handler)); |
| } |
| |
| /** |
| * Init bootstrap |
| * |
| * @throws Throwable |
| */ |
| @Override |
| protected void doOpen() throws Throwable { |
| final NettyClientHandler nettyClientHandler = createNettyClientHandler(); |
| bootstrap = new Bootstrap(); |
| initBootstrap(nettyClientHandler); |
| } |
| |
| protected NettyClientHandler createNettyClientHandler() { |
| return new NettyClientHandler(getUrl(), this); |
| } |
| |
| protected void initBootstrap(NettyClientHandler nettyClientHandler) { |
| bootstrap.group(EVENT_LOOP_GROUP.get()) |
| .option(ChannelOption.SO_KEEPALIVE, true) |
| .option(ChannelOption.TCP_NODELAY, true) |
| .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) |
| //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) |
| .channel(socketChannelClass()); |
| |
| bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout())); |
| bootstrap.handler(new ChannelInitializer<SocketChannel>() { |
| |
| @Override |
| protected void initChannel(SocketChannel ch) throws Exception { |
| int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); |
| |
| if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { |
| ch.pipeline().addLast("negotiation", new SslClientTlsHandler(getUrl())); |
| } |
| |
| NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); |
| ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug |
| .addLast("decoder", adapter.getDecoder()) |
| .addLast("encoder", adapter.getEncoder()) |
| .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) |
| .addLast("handler", nettyClientHandler); |
| |
| String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST); |
| if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) { |
| int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); |
| Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort)); |
| ch.pipeline().addFirst(socks5ProxyHandler); |
| } |
| } |
| }); |
| } |
| |
| private boolean isFilteredAddress(String host) { |
| // filter local address |
| return StringUtils.isEquals(NetUtils.getLocalHost(), host) || NetUtils.isLocalHost(host); |
| } |
| |
| @Override |
| protected void doConnect() throws Throwable { |
| try { |
| String ipv6Address = NetUtils.getLocalHostV6(); |
| InetSocketAddress connectAddress; |
| //first try ipv6 address |
| if (ipv6Address != null && getUrl().getParameter(CommonConstants.IPV6_KEY) != null) { |
| connectAddress = new InetSocketAddress(getUrl().getParameter(CommonConstants.IPV6_KEY), getUrl().getPort()); |
| try { |
| doConnect(connectAddress); |
| return; |
| } catch (Throwable throwable) { |
| //ignore |
| } |
| } |
| |
| connectAddress = getConnectAddress(); |
| doConnect(connectAddress); |
| } finally { |
| // just add new valid channel to NettyChannel's cache |
| if (!isConnected()) { |
| //future.cancel(true); |
| } |
| } |
| } |
| |
| private void doConnect(InetSocketAddress serverAddress) throws RemotingException { |
| long start = System.currentTimeMillis(); |
| ChannelFuture future = bootstrap.connect(serverAddress); |
| try { |
| boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS); |
| |
| if (ret && future.isSuccess()) { |
| Channel newChannel = future.channel(); |
| try { |
| // Close old channel |
| // copy reference |
| Channel oldChannel = NettyClient.this.channel; |
| if (oldChannel != null) { |
| try { |
| if (logger.isInfoEnabled()) { |
| logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); |
| } |
| oldChannel.close(); |
| } finally { |
| NettyChannel.removeChannelIfDisconnected(oldChannel); |
| } |
| } |
| } finally { |
| if (NettyClient.this.isClosed()) { |
| try { |
| if (logger.isInfoEnabled()) { |
| logger.info("Close new netty channel " + newChannel + ", because the client closed."); |
| } |
| newChannel.close(); |
| } finally { |
| NettyClient.this.channel = null; |
| NettyChannel.removeChannelIfDisconnected(newChannel); |
| } |
| } else { |
| NettyClient.this.channel = newChannel; |
| } |
| } |
| } else if (future.cause() != null) { |
| |
| Throwable cause = future.cause(); |
| |
| // 6-1 Failed to connect to provider server by other reason. |
| |
| RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " |
| + serverAddress + ", error message is:" + cause.getMessage(), cause); |
| |
| logger.error(TRANSPORT_FAILED_CONNECT_PROVIDER, "network disconnected", "", |
| "Failed to connect to provider server by other reason.", cause); |
| |
| throw remotingException; |
| |
| } else { |
| |
| // 6-2 Client-side timeout |
| |
| RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " |
| + serverAddress + " client-side timeout " |
| + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " |
| + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()); |
| |
| logger.error(TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "", |
| "Client-side timeout.", remotingException); |
| |
| throw remotingException; |
| } |
| } finally { |
| // just add new valid channel to NettyChannel's cache |
| if (!isConnected()) { |
| //future.cancel(true); |
| } |
| } |
| } |
| |
| @Override |
| protected void doDisConnect() throws Throwable { |
| try { |
| NettyChannel.removeChannelIfDisconnected(channel); |
| } catch (Throwable t) { |
| logger.warn(t.getMessage()); |
| } |
| } |
| |
| @Override |
| protected void doClose() throws Throwable { |
| // can't shut down nioEventLoopGroup because the method will be invoked when closing one channel but not a client, |
| // but when and how to close the nioEventLoopGroup ? |
| // nioEventLoopGroup.shutdownGracefully(); |
| } |
| |
| @Override |
| protected org.apache.dubbo.remoting.Channel getChannel() { |
| Channel c = channel; |
| if (c == null) { |
| return null; |
| } |
| return NettyChannel.getOrAddChannel(c, getUrl(), this); |
| } |
| |
| Channel getNettyChannel() { |
| return channel; |
| } |
| |
| @Override |
| public boolean canHandleIdle() { |
| return true; |
| } |
| |
| protected EventLoopGroup getEventLoopGroup() { |
| return EVENT_LOOP_GROUP.get(); |
| } |
| |
| protected Bootstrap getBootstrap() { |
| return bootstrap; |
| } |
| } |