blob: 502622459b9aedbcc02beb119b439020703bf7fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.impl.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingEndPoint;
import org.apache.rocketmq.remoting.api.RemotingService;
import org.apache.rocketmq.remoting.api.RequestProcessor;
import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.ResponseResult;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.config.RemotingConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.internal.UIDGenerator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class NettyRemotingAbstract implements RemotingService {
/** Logger shared by this class and its subclasses. */
protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class);
/** Thread that drains queued channel events and forwards them to the registered channel event listeners. */
protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
/** Fair semaphore bounding the number of in-flight oneway invocations (acquired in invokeOneway0). */
private final Semaphore semaphoreOneway;
/** Fair semaphore bounding the number of in-flight async invocations (acquired in invokeAsync0). */
private final Semaphore semaphoreAsync;
/** Pending sync/async requests keyed by request id; entries are removed when a response arrives or the send fails. */
private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
/** Registered request processors and the executor each one runs on, keyed by request code. */
private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>();
/** Number of matched responses handled so far; used only to sample a statistics log line every 5000 responses. */
private final AtomicLong responseCounter = new AtomicLong(0);
/** Factory handed out through commandFactory() and used to build SYSTEM_BUSY responses. */
private final RemotingCommandFactory remotingCommandFactory;
/** Identifier generated once per instance by UIDGenerator. */
private final String remotingInstanceId = UIDGenerator.instance().createUID();
/** Default executor: runs async callbacks and any processor registered without its own executor. */
private final ExecutorService publicExecutor;
/** Single-threaded scheduler that periodically invokes scanResponseTable(); the {@code true} flag is presumably "daemon" - confirm in ThreadUtils. */
protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
/** Interceptors notified before requests, after responses and on exceptions. */
private InterceptorGroup interceptorGroup = new InterceptorGroup();
/** Listeners notified of channel lifecycle events by the channel event executor. */
private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
NettyRemotingAbstract(RemotingConfig clientConfig) {
this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientAsyncCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl();
}
/**
 * Hands a channel event to the channel event executor for asynchronous listener dispatch.
 *
 * @param event the event to enqueue
 */
protected void putNettyEvent(final NettyChannelEvent event) {
    channelEventExecutor.putNettyEvent(event);
}
/**
 * Schedules the periodic scan of the pending-response table: first run after 3 seconds,
 * then once every second.
 *
 * <p>The delays are expressed in milliseconds. The scan is wrapped in a catch-all because
 * {@code scheduleAtFixedRate} silently cancels all subsequent runs once a run throws.
 */
protected void startUpHouseKeepingService() {
    this.houseKeepingService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                scanResponseTable();
            } catch (Throwable e) {
                // Keep the periodic task alive; one failed scan must not stop housekeeping.
                LOG.warn("Scan response table error !", e);
            }
        }
    }, 3000, 1000, TimeUnit.MILLISECONDS);
}
/**
 * Starts the channel event executor thread, but only when at least one channel event
 * listener has been registered.
 */
@Override
public void start() {
    final boolean hasListeners = this.channelEventListenerGroup.size() > 0;
    if (!hasListeners) {
        return;
    }
    this.channelEventExecutor.start();
}
/**
 * Shuts down the shared callback executor (waiting up to 2000 ms) and then the channel
 * event executor thread.
 */
@Override
public void stop() {
    final long awaitMillis = 2000;
    ThreadUtils.shutdownGracefully(this.publicExecutor, awaitMillis, TimeUnit.MILLISECONDS);
    ThreadUtils.shutdownGracefully(this.channelEventExecutor);
}
/**
 * Routes an inbound command according to its traffic type: the three request types go to
 * {@link #processRequestCommand}, responses are matched against pending requests, and
 * anything else is logged. A {@code null} command is ignored.
 *
 * @param ctx     handler context of the channel the command arrived on
 * @param command decoded inbound command, may be {@code null}
 * @throws Exception declared for the benefit of the calling channel handler
 */
