blob: 6b2796e8729d7dd4ebb4010c02402ecf98fcaa1a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.impl.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.SocketAddress;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
import org.apache.rocketmq.remoting.config.RemotingClientConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
import org.apache.rocketmq.remoting.internal.JvmUtils;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
private final Bootstrap clientBootstrap = new Bootstrap();
private final EventLoopGroup ioGroup;
private final Class<? extends SocketChannel> socketChannelClass;
private final RemotingClientConfig clientConfig;
private EventExecutorGroup workerGroup;
private ClientChannelManager clientChannelManager;
public NettyRemotingClient(final RemotingClientConfig clientConfig) {
super(clientConfig);
this.clientConfig = clientConfig;
if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
clientConfig.getClientWorkerThreads()));
socketChannelClass = EpollSocketChannel.class;
} else {
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
clientConfig.getClientWorkerThreads()));
socketChannelClass = NioSocketChannel.class;
}
this.clientChannelManager = new ClientChannelManager(clientBootstrap, clientConfig);
this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
}
@Override
public void start() {
super.start();
this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(workerGroup,
new Decoder(),
new Encoder(),
new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
new ClientConnectionHandler(),
new RemotingCommandDispatcher());
}
});
applyOptions(clientBootstrap);
startUpHouseKeepingService();
}
@Override
public void stop() {
try {
clientChannelManager.clear();
this.ioGroup.shutdownGracefully();
this.workerGroup.shutdownGracefully();
} catch (Exception e) {
LOG.warn("RemotingClient stopped error !", e);
}
super.stop();
}
private void applyOptions(Bootstrap bootstrap) {
if (null != clientConfig) {
if (clientConfig.getTcpSoSndBufSize() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize());
}
if (clientConfig.getTcpSoRcvBufSize() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize());
}
bootstrap.option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()).
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeoutMillis()).
option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(),
clientConfig.getWriteBufHighWaterMark()));
}
}
@Override
public RemotingCommand invoke(final String address, final RemotingCommand request, final long timeoutMillis) {
request.trafficType(TrafficType.REQUEST_SYNC);
Channel channel = this.clientChannelManager.createIfAbsent(address);
if (channel != null && channel.isActive()) {
try {
return this.invokeWithInterceptor(channel, request, timeoutMillis);
} catch (RemoteTimeoutException e) {
if (this.clientConfig.isClientCloseSocketIfTimeout()) {
LOG.warn("invoke: timeout, so close the socket {} ms, {}", timeoutMillis, address);
this.clientChannelManager.closeChannel(address, channel);
}
LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address);
throw e;
} finally {
if (this.clientConfig.isClientShortConnectionEnable()) {
this.clientChannelManager.closeChannel(address, channel);
}
}
} else {
this.clientChannelManager.closeChannel(address, channel);
throw new RemoteConnectFailureException(address);
}
}
@Override
public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler,
final long timeoutMillis) {
final Channel channel = this.clientChannelManager.createIfAbsent(address);
if (channel != null && channel.isActive()) {
this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis);
} else {
this.clientChannelManager.closeChannel(address, channel);
}
}
@Override
public void invokeOneWay(final String address, final RemotingCommand request) {
final Channel channel = this.clientChannelManager.createIfAbsent(address);
if (channel != null && channel.isActive()) {
this.invokeOnewayWithInterceptor(channel, request);
} else {
this.clientChannelManager.closeChannel(address, channel);
}
}
private class ClientConnectionHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise)
throws Exception {
LOG.info("Connected from {} to {}.", localAddress, remoteAddress);
super.connect(ctx, remoteAddress, localAddress, promise);
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel()));
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
LOG.info("Remote address {} disconnect channel {}.", ctx.channel().remoteAddress(), ctx.channel());
NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
super.disconnect(ctx, promise);
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
LOG.info("Remote address {} close channel {}.", ctx.channel().remoteAddress(), ctx.channel());
NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
super.close(ctx, promise);
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
LOG.info("Close channel {} because of idle event {} ", ctx.channel(), event);
NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel()));
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
}
}
}