| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.rocketmq.remoting.impl.netty; |
| |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.SimpleChannelInboundHandler; |
| import java.net.SocketAddress; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.rocketmq.remoting.api.AsyncHandler; |
| import org.apache.rocketmq.remoting.api.RemotingEndPoint; |
| import org.apache.rocketmq.remoting.api.RemotingService; |
| import org.apache.rocketmq.remoting.api.RequestProcessor; |
| import org.apache.rocketmq.remoting.api.channel.ChannelEventListener; |
| import org.apache.rocketmq.remoting.api.channel.RemotingChannel; |
| import org.apache.rocketmq.remoting.api.command.RemotingCommand; |
| import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; |
| import org.apache.rocketmq.remoting.api.command.TrafficType; |
| import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; |
| import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; |
| import org.apache.rocketmq.remoting.api.interceptor.Interceptor; |
| import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; |
| import org.apache.rocketmq.remoting.api.interceptor.RequestContext; |
| import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; |
| import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup; |
| import org.apache.rocketmq.remoting.common.Pair; |
| import org.apache.rocketmq.remoting.common.ResponseFuture; |
| import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; |
| import org.apache.rocketmq.remoting.config.RemotingConfig; |
| import org.apache.rocketmq.remoting.external.ThreadUtils; |
| import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; |
| import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; |
| import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode; |
| import org.apache.rocketmq.remoting.internal.RemotingUtil; |
| import org.apache.rocketmq.remoting.internal.UIDGenerator; |
| import org.jetbrains.annotations.NotNull; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public abstract class NettyRemotingAbstract implements RemotingService { |
| protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class); |
| protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor"); |
| private final Semaphore semaphoreOneway; |
| private final Semaphore semaphoreAsync; |
| private final Map<Integer, ResponseFuture> ackTables = new ConcurrentHashMap<Integer, ResponseFuture>(256); |
| private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>(); |
| private final RemotingCommandFactory remotingCommandFactory; |
| private final String remotingInstanceId = UIDGenerator.instance().createUID(); |
| |
| private final ExecutorService publicExecutor; |
| private final ExecutorService asyncHandlerExecutor; |
| protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true); |
| private InterceptorGroup interceptorGroup = new InterceptorGroup(); |
| private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup(); |
| |
| NettyRemotingAbstract(RemotingConfig remotingConfig) { |
| this.semaphoreOneway = new Semaphore(remotingConfig.getOnewayInvokeSemaphore(), true); |
| this.semaphoreAsync = new Semaphore(remotingConfig.getAsyncInvokeSemaphore(), true); |
| this.publicExecutor = ThreadUtils.newFixedThreadPool( |
| remotingConfig.getPublicExecutorThreads(), |
| 10000, "Remoting-PublicExecutor", true); |
| |
| this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool( |
| remotingConfig.getAsyncHandlerExecutorThreads(), |
| 10000, "Remoting-PublicExecutor", true); |
| this.remotingCommandFactory = new RemotingCommandFactoryImpl(); |
| } |
| |
| protected void putNettyEvent(final NettyChannelEvent event) { |
| if (channelEventListenerGroup != null && channelEventListenerGroup.size() != 0) { |
| this.channelEventExecutor.putNettyEvent(event); |
| } |
| } |
| |
| protected void startUpHouseKeepingService() { |
| this.houseKeepingService.scheduleAtFixedRate(new Runnable() { |
| @Override |
| public void run() { |
| scanResponseTable(); |
| } |
| }, 3000, 1000, TimeUnit.MICROSECONDS); |
| } |
| |
| void scanResponseTable() { |
| final List<Integer> rList = new ArrayList<>(); |
| |
| for (final Map.Entry<Integer, ResponseFuture> next : this.ackTables.entrySet()) { |
| ResponseFuture responseFuture = next.getValue(); |
| |
| if ((responseFuture.getBeginTimestamp() + responseFuture.getTimeoutMillis()) <= System.currentTimeMillis()) { |
| rList.add(responseFuture.getRequestId()); |
| } |
| } |
| |
| for (Integer requestID : rList) { |
| ResponseFuture rf = this.ackTables.remove(requestID); |
| |
| if (rf != null) { |
| LOG.warn("remove timeout request {} ", rf); |
| rf.setCause(new RemoteTimeoutException(rf.getRemoteAddr(), rf.getTimeoutMillis())); |
| executeAsyncHandler(rf); |
| } |
| } |
| } |
| |
| @Override |
| public void start() { |
| if (this.channelEventListenerGroup.size() > 0) { |
| this.channelEventExecutor.start(); |
| } |
| } |
| |
| @Override |
| public void stop() { |
| ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); |
| ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS); |
| ThreadUtils.shutdownGracefully(asyncHandlerExecutor, 2000, TimeUnit.MILLISECONDS); |
| ThreadUtils.shutdownGracefully(channelEventExecutor); |
| } |
| |
| protected void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand command) throws Exception { |
| if (command != null) { |
| switch (command.trafficType()) { |
| case REQUEST_ONEWAY: |
| case REQUEST_ASYNC: |
| case REQUEST_SYNC: |
| processRequestCommand(ctx, command); |
| break; |
| case RESPONSE: |
| processResponseCommand(ctx, command); |
| break; |
| default: |
| LOG.warn("The traffic type {} is NOT supported!", command.trafficType()); |
| break; |
| } |
| } |
| } |
| |
| public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { |
| Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.cmdCode()); |
| |
| if (processorExecutorPair == null) { |
| final RemotingCommand response = commandFactory().createResponse(cmd); |
| response.opCode(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED); |
| ctx.writeAndFlush(response); |
| LOG.warn("The command code {} is NOT supported!", cmd.cmdCode()); |
| return; |
| } |
| |
| RemotingChannel channel = new NettyChannelImpl(ctx.channel()); |
| |
| Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel); |
| |
| try { |
| processorExecutorPair.getRight().submit(run); |
| } catch (RejectedExecutionException e) { |
| LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd, |
| RemotingUtil.extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString())); |
| |
| if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { |
| RemotingCommand response = remotingCommandFactory.createResponse(cmd); |
| response.opCode(RemotingSysResponseCode.SYSTEM_BUSY); |
| response.remark("SYSTEM_BUSY"); |
| writeAndFlush(ctx.channel(), response); |
| } |
| } |
| } |
| |
| private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand response) { |
| final ResponseFuture responseFuture = ackTables.remove(response.requestID()); |
| if (responseFuture != null) { |
| responseFuture.setResponseCommand(response); |
| responseFuture.release(); |
| |
| this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, |
| RemotingUtil.extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response)); |
| |
| if (responseFuture.getAsyncHandler() != null) { |
| executeAsyncHandler(responseFuture); |
| } else { |
| responseFuture.putResponse(response); |
| responseFuture.release(); |
| } |
| } else { |
| LOG.warn("request {} from {} has not matched response !", response, RemotingUtil.extractRemoteAddress(ctx.channel())); |
| } |
| } |
| |
| @NotNull |
| private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd, |
| final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) { |
| return new Runnable() { |
| @Override |
| public void run() { |
| try { |
| interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE, |
| RemotingUtil.extractRemoteAddress(ctx.channel()), cmd)); |
| |
| RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd); |
| |
| interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE, |
| RemotingUtil.extractRemoteAddress(ctx.channel()), cmd, response)); |
| |
| handleResponse(response, cmd, ctx); |
| } catch (Throwable e) { |
| LOG.error(String.format("Process request %s error !", cmd.toString()), e); |
| |
| handleException(e, cmd, ctx); |
| } |
| } |
| }; |
| } |
| |
| private void writeAndFlush(final Channel channel, final Object msg) { |
| channel.writeAndFlush(msg); |
| } |
| |
| /** |
| * Execute callback in callback executor. If callback executor is null, run directly in current thread |
| */ |
| private void executeAsyncHandler(final ResponseFuture responseFuture) { |
| boolean runInThisThread = false; |
| ExecutorService executor = asyncHandlerExecutor; |
| if (executor != null) { |
| try { |
| executor.submit(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| responseFuture.executeAsyncHandler(); |
| } catch (Throwable e) { |
| LOG.warn("execute callback in executor exception, and callback throw", e); |
| } finally { |
| responseFuture.release(); |
| } |
| } |
| }); |
| } catch (Throwable e) { |
| runInThisThread = true; |
| LOG.warn("execute callback in executor exception, maybe executor busy", e); |
| } |
| } else { |
| runInThisThread = true; |
| } |
| |
| if (runInThisThread) { |
| try { |
| responseFuture.executeAsyncHandler(); |
| } catch (Throwable e) { |
| LOG.warn("executeInvokeCallback Exception", e); |
| } finally { |
| responseFuture.release(); |
| } |
| } |
| } |
| |
| private void requestFail(final int requestID, final Throwable cause) { |
| ResponseFuture responseFuture = ackTables.remove(requestID); |
| if (responseFuture != null) { |
| responseFuture.setSendRequestOK(false); |
| responseFuture.putResponse(null); |
| responseFuture.setCause(cause); |
| executeAsyncHandler(responseFuture); |
| } |
| } |
| |
| private void requestFail(final ResponseFuture responseFuture, final Throwable cause) { |
| responseFuture.setCause(cause); |
| executeAsyncHandler(responseFuture); |
| } |
| |
| private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) { |
| if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { |
| if (response != null) { |
| try { |
| writeAndFlush(ctx.channel(), response); |
| } catch (Throwable e) { |
| LOG.error(String.format("Process request %s success, but transfer response %s failed !", |
| cmd.toString(), response.toString()), e); |
| } |
| } |
| } |
| |
| } |
| |
| private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) { |
| if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { |
| RemotingCommand response = remotingCommandFactory.createResponse(cmd); |
| response.opCode(RemotingSysResponseCode.SYSTEM_ERROR); |
| response.remark("SYSTEM_ERROR"); |
| writeAndFlush(ctx.channel(), response); |
| } |
| } |
| |
| public RemotingCommand invokeWithInterceptor(final Channel channel, final RemotingCommand request, |
| long timeoutMillis) { |
| request.trafficType(TrafficType.REQUEST_SYNC); |
| |
| final String remoteAddr = RemotingUtil.extractRemoteAddress(channel); |
| |
| this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); |
| |
| RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis); |
| |
| this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, |
| RemotingUtil.extractRemoteAddress(channel), request, responseCommand)); |
| |
| return responseCommand; |
| } |
| |
| private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request, |
| final long timeoutMillis) { |
| try { |
| final int requestID = request.requestID(); |
| final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis); |
| responseFuture.setRequestCommand(request); |
| responseFuture.setRemoteAddr(remoteAddr); |
| |
| this.ackTables.put(requestID, responseFuture); |
| |
| ChannelFutureListener listener = new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture f) throws Exception { |
| if (f.isSuccess()) { |
| responseFuture.setSendRequestOK(true); |
| return; |
| } else { |
| responseFuture.setSendRequestOK(false); |
| |
| ackTables.remove(requestID); |
| responseFuture.setCause(f.cause()); |
| responseFuture.putResponse(null); |
| |
| LOG.warn("Send request command to {} failed !", remoteAddr); |
| } |
| } |
| }; |
| |
| this.writeAndFlush(channel, request, listener); |
| |
| RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); |
| |
| if (null == responseCommand) { |
| if (responseFuture.isSendRequestOK()) { |
| throw new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause()); |
| } else { |
| throw new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), responseFuture.getCause()); |
| } |
| } |
| |
| return responseCommand; |
| } finally { |
| this.ackTables.remove(request.requestID()); |
| } |
| } |
| |
| private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) { |
| channel.writeAndFlush(msg).addListener(listener); |
| } |
| |
| public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request, |
| final AsyncHandler invokeCallback, long timeoutMillis) { |
| request.trafficType(TrafficType.REQUEST_ASYNC); |
| |
| final String remoteAddr = RemotingUtil.extractRemoteAddress(channel); |
| |
| this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); |
| |
| this.invokeAsync0(remoteAddr, channel, request, invokeCallback, timeoutMillis); |
| } |
| |
| private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request, |
| final AsyncHandler asyncHandler, final long timeoutMillis) { |
| boolean acquired = this.semaphoreAsync.tryAcquire(); |
| if (acquired) { |
| final int requestID = request.requestID(); |
| |
| SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); |
| |
| final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis, asyncHandler, once); |
| responseFuture.setRequestCommand(request); |
| responseFuture.setRemoteAddr(remoteAddr); |
| |
| this.ackTables.put(requestID, responseFuture); |
| try { |
| ChannelFutureListener listener = new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture f) { |
| responseFuture.setSendRequestOK(f.isSuccess()); |
| if (f.isSuccess()) { |
| return; |
| } |
| |
| requestFail(requestID, f.cause()); |
| LOG.warn("Send request command to channel failed.", remoteAddr); |
| } |
| }; |
| |
| this.writeAndFlush(channel, request, listener); |
| } catch (Exception e) { |
| requestFail(requestID, e); |
| LOG.error("Send request command to channel " + channel + " error !", e); |
| } |
| } else { |
| String info = String.format("No available async semaphore to issue the request request %s", request.toString()); |
| requestFail(new ResponseFuture(request.requestID(), timeoutMillis, asyncHandler, null), new RemoteAccessException(info)); |
| LOG.error(info); |
| } |
| } |
| |
| public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request) { |
| request.trafficType(TrafficType.REQUEST_ONEWAY); |
| |
| this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, RemotingUtil.extractRemoteAddress(channel), request)); |
| this.invokeOneway0(channel, request); |
| } |
| |
| private void invokeOneway0(final Channel channel, final RemotingCommand request) { |
| boolean acquired = this.semaphoreOneway.tryAcquire(); |
| if (acquired) { |
| final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); |
| try { |
| final SocketAddress socketAddress = channel.remoteAddress(); |
| |
| ChannelFutureListener listener = new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture f) { |
| once.release(); |
| if (!f.isSuccess()) { |
| LOG.warn("Send request command to channel {} failed !", socketAddress); |
| } |
| } |
| }; |
| |
| this.writeAndFlush(channel, request, listener); |
| } catch (Exception e) { |
| once.release(); |
| LOG.error("Send request command to channel " + channel + " error !", e); |
| } |
| } else { |
| String info = String.format("No available oneway semaphore to issue the request %s", request.toString()); |
| LOG.error(info); |
| } |
| } |
| |
| @Override |
| public void registerInterceptor(Interceptor interceptor) { |
| this.interceptorGroup.registerInterceptor(interceptor); |
| } |
| |
| @Override |
| public void registerRequestProcessor(short requestCode, RequestProcessor processor, ExecutorService executor) { |
| Pair<RequestProcessor, ExecutorService> pair = new Pair<RequestProcessor, ExecutorService>(processor, executor); |
| if (!this.processorTables.containsKey(requestCode)) { |
| this.processorTables.put(requestCode, pair); |
| } |
| } |
| |
| @Override |
| public void registerRequestProcessor(short requestCode, RequestProcessor processor) { |
| this.registerRequestProcessor(requestCode, processor, publicExecutor); |
| } |
| |
| @Override |
| public void unregisterRequestProcessor(short requestCode) { |
| this.processorTables.remove(requestCode); |
| } |
| |
| @Override |
| public Pair<RequestProcessor, ExecutorService> processor(short requestCode) { |
| return processorTables.get(requestCode); |
| } |
| |
| @Override |
| public String remotingInstanceId() { |
| return this.getRemotingInstanceId(); |
| } |
| |
| @Override |
| public RemotingCommandFactory commandFactory() { |
| return this.remotingCommandFactory; |
| } |
| |
| public String getRemotingInstanceId() { |
| return remotingInstanceId; |
| } |
| |
| @Override |
| public void registerChannelEventListener(ChannelEventListener listener) { |
| this.channelEventListenerGroup.registerChannelEventListener(listener); |
| } |
| |
| class ChannelEventExecutor extends Thread { |
| private final static int MAX_SIZE = 10000; |
| private final LinkedBlockingQueue<NettyChannelEvent> eventQueue = new LinkedBlockingQueue<NettyChannelEvent>(); |
| private String name; |
| |
| public ChannelEventExecutor(String nettyEventExector) { |
| super(nettyEventExector); |
| this.name = nettyEventExector; |
| } |
| |
| public void putNettyEvent(final NettyChannelEvent event) { |
| if (this.eventQueue.size() <= MAX_SIZE) { |
| this.eventQueue.add(event); |
| } else { |
| LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); |
| } |
| } |
| |
| @Override |
| public void run() { |
| LOG.info(this.name + " service started"); |
| |
| ChannelEventListenerGroup listener = NettyRemotingAbstract.this.channelEventListenerGroup; |
| |
| while (true) { |
| try { |
| NettyChannelEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS); |
| if (event != null && listener != null) { |
| RemotingChannel channel = new NettyChannelImpl(event.getChannel()); |
| |
| LOG.warn("Channel Event, {}", event); |
| |
| switch (event.getType()) { |
| case IDLE: |
| listener.onChannelIdle(channel); |
| break; |
| case CLOSE: |
| listener.onChannelClose(channel); |
| break; |
| case CONNECT: |
| listener.onChannelConnect(channel); |
| break; |
| case EXCEPTION: |
| listener.onChannelException(channel); |
| break; |
| default: |
| break; |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("error", e); |
| break; |
| } |
| } |
| } |
| |
| } |
| |
| protected class RemotingCommandDispatcher extends SimpleChannelInboundHandler<RemotingCommand> { |
| |
| @Override |
| protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { |
| processMessageReceived(ctx, msg); |
| } |
| } |
| |
| } |