blob: 85cd565173291a6db51a444fff20eaccee19adc3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.transport.netty4.ssl.SslClientTlsHandler;
import org.apache.dubbo.remoting.transport.netty4.ssl.SslContexts;
import org.apache.dubbo.remoting.utils.UrlUtils;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT;
import static org.apache.dubbo.remoting.transport.netty4.NettyEventLoopFactory.socketChannelClass;
public class NettyConnectionClient extends AbstractConnectionClient {
private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(NettyConnectionClient.class);
private AtomicReference<Promise<Object>> connectingPromise;
private Promise<Void> closePromise;
private AtomicReference<io.netty.channel.Channel> channel;
private ConnectionListener connectionListener;
private Bootstrap bootstrap;
public static final AttributeKey<AbstractConnectionClient> CONNECTION = AttributeKey.valueOf("connection");
private AtomicBoolean isReconnecting;
public NettyConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
}
@Override
protected void initConnectionClient() {
this.protocol = getUrl().getOrDefaultFrameworkModel()
.getExtensionLoader(WireProtocol.class)
.getExtension(getUrl().getProtocol());
this.remote = getConnectAddress();
this.connectingPromise = new AtomicReference<>();
this.connectionListener = new ConnectionListener();
this.channel = new AtomicReference<>();
this.closePromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
this.init = new AtomicBoolean(false);
this.increase();
this.isReconnecting = new AtomicBoolean(false);
}
@Override
protected void doOpen() throws Throwable {
initConnectionClient();
initBootstrap();
}
private void initBootstrap() {
final Bootstrap nettyBootstrap = new Bootstrap();
nettyBootstrap
.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP.get())
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.remoteAddress(getConnectAddress())
.channel(socketChannelClass());
final NettyConnectionHandler connectionHandler = new NettyConnectionHandler(this);
nettyBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
SslContext sslContext = SslContexts.buildClientSslContext(getUrl());
nettyBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ch, getUrl(), getChannelHandler());
final ChannelPipeline pipeline = ch.pipeline();
NettySslContextOperator nettySslContextOperator = new NettySslContextOperator();
if (sslContext != null) {
pipeline.addLast("negotiation", new SslClientTlsHandler(sslContext));
}
// pipeline.addLast("logging", new LoggingHandler(LogLevel.INFO)); //for debug
int heartbeat = UrlUtils.getHeartbeat(getUrl());
pipeline.addLast("client-idle-handler", new IdleStateHandler(heartbeat, 0, 0, MILLISECONDS));
pipeline.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler);
NettyConfigOperator operator = new NettyConfigOperator(nettyChannel, getChannelHandler());
protocol.configClientPipeline(getUrl(), operator, nettySslContextOperator);
// set null but do not close this client, it will be reconnect in the future
ch.closeFuture().addListener(channelFuture -> channel.set(null));
// TODO support Socks5
}
});
this.bootstrap = nettyBootstrap;
}
@Override
protected void doClose() {
// AbstractPeer close can set closed true.
if (isClosed()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Connection:%s freed ", this));
}
final io.netty.channel.Channel current = getNettyChannel();
if (current != null) {
current.close();
}
this.channel.set(null);
closePromise.setSuccess(null);
}
}
@Override
protected void doConnect() throws RemotingException {
if (!isReconnecting.compareAndSet(false, true)) {
return;
}
if (isClosed()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
String.format("%s aborted to reconnect cause connection closed. ", NettyConnectionClient.this));
}
}
init.compareAndSet(false, true);
long start = System.currentTimeMillis();
createConnectingPromise();
final ChannelFuture promise = bootstrap.connect();
promise.addListener(this.connectionListener);
boolean ret = connectingPromise.get().awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
// destroy connectingPromise after used
synchronized (this) {
connectingPromise.set(null);
}
if (promise.cause() != null) {
Throwable cause = promise.cause();
// 6-1 Failed to connect to provider server by other reason.
RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server " + getConnectAddress()
+ ", error message is:" + cause.getMessage(),
cause);
LOGGER.error(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"network disconnected",
"",
"Failed to connect to provider server by other reason.",
cause);
throw remotingException;
} else if (!ret || !promise.isSuccess()) {
// 6-2 Client-side timeout
RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server "
+ getConnectAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
LOGGER.error(
TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "", "Client-side timeout.", remotingException);
throw remotingException;
}
}
@Override
protected void doDisConnect() {
NettyChannel.removeChannelIfDisconnected(getNettyChannel());
}
@Override
public void onConnected(Object channel) {
if (!(channel instanceof io.netty.channel.Channel)) {
return;
}
io.netty.channel.Channel nettyChannel = ((io.netty.channel.Channel) channel);
if (isClosed()) {
nettyChannel.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s is closed, ignoring connected event", this));
}
return;
}
this.channel.set(nettyChannel);
// This indicates that the connection is available.
if (this.connectingPromise.get() != null) {
this.connectingPromise.get().trySuccess(CONNECTED_OBJECT);
}
nettyChannel.attr(CONNECTION).set(this);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s connected ", this));
}
}
@Override
public void onGoaway(Object channel) {
if (!(channel instanceof io.netty.channel.Channel)) {
return;
}
io.netty.channel.Channel nettyChannel = (io.netty.channel.Channel) channel;
if (this.channel.compareAndSet(nettyChannel, null)) {
NettyChannel.removeChannelIfDisconnected(nettyChannel);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s goaway", this));
}
}
}
@Override
protected Channel getChannel() {
io.netty.channel.Channel c = getNettyChannel();
if (c == null) {
return null;
}
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
io.netty.channel.Channel getNettyChannel() {
return this.channel.get();
}
@Override
public Object getChannel(Boolean generalizable) {
return Boolean.TRUE.equals(generalizable) ? getNettyChannel() : getChannel();
}
@Override
public boolean isAvailable() {
if (isClosed()) {
return false;
}
io.netty.channel.Channel nettyChannel = getNettyChannel();
if (nettyChannel != null && nettyChannel.isActive()) {
return true;
}
if (init.compareAndSet(false, true)) {
try {
doConnect();
} catch (RemotingException e) {
LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to connect to server: " + getConnectAddress());
}
}
createConnectingPromise();
connectingPromise.get().awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
// destroy connectingPromise after used
synchronized (this) {
connectingPromise.set(null);
}
nettyChannel = getNettyChannel();
return nettyChannel != null && nettyChannel.isActive();
}
@Override
public void createConnectingPromise() {
connectingPromise.compareAndSet(null, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
}
public Promise<Void> getClosePromise() {
return closePromise;
}
public static AbstractConnectionClient getConnectionClientFromChannel(io.netty.channel.Channel channel) {
return channel.attr(CONNECTION).get();
}
public ChannelFuture write(Object request) throws RemotingException {
if (!isAvailable()) {
throw new RemotingException(
null,
null,
"Failed to send request " + request + ", cause: The channel to " + remote + " is closed!");
}
return ((io.netty.channel.Channel) getChannel()).writeAndFlush(request);
}
@Override
public void addCloseListener(Runnable func) {
getClosePromise().addListener(future -> func.run());
}
@Override
public void destroy() {
close();
}
@Override
public String toString() {
return super.toString() + " (Ref=" + this.getCounter() + ",local="
+ Optional.ofNullable(getChannel())
.map(Channel::getLocalAddress)
.orElse(null) + ",remote=" + getRemoteAddress();
}
class ConnectionListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture future) {
if (!isReconnecting.compareAndSet(true, false)) {
return;
}
if (future.isSuccess()) {
return;
}
final NettyConnectionClient connectionClient = NettyConnectionClient.this;
if (connectionClient.isClosed() || connectionClient.getCounter() == 0) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format(
"%s aborted to reconnect. %s",
connectionClient, future.cause().getMessage()));
}
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format(
"%s is reconnecting, attempt=%d cause=%s",
connectionClient, 0, future.cause().getMessage()));
}
connectivityExecutor.schedule(
() -> {
try {
connectionClient.doConnect();
} catch (RemotingException e) {
LOGGER.error(
TRANSPORT_FAILED_RECONNECT,
"",
"",
"Failed to connect to server: " + getConnectAddress());
}
},
reconnectDuaration,
TimeUnit.MILLISECONDS);
}
}
}