blob: 6fceaa20be4699de188ff08a818988ef73fdc6a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.runtime.core.protocol.http.processor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.trace.Span;
public class SendAsyncMessageProcessor implements HttpRequestProcessor {
private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
private static final Logger HTTP_LOGGER = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
private static final Logger CMD_LOGGER = LoggerFactory.getLogger(EventMeshConstants.CMD);
private static final Logger ACL_LOGGER = LoggerFactory.getLogger(EventMeshConstants.ACL);
private final EventMeshHTTPServer eventMeshHTTPServer;
private final Acl acl;
public SendAsyncMessageProcessor(final EventMeshHTTPServer eventMeshHTTPServer) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.acl = eventMeshHTTPServer.getAcl();
}
@Override
public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
HttpCommand responseEventMeshCommand;
String localAddress = IPUtils.getLocalAddress();
HttpCommand request = asyncContext.getRequest();
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(
Integer.valueOf(request.getRequestCode())),
EventMeshConstants.PROTOCOL_HTTP,
remoteAddr, localAddress);
SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) request.getHeader();
EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
SendMessageResponseHeader sendMessageResponseHeader =
SendMessageResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()),
eventMeshHttpConfiguration.getEventMeshCluster(),
localAddress, eventMeshHttpConfiguration.getEventMeshEnv(),
eventMeshHttpConfiguration.getEventMeshIDC());
String protocolType = sendMessageRequestHeader.getProtocolType();
String protocolVersion = sendMessageRequestHeader.getProtocolVersion();
ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
ProtocolPluginFactory.getProtocolAdaptor(protocolType);
CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(request);
Span span = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, true);
//validate event
if (!ObjectUtils.allNotNull(event, event.getSource(), event.getSpecVersion())
|| StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR);
return;
}
String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC);
String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID);
String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS);
//validate event-extension
if (StringUtils.isAnyBlank(idc, pid, sys)
|| !StringUtils.isNumeric(pid)) {
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR);
return;
}
String bizNo = getExtension(event, SendMessageRequestBody.BIZSEQNO);
String uniqueId = getExtension(event, SendMessageRequestBody.UNIQUEID);
String producerGroup = getExtension(event, SendMessageRequestBody.PRODUCERGROUP);
String topic = event.getSubject();
//validate body
if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
|| event.getData() == null) {
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageResponseBody.class);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR);
return;
}
//do acl check
if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
String user = getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME);
String pass = getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD);
String subsystem = getExtension(event, ProtocolKey.ClientInstanceKey.SYS);
int requestCode = Integer.parseInt(request.getRequestCode());
try {
this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
} catch (Exception e) {
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageResponseBody.class);
ACL_LOGGER.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_ACL_ERR);
return;
}
}
final HttpSummaryMetrics summaryMetrics = eventMeshHTTPServer.getMetrics().getSummaryMetrics();
// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, null, SendMessageResponseBody.class);
summaryMetrics.recordHTTPDiscard();
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR);
return;
}
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.isStarted()) {
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, null, SendMessageResponseBody.class);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR);
return;
}
String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
String ttlExt = getExtension(event, SendMessageRequestBody.TTL);
if (StringUtils.isBlank(ttlExt) && !StringUtils.isNumeric(ttlExt)) {
event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build();
}
String content = event.getData() == null ? "" : new String(Objects.requireNonNull(event.getData()).toBytes(), Constants.DEFAULT_CHARSET);
if (content.length() > eventMeshHttpConfiguration.getEventMeshEventSize()) {
HTTP_LOGGER.error("Event size exceeds the limit: {}",
eventMeshHttpConfiguration.getEventMeshEventSize());
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR,
"Event size exceeds the limit: " + eventMeshHttpConfiguration.getEventMeshEventSize(),
SendMessageResponseBody.class);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR);
return;
}
try {
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, request.reqTime)
.withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP, eventMeshHttpConfiguration.getEventMeshServerIp())
.build();
if (MESSAGE_LOGGER.isDebugEnabled()) {
MESSAGE_LOGGER.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
}
} catch (Exception e) {
MESSAGE_LOGGER.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR,
EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
SendMessageResponseBody.class);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR);
return;
}
final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, eventMeshProducer,
eventMeshHTTPServer);
summaryMetrics.recordSendMsg();
long startTime = System.currentTimeMillis();
final CompleteHandler<HttpCommand> handler = httpCommand -> {
try {
if (HTTP_LOGGER.isDebugEnabled()) {
HTTP_LOGGER.debug("{}", httpCommand);
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
summaryMetrics.recordHTTPReqResTimeCost(
System.currentTimeMillis() - request.getReqTime());
} catch (Exception ex) {
//ignore
}
};
try {
event = CloudEventBuilder.from(sendMessageContext.getEvent())
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
sendMessageContext.setEvent(event);
Span clientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false);
try {
eventMeshProducer.send(sendMessageContext, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
HttpCommand succ = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
EventMeshRetCode.SUCCESS.getErrMsg() + sendResult.toString()));
asyncContext.onComplete(succ, handler);
long endTime = System.currentTimeMillis();
summaryMetrics.recordSendMsgCost(endTime - startTime);
MESSAGE_LOGGER.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime, topic, bizNo, uniqueId);
TraceUtils.finishSpan(span, sendMessageContext.getEvent());
}
@Override
public void onException(OnExceptionContext context) {
HttpCommand err = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+ EventMeshUtil.stackTrace(context.getException(), 2)));
asyncContext.onComplete(err, handler);
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
long endTime = System.currentTimeMillis();
summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);
MESSAGE_LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime, topic, bizNo, uniqueId, context.getException());
TraceUtils.finishSpanWithException(span,
EventMeshUtil.getCloudEventExtensionMap(protocolVersion, sendMessageContext.getEvent()),
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg(), context.getException());
}
});
} finally {
TraceUtils.finishSpan(clientSpan, event);
}
} catch (Exception ex) {
completeResponse(request, asyncContext, sendMessageResponseHeader,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, null, SendMessageResponseBody.class);
spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR);
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
long endTime = System.currentTimeMillis();
MESSAGE_LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime, topic, bizNo, uniqueId, ex);
summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);
}
}
private void spanWithException(CloudEvent event, String protocolVersion, EventMeshRetCode retCode) {
Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
TraceUtils.finishSpanWithException(excepSpan, EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
retCode.getErrMsg(), null);
}
}