blob: 9d639711f60465610b9c6dcd2c0f23c3e673b336 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.runtime.core.protocol.tcp.client;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.GoodbyeTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.HeartBeatTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.HelloTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.ListenTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.MessageAckTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.MessageTransferTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.RecommendTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.SubscribeTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.UnSubscribeTask;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.opentelemetry.api.trace.Span;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<Package> {
private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
private EventMeshTCPServer eventMeshTCPServer;
public EventMeshTcpMessageDispatcher(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Exception {
long startTime = System.currentTimeMillis();
validateMsg(pkg);
eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics()
.getClient2eventMeshMsgNum().incrementAndGet();
Command cmd = pkg.getHeader().getCmd();
try {
Runnable task;
if (isNeedTrace(cmd)) {
pkg.getHeader().getProperties()
.put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime);
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SEND_EVENTMESH_IP,
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshServerIp());
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SYS, session.getClient().getSubsystem());
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IP, session.getClient().getHost());
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IDC, session.getClient().getIdc());
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_GROUP, session.getClient().getGroup());
}
if (Command.RECOMMEND_REQUEST == cmd) {
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
}
task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer);
eventMeshTCPServer.getTaskHandleExecutorService().submit(task);
return;
}
if (Command.HELLO_REQUEST == cmd) {
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
}
task = new HelloTask(pkg, ctx, startTime, eventMeshTCPServer);
eventMeshTCPServer.getTaskHandleExecutorService().submit(task);
return;
}
if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx) == null) {
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={},no session is found", cmd, pkg);
}
throw new Exception("no session is found");
}
logMessageFlow(ctx, pkg, cmd);
if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx)
.getSessionState() == SessionState.CLOSED) {
throw new Exception(
"this eventMesh tcp session will be closed, may be reboot or version change!");
}
dispatch(ctx, pkg, startTime, cmd);
} catch (Exception e) {
log.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
if (isNeedTrace(cmd)) {
Span span = TraceUtils.prepareServerSpan(pkg.getHeader().getProperties(),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime,
TimeUnit.MILLISECONDS, false);
TraceUtils.finishSpanWithException(span, pkg.getHeader().getProperties(),
"exception occurred while dispatch pkg", e);
}
writeToClient(cmd, pkg, ctx, e);
}
}
private boolean isNeedTrace(Command cmd) {
if (eventMeshTCPServer.getEventMeshTCPConfiguration().isEventMeshServerTraceEnable()
&& cmd != null && (Command.REQUEST_TO_SERVER == cmd
|| Command.ASYNC_MESSAGE_TO_SERVER == cmd
|| Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) {
return true;
}
return false;
}
private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e) {
try {
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(),
pkg.getHeader().getSeq()));
ctx.writeAndFlush(res);
} catch (Exception ex) {
log.warn("writeToClient failed", ex);
}
}
private Command getReplyCommand(Command cmd) {
switch (cmd) {
case HELLO_REQUEST:
return Command.HELLO_RESPONSE;
case RECOMMEND_REQUEST:
return Command.RECOMMEND_RESPONSE;
case HEARTBEAT_REQUEST:
return Command.HEARTBEAT_RESPONSE;
case SUBSCRIBE_REQUEST:
return Command.SUBSCRIBE_RESPONSE;
case UNSUBSCRIBE_REQUEST:
return Command.UNSUBSCRIBE_RESPONSE;
case LISTEN_REQUEST:
return Command.LISTEN_RESPONSE;
case CLIENT_GOODBYE_REQUEST:
return Command.CLIENT_GOODBYE_RESPONSE;
case REQUEST_TO_SERVER:
return Command.RESPONSE_TO_CLIENT;
case ASYNC_MESSAGE_TO_SERVER:
return Command.ASYNC_MESSAGE_TO_SERVER_ACK;
case BROADCAST_MESSAGE_TO_SERVER:
return Command.BROADCAST_MESSAGE_TO_SERVER_ACK;
default:
return cmd;
}
}
private void logMessageFlow(ChannelHandlerContext ctx, Package pkg, Command cmd) {
if (pkg.getBody() instanceof EventMeshMessage) {
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd,
EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()),
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
}
} else {
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg,
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
}
}
}
private void validateMsg(Package pkg) throws Exception {
if (pkg == null) {
throw new Exception("the incoming message is empty.");
}
if (pkg.getHeader() == null) {
log.error("the incoming message does not have a header|pkg={}", pkg);
throw new Exception("the incoming message does not have a header.");
}
if (pkg.getHeader().getCmd() == null) {
log.error("the incoming message does not have a command type|pkg={}", pkg);
throw new Exception("the incoming message does not have a command type.");
}
}
private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd)
throws Exception {
Runnable task;
switch (cmd) {
case HEARTBEAT_REQUEST:
task = new HeartBeatTask(pkg, ctx, startTime, eventMeshTCPServer);
break;
case CLIENT_GOODBYE_REQUEST:
case SERVER_GOODBYE_RESPONSE:
task = new GoodbyeTask(pkg, ctx, startTime, eventMeshTCPServer);
break;
case SUBSCRIBE_REQUEST:
task = new SubscribeTask(pkg, ctx, startTime, eventMeshTCPServer);
break;
case UNSUBSCRIBE_REQUEST:
task = new UnSubscribeTask(pkg, ctx, startTime, eventMeshTCPServer);
break;
case LISTEN_REQUEST:
task = new ListenTask(pkg, ctx, startTime, eventMeshTCPServer);
break;
case REQUEST_TO_SERVER:
case RESPONSE_TO_SERVER:
case ASYNC_MESSAGE_TO_SERVER:
case BROADCAST_MESSAGE_TO_SERVER:
task = new MessageTransferTask(pkg, ctx, startTime, eventMeshTCPServer);
break;
case RESPONSE_TO_CLIENT_ACK:
case ASYNC_MESSAGE_TO_CLIENT_ACK:
case BROADCAST_MESSAGE_TO_CLIENT_ACK:
case REQUEST_TO_CLIENT_ACK:
task = new MessageAckTask(pkg, ctx, startTime, eventMeshTCPServer);
break;
default:
throw new Exception("unknown cmd");
}
eventMeshTCPServer.getTaskHandleExecutorService().submit(task);
}
}