protected void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand command) throws Exception {
    if (command == null) {
        return;
    }
    switch (command.trafficType()) {
        case RESPONSE:
            processResponseCommand(ctx, command);
            return;
        case REQUEST_SYNC:
        case REQUEST_ASYNC:
        case REQUEST_ONEWAY:
            processRequestCommand(ctx, command);
            return;
        default:
            LOG.warn("Not supported The traffic type {} !", command.trafficType());
    }
}
/**
 * Dispatches an inbound request to the processor registered for its op code, running it on
 * that processor's executor.
 *
 * <p>If no processor is registered the request is logged and dropped instead of failing with
 * a {@link NullPointerException} inside the channel pipeline. If the executor rejects the
 * task, the exception interceptors are notified and, for non-oneway requests, a
 * {@code SYSTEM_BUSY} response is written back.
 *
 * @param ctx handler context of the channel the request arrived on
 * @param cmd the inbound request
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.opCode());
    if (processorExecutorPair == null) {
        // Nothing registered (or it was unregistered concurrently): there is no one to run the request.
        LOG.warn("No request processor registered for request {} on channel {}, request dropped !", cmd, ctx.channel());
        return;
    }

    RemotingChannel channel = new NettyChannelImpl(ctx.channel());
    Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel);
    try {
        processorExecutorPair.getRight().submit(run);
    } catch (RejectedExecutionException e) {
        // Crude sampling so that a flood of rejections does not flood the log as well.
        if ((System.currentTimeMillis() % 10000) == 0) {
            LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd,
                extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
        }
        if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
            interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE,
                extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL"));
            RemotingCommand response = remotingCommandFactory.createResponse(cmd);
            response.opCode(RemotingSysResponseCode.SYSTEM_BUSY);
            response.remark("SYSTEM_BUSY");
            writeAndFlush(ctx.channel(), response);
        }
    }
}
/**
 * Wraps the processing of one request in a task: run the "before request" interceptors,
 * invoke the processor, run the "after response" interceptors, then send the response.
 * Any throwable is logged and passed to {@link #handleException}.
 *
 * @param ctx                   handler context of the originating channel
 * @param cmd                   the request to process
 * @param processorExecutorPair processor (left) and executor (right) registered for the request
 * @param channel               channel wrapper handed to the processor
 * @return the task to submit to the processor's executor
 */
@NotNull
private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd,
    final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) {
    return new Runnable() {
        @Override
        public void run() {
            try {
                final RequestContext requestContext = new RequestContext(RemotingEndPoint.RESPONSE,
                    extractRemoteAddress(ctx.channel()), cmd);
                interceptorGroup.beforeRequest(requestContext);

                final RequestProcessor processor = processorExecutorPair.getLeft();
                final RemotingCommand response = processor.processRequest(channel, cmd);

                final ResponseContext responseContext = new ResponseContext(RemotingEndPoint.RESPONSE,
                    extractRemoteAddress(ctx.channel()), cmd, response);
                interceptorGroup.afterResponseReceived(responseContext);

                handleResponse(response, cmd, ctx);
            } catch (Throwable e) {
                LOG.error(String.format("Process request %s error !", cmd.toString()), e);
                handleException(e, cmd, ctx);
            }
        }
    };
}
/**
 * Notifies the exception interceptors about a failure while processing a request.
 * Oneway requests are skipped. No response is written to the peer here.
 *
 * @param e   the failure raised while processing
 * @param cmd the request being processed
 * @param ctx handler context of the originating channel
 */
private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) {
    if (cmd.trafficType() == TrafficType.REQUEST_ONEWAY) {
        return;
    }
    // FIXME: exception interceptors must not throw themselves; nothing guards against that here.
    final ExceptionContext context = new ExceptionContext(RemotingEndPoint.RESPONSE,
        extractRemoteAddress(ctx.channel()), cmd, e, "");
    interceptorGroup.onException(context);
}
/**
 * Writes the processor's response back to the requesting channel. Nothing is sent for
 * oneway requests or when the processor returned {@code null}; write failures are logged
 * and swallowed.
 *
 * @param response the processor's response, may be {@code null}
 * @param cmd      the request that was processed
 * @param ctx      handler context of the originating channel
 */
private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
    if (cmd.trafficType() == TrafficType.REQUEST_ONEWAY || response == null) {
        return;
    }
    try {
        writeAndFlush(ctx.channel(), response);
    } catch (Throwable e) {
        final String message = String.format("Process request %s success, but transfer response %s failed !",
            cmd.toString(), response.toString());
        LOG.error(message, e);
    }
}
/**
 * Matches an inbound response to its pending request and completes it.
 *
 * <p>For async requests (those with an async handler) the callback runs on the callback
 * executor, falling back to the calling thread when there is no executor or the submission
 * is rejected. For sync requests the response is handed to the waiting caller.
 * Unmatched responses are only logged.
 *
 * @param ctx handler context of the channel the response arrived on
 * @param cmd the inbound response
 */
private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final ResponseResult pending = ackTables.get(cmd.requestID());
    if (pending == null) {
        LOG.warn("request {} from {} has not matched response !", cmd, extractRemoteAddress(ctx.channel()));
        return;
    }

    pending.setResponseCommand(cmd);
    pending.release();
    final long arrivedAt = System.currentTimeMillis();
    ackTables.remove(cmd.requestID());

    // Sampled statistics: one line per 5000 matched responses.
    if (responseCounter.incrementAndGet() % 5000 == 0) {
        LOG.info("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), arrivedAt - pending.getBeginTimestamp(),
            ackTables.size());
    }

    if (pending.getAsyncHandler() == null) {
        // Sync invocation: wake up the thread blocked in waitResponse.
        pending.putResponse(cmd);
        return;
    }

    final ExecutorService callbackExecutor = this.getCallbackExecutor();
    boolean runInline = callbackExecutor == null;
    if (!runInline) {
        try {
            callbackExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        pending.executeCallbackArrived(pending.getResponseCommand());
                    } catch (Throwable e) {
                        LOG.warn("Execute callback error !", e);
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            runInline = true;
            LOG.warn("Execute submit error !", e);
        }
    }

    if (runInline) {
        try {
            pending.executeCallbackArrived(pending.getResponseCommand());
        } catch (Throwable e) {
            LOG.warn("Execute callback in response thread error !", e);
        }
    }
}
/**
 * Writes and flushes a message, registering a listener on the resulting write future.
 *
 * @param channel  target channel
 * @param msg      message to send
 * @param listener notified when the write completes
 */
private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) {
    final ChannelFuture writeFuture = channel.writeAndFlush(msg);
    writeFuture.addListener(listener);
}
/**
 * Writes and flushes a message without observing the outcome of the write.
 *
 * @param channel target channel
 * @param msg     message to send
 */
private void writeAndFlush(Channel channel, Object msg) {
    channel.writeAndFlush(msg);
}
/**
 * Returns the executor used to run async response callbacks.
 *
 * @return the shared public executor
 */
public ExecutorService getCallbackExecutor() {
    return publicExecutor;
}
/**
 * Periodic housekeeping hook invoked by the task scheduled in startUpHouseKeepingService().
 *
 * <p>The body is currently disabled: the expiry scan below is commented out, so this method
 * does nothing and timed-out entries are not evicted from the pending-request table here.
 * NOTE(review): re-enabling the scan would also require importing java.util.Iterator.
 */
void scanResponseTable() {
/*
Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, ResponseResult> next = iterator.next();
ResponseResult result = next.getValue();
if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) {
iterator.remove();
try {
long timeoutMillis = result.getTimeoutMillis();
long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
result.onTimeout(timeoutMillis, costTimeMillis);
LOG.error("scan response table command {} failed", result.getRequestId());
} catch (Throwable e) {
LOG.warn("Error occurred when execute timeout callback !", e);
} finally {
result.release();
LOG.warn("Removed timeout request {} ", result);
}
}
}
*/
}
/**
 * Sends a request synchronously, running the request interceptors before the send and
 * the response interceptors after the response has been obtained.
 *
 * @param channel       channel to send on
 * @param request       request to send; its traffic type is set to {@code REQUEST_SYNC}
 * @param timeoutMillis how long to wait for the response
 * @return the value returned by the underlying synchronous invocation
 */
public RemotingCommand invokeWithInterceptor(final Channel channel, final RemotingCommand request,
    long timeoutMillis) {
    request.trafficType(TrafficType.REQUEST_SYNC);
    final String remoteAddr = extractRemoteAddress(channel);

    // FIXME: interceptor failures are not caught here and propagate to the caller.
    final RequestContext before = new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request);
    this.interceptorGroup.beforeRequest(before);

    final RemotingCommand reply = this.invoke0(remoteAddr, channel, request, timeoutMillis);

    final ResponseContext after = new ResponseContext(RemotingEndPoint.REQUEST,
        extractRemoteAddress(channel), request, reply);
    this.interceptorGroup.afterResponseReceived(after);
    return reply;
}
/**
 * Performs the synchronous send: registers a pending entry, writes the request and blocks
 * until a response arrives or the timeout elapses.
 *
 * <p>If the request was written successfully but no response arrived, a
 * {@link RemoteTimeoutException} is thrown. If the write itself failed, {@code null} is
 * returned (the send failure is only logged and recorded as the pending entry's cause).
 * The pending entry is always removed before returning.
 *
 * @param remoteAddr    textual address of the peer, used for logging and context
 * @param channel       channel to send on
 * @param request       request to send
 * @param timeoutMillis how long to wait for the response
 * @return the response, or {@code null} when the request could not be sent
 */
private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request,
    final long timeoutMillis) {
    try {
        final int opaque = request.requestID();
        final ResponseResult pending = new ResponseResult(opaque, timeoutMillis);
        pending.setRequestCommand(request);
        // FIXME: one interceptor group is shared by every invocation.
        pending.setInterceptorGroup(this.interceptorGroup);
        pending.setRemoteAddr(remoteAddr);
        this.ackTables.put(opaque, pending);

        this.writeAndFlush(channel, request, new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    pending.setSendRequestOK(true);
                    return;
                }
                // Send failed: record why and release the waiting caller immediately.
                pending.setSendRequestOK(false);
                ackTables.remove(opaque);
                pending.setCause(f.cause());
                pending.putResponse(null);
                LOG.warn("Send request command to {} failed !", remoteAddr);
            }
        });

        final RemotingCommand reply = pending.waitResponse(timeoutMillis);
        if (reply == null && pending.isSendRequestOK()) {
            throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, pending.getCause());
        }
        return reply;
    } finally {
        this.ackTables.remove(request.requestID());
    }
}
/**
 * Sends a request asynchronously, running the request interceptors before the send.
 *
 * <p>If the calling thread is interrupted while waiting for an async permit, the exception
 * interceptors are notified and the thread's interrupt status is restored so that callers
 * further up the stack can still observe the interruption.
 *
 * @param channel        channel to send on
 * @param request        request to send; its traffic type is set to {@code REQUEST_ASYNC}
 * @param invokeCallback handler invoked when the response arrives or the send fails
 * @param timeoutMillis  maximum time to wait for an async permit, also the request timeout
 */
public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request,
    final AsyncHandler invokeCallback, long timeoutMillis) {
    request.trafficType(TrafficType.REQUEST_ASYNC);
    final String remoteAddr = extractRemoteAddress(channel);
    this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
    try {
        this.invokeAsync0(remoteAddr, channel, request, timeoutMillis, invokeCallback);
    } catch (InterruptedException e) {
        try {
            this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, e, "REMOTING_EXCEPTION"));
        } catch (Throwable t) {
            LOG.warn("onException ", t);
        } finally {
            // Restore the flag only after the interceptors ran, so they are not disturbed by it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Performs the asynchronous send: acquires an async permit, registers a pending entry and
 * writes the request.
 *
 * <p>On a failed write the pending entry is removed, the handler's "send failed" callback is
 * run and the permit is released. If the write call itself throws, the pending entry is
 * removed as well and the permit is released, so neither leaks.
 *
 * @param remoteAddr     textual address of the peer, used for logging and context
 * @param channel        channel to send on
 * @param request        request to send
 * @param timeoutMillis  maximum time to wait for a permit, also stored as the request timeout
 * @param invokeCallback handler invoked when the response arrives or the send fails
 * @throws InterruptedException   if interrupted while waiting for a permit
 * @throws RemoteTimeoutException if no permit became available within {@code timeoutMillis}
 */
private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request,
    final long timeoutMillis, final AsyncHandler invokeCallback) throws InterruptedException {
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (!acquired) {
        String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
            timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
        LOG.error(info);
        throw new RemoteTimeoutException(info);
    }

    final int requestID = request.requestID();
    SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
    final ResponseResult responseResult = new ResponseResult(requestID, timeoutMillis, invokeCallback, once);
    responseResult.setRequestCommand(request);
    responseResult.setInterceptorGroup(this.interceptorGroup);
    responseResult.setRemoteAddr(remoteAddr);
    this.ackTables.put(requestID, responseResult);
    try {
        ChannelFutureListener listener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                responseResult.setSendRequestOK(f.isSuccess());
                if (f.isSuccess()) {
                    return;
                }
                responseResult.putResponse(null);
                ackTables.remove(requestID);
                try {
                    responseResult.executeRequestSendFailed();
                } catch (Throwable e) {
                    LOG.warn("Execute callback error !", e);
                } finally {
                    responseResult.release();
                }
                LOG.warn("Send request command to channel {} failed.", remoteAddr);
            }
        };
        this.writeAndFlush(channel, request, listener);
    } catch (Exception e) {
        // The request never left: drop the pending entry so it cannot linger in the table.
        this.ackTables.remove(requestID);
        responseResult.release();
        LOG.error("Send request command to channel " + channel + " error !", e);
    }
}
/**
 * Sends a oneway (fire-and-forget) request, running the request interceptors before the send.
 *
 * <p>If the calling thread is interrupted while waiting for a oneway permit, the exception
 * interceptors are notified and the thread's interrupt status is restored so that callers
 * further up the stack can still observe the interruption.
 *
 * @param channel       channel to send on
 * @param request       request to send; its traffic type is set to {@code REQUEST_ONEWAY}
 * @param timeoutMillis maximum time to wait for a oneway permit
 */
public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request, long timeoutMillis) {
    request.trafficType(TrafficType.REQUEST_ONEWAY);
    this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request));
    try {
        this.invokeOneway0(channel, request, timeoutMillis);
    } catch (InterruptedException e) {
        try {
            this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, e, "REMOTING_EXCEPTION"));
        } catch (Throwable t) {
            LOG.warn("onException ", t);
        } finally {
            // Restore the flag only after the interceptors ran, so they are not disturbed by it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Performs the oneway send: acquires a oneway permit, writes the request and releases the
 * permit once the write completes (successfully or not) or the write call throws.
 *
 * @param channel       channel to send on
 * @param request       request to send
 * @param timeoutMillis maximum time to wait for a permit
 * @throws InterruptedException   if interrupted while waiting for a permit
 * @throws RemoteTimeoutException if no permit became available within {@code timeoutMillis}
 */
private void invokeOneway0(final Channel channel, final RemotingCommand request,
    final long timeoutMillis) throws InterruptedException {
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (!acquired) {
        // Report the oneway semaphore's state: that is the one this call waited on.
        String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
            timeoutMillis, request.toString(), this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits());
        LOG.error(info);
        throw new RemoteTimeoutException(info);
    }

    final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
    try {
        final SocketAddress socketAddress = channel.remoteAddress();
        ChannelFutureListener listener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                once.release();
                if (!f.isSuccess()) {
                    LOG.warn("Send request command to channel {} failed !", socketAddress);
                }
            }
        };
        this.writeAndFlush(channel, request, listener);
    } catch (Exception e) {
        once.release();
        LOG.error("Send request command to channel " + channel + " error !", e);
    }
}
/**
 * Returns the identifier generated for this remoting instance at construction time.
 *
 * @return the instance id
 */
public String getRemotingInstanceId() {
    return this.remotingInstanceId;
}
/**
 * Returns the factory used to create remoting commands.
 *
 * @return this instance's command factory
 */
@Override
public RemotingCommandFactory commandFactory() {
    return remotingCommandFactory;
}
/**
 * Registers a processor and the executor it runs on for a request code. An existing
 * registration for the same code is left untouched.
 *
 * @param requestCode request code to handle
 * @param processor   processor invoked for matching requests
 * @param executor    executor the processor is run on
 */
@Override
public void registerRequestProcessor(String requestCode, RequestProcessor processor, ExecutorService executor) {
    // First registration wins.
    if (this.processorTables.containsKey(requestCode)) {
        return;
    }
    this.processorTables.put(requestCode, new Pair<RequestProcessor, ExecutorService>(processor, executor));
}
/**
 * Registers a processor for a request code, running it on the shared public executor.
 *
 * @param requestCode request code to handle
 * @param processor   processor invoked for matching requests
 */
@Override
public void registerRequestProcessor(String requestCode, RequestProcessor processor) {
    registerRequestProcessor(requestCode, processor, this.publicExecutor);
}
/**
 * Removes the processor registered for a request code, if any.
 *
 * @param requestCode request code to stop handling
 */
@Override
public void unregisterRequestProcessor(String requestCode) {
    processorTables.remove(requestCode);
}
/**
 * Returns this remoting instance's identifier; delegates to {@link #getRemotingInstanceId()}.
 *
 * @return the instance id
 */
@Override
public String remotingInstanceId() {
    return getRemotingInstanceId();
}
/**
 * Adds an interceptor to the group consulted around requests, responses and exceptions.
 *
 * @param interceptor interceptor to add
 */
@Override
public void registerInterceptor(Interceptor interceptor) {
    interceptorGroup.registerInterceptor(interceptor);
}
/**
 * Adds a listener for channel lifecycle events. {@link #start()} only launches the event
 * dispatch thread when at least one listener is registered at that point.
 *
 * @param listener listener to add
 */
@Override
public void registerChannelEventListener(ChannelEventListener listener) {
    channelEventListenerGroup.registerChannelEventListener(listener);
}
/**
 * Looks up the processor/executor pair registered for a request code.
 *
 * @param requestCode request code to look up
 * @return the registered pair, or {@code null} when nothing is registered
 */
@Override
public Pair<RequestProcessor, ExecutorService> processor(String requestCode) {
    return this.processorTables.get(requestCode);
}
/**
 * Returns the textual IP address of a channel's remote peer.
 *
 * <p>This method never throws: it is used while building log lines and interceptor contexts,
 * often for channels that may already be closed. A missing channel or remote address yields
 * an empty string; an unresolved address yields its host name; a non-inet address yields its
 * {@code toString()}.
 *
 * @param channel channel whose peer address is wanted, may be {@code null}
 * @return the peer's host address, or a best-effort fallback as described above
 */
protected String extractRemoteAddress(Channel channel) {
    if (channel == null) {
        return "";
    }
    final SocketAddress remote = channel.remoteAddress();
    if (remote == null) {
        // Not connected (yet, or any more).
        return "";
    }
    if (!(remote instanceof InetSocketAddress)) {
        return remote.toString();
    }
    final InetSocketAddress inet = (InetSocketAddress) remote;
    if (inet.isUnresolved()) {
        // getAddress() is null for unresolved addresses; the host name is all there is.
        return inet.getHostName();
    }
    return inet.getAddress().getHostAddress();
}
/**
 * Thread that drains a queue of channel events and forwards each one to the registered
 * channel event listeners.
 *
 * <p>The loop ends only when the thread is interrupted. An exception thrown by a listener
 * is logged and dispatch continues with the next event, so one faulty listener cannot
 * silently disable event delivery for the lifetime of the instance.
 */
class ChannelEventExecutor extends Thread {
    /** Soft cap on queued events; further events are dropped once the queue grows past it. */
    private final static int MAX_SIZE = 10000;
    private final LinkedBlockingQueue<NettyChannelEvent> eventQueue = new LinkedBlockingQueue<NettyChannelEvent>();
    private String name;

    /**
     * @param nettyEventExector name given to the thread and used in log messages
     */
    public ChannelEventExecutor(String nettyEventExector) {
        super(nettyEventExector);
        this.name = nettyEventExector;
    }

    /**
     * Queues an event for dispatch, dropping it (with a warning) when the queue is over its cap.
     *
     * @param event event to enqueue
     */
    public void putNettyEvent(final NettyChannelEvent event) {
        if (this.eventQueue.size() <= MAX_SIZE) {
            this.eventQueue.add(event);
        } else {
            LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
        }
    }

    /**
     * Polls the queue (3 second timeout per poll) and dispatches events until interrupted.
     */
    @Override
    public void run() {
        LOG.info(this.name + " service started");
        ChannelEventListenerGroup listener = NettyRemotingAbstract.this.channelEventListenerGroup;
        while (true) {
            final NettyChannelEvent event;
            try {
                event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal: keep the flag set and leave the loop.
                LOG.info(this.name + " service interrupted, exiting");
                Thread.currentThread().interrupt();
                break;
            }
            if (event == null || listener == null) {
                continue;
            }
            try {
                dispatch(listener, event);
            } catch (Exception e) {
                // A failing listener must not stop delivery of later events.
                LOG.error("error", e);
            }
        }
    }

    /** Forwards one event to the listener callback that matches its type. */
    private void dispatch(ChannelEventListenerGroup listener, NettyChannelEvent event) {
        RemotingChannel channel = new NettyChannelImpl(event.getChannel());
        LOG.warn("Channel Event, {}", event);
        switch (event.getType()) {
            case IDLE:
                listener.onChannelIdle(channel);
                break;
            case INACTIVE:
                listener.onChannelClose(channel);
                break;
            case ACTIVE:
                listener.onChannelConnect(channel);
                break;
            case EXCEPTION:
                listener.onChannelException(channel);
                break;
            default:
                break;
        }
    }
}
/**
 * Inbound channel handler that passes every decoded {@link RemotingCommand} to the
 * enclosing instance's {@code processMessageReceived}.
 */
protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
    /**
     * @param ctx handler context of the channel the command arrived on
     * @param msg decoded inbound command
     * @throws Exception whatever the enclosing dispatch raises
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        NettyRemotingAbstract.this.processMessageReceived(ctx, msg);
    }
}
}