| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.eventmesh.runtime.boot; |
| |
| import org.apache.eventmesh.common.Constants; |
| import org.apache.eventmesh.common.ThreadPoolFactory; |
| import org.apache.eventmesh.common.protocol.http.HttpCommand; |
| import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; |
| import org.apache.eventmesh.common.protocol.http.body.Body; |
| import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; |
| import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; |
| import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion; |
| import org.apache.eventmesh.common.protocol.http.common.RequestCode; |
| import org.apache.eventmesh.common.protocol.http.header.Header; |
| import org.apache.eventmesh.common.utils.AssertUtils; |
| import org.apache.eventmesh.common.utils.JsonUtils; |
| import org.apache.eventmesh.runtime.common.Pair; |
| import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; |
| import org.apache.eventmesh.runtime.constants.EventMeshConstants; |
| import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; |
| import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService; |
| import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.EventProcessor; |
| import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; |
| import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; |
| import org.apache.eventmesh.runtime.util.RemotingHelper; |
| import org.apache.eventmesh.runtime.util.TraceUtils; |
| import org.apache.eventmesh.runtime.util.Utils; |
| import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; |
| |
| import org.apache.commons.collections4.MapUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.http.MethodNotSupportedException; |
| import org.apache.http.entity.ContentType; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.net.ssl.SSLContext; |
| import javax.net.ssl.SSLEngine; |
| |
| import io.netty.bootstrap.ServerBootstrap; |
| import io.netty.channel.ChannelDuplexHandler; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelHandler.Sharable; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.ChannelInboundHandlerAdapter; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.ChannelPipeline; |
| import io.netty.channel.epoll.EpollServerSocketChannel; |
| import io.netty.channel.socket.SocketChannel; |
| import io.netty.channel.socket.nio.NioServerSocketChannel; |
| import io.netty.handler.codec.http.DefaultFullHttpResponse; |
| import io.netty.handler.codec.http.FullHttpRequest; |
| import io.netty.handler.codec.http.FullHttpResponse; |
| import io.netty.handler.codec.http.HttpHeaderNames; |
| import io.netty.handler.codec.http.HttpHeaderValues; |
| import io.netty.handler.codec.http.HttpHeaders; |
| import io.netty.handler.codec.http.HttpMethod; |
| import io.netty.handler.codec.http.HttpObjectAggregator; |
| import io.netty.handler.codec.http.HttpRequest; |
| import io.netty.handler.codec.http.HttpRequestDecoder; |
| import io.netty.handler.codec.http.HttpResponseEncoder; |
| import io.netty.handler.codec.http.HttpResponseStatus; |
| import io.netty.handler.codec.http.HttpVersion; |
| import io.netty.handler.codec.http.QueryStringDecoder; |
| import io.netty.handler.codec.http.multipart.Attribute; |
| import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; |
| import io.netty.handler.codec.http.multipart.DiskAttribute; |
| import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; |
| import io.netty.handler.codec.http.multipart.InterfaceHttpData; |
| import io.netty.handler.ssl.SslHandler; |
| import io.netty.handler.timeout.IdleState; |
| import io.netty.handler.timeout.IdleStateEvent; |
| import io.netty.util.ReferenceCountUtil; |
| import io.opentelemetry.api.trace.Span; |
| import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| |
| @Slf4j |
| public abstract class AbstractHTTPServer extends AbstractRemotingServer { |
| |
| private HandlerService handlerService; |
| |
| |
| private HTTPMetricsServer metrics; |
| |
| private static final DefaultHttpDataFactory DEFAULT_HTTP_DATA_FACTORY = new DefaultHttpDataFactory(false); |
| |
| private final transient AtomicBoolean started = new AtomicBoolean(false); |
| |
| private final transient boolean useTLS; |
| |
| private Boolean useTrace = false; //Determine whether trace is enabled |
| |
| private final transient EventMeshHTTPConfiguration eventMeshHttpConfiguration; |
| |
| private final transient ThreadPoolExecutor asyncContextCompleteHandler = |
| ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext"); |
| |
| private static final int MAX_CONNECTIONS = 20_000; |
| |
| static { |
| DiskAttribute.deleteOnExitTemporaryFile = false; |
| } |
| |
| protected final transient Map<String/* request code */, Pair<HttpRequestProcessor, ThreadPoolExecutor>> |
| processorTable = new ConcurrentHashMap<>(64); |
| |
| protected final transient Map<String/* request uri */, Pair<EventProcessor, ThreadPoolExecutor>> |
| eventProcessorTable = new ConcurrentHashMap<>(64); |
| |
| private HttpConnectionHandler httpConnectionHandler; |
| |
| private HTTPHandler httpHandler; |
| |
| public AbstractHTTPServer(final int port, final boolean useTLS, |
| final EventMeshHTTPConfiguration eventMeshHttpConfiguration) { |
| super(); |
| this.setPort(port); |
| this.useTLS = useTLS; |
| this.eventMeshHttpConfiguration = eventMeshHttpConfiguration; |
| } |
| |
| |
| public void setUseTrace(final Boolean useTrace) { |
| this.useTrace = useTrace; |
| } |
| |
| |
| public Boolean getUseTrace() { |
| return useTrace; |
| } |
| |
| public void setHandlerService(final HandlerService handlerService) { |
| this.handlerService = handlerService; |
| } |
| |
| public HTTPMetricsServer getMetrics() { |
| return metrics; |
| } |
| |
| public void setMetrics(final HTTPMetricsServer metrics) { |
| this.metrics = metrics; |
| } |
| |
| public HandlerService getHandlerService() { |
| return handlerService; |
| } |
| |
| public void sendError(final ChannelHandlerContext ctx, final HttpResponseStatus status) { |
| final FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); |
| final HttpHeaders responseHeaders = response.headers(); |
| responseHeaders.add( |
| HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET) |
| ); |
| responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); |
| responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); |
| |
| ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); |
| } |
| |
| public void sendResponse(final ChannelHandlerContext ctx, final DefaultFullHttpResponse response) { |
| ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { |
| if (!f.isSuccess()) { |
| if (log.isWarnEnabled()) { |
| log.warn("send response to [{}] fail, will close this channel", |
| RemotingHelper.parseChannelRemoteAddr(f.channel())); |
| } |
| f.channel().close(); |
| } |
| }); |
| } |
| |
| @Override |
| public void start() throws Exception { |
| |
| initSharableHandlers(); |
| |
| final Runnable runnable = () -> { |
| final ServerBootstrap bootstrap = new ServerBootstrap(); |
| try { |
| bootstrap.group(this.getBossGroup(), this.getIoGroup()) |
| .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) |
| .childHandler(new HttpsServerInitializer(useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null)) |
| .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); |
| |
| if (log.isInfoEnabled()) { |
| log.info("HTTPServer[port={}] started.", this.getPort()); |
| } |
| |
| bootstrap.bind(this.getPort()) |
| .channel() |
| .closeFuture() |
| .sync(); |
| } catch (Exception e) { |
| log.error("HTTPServer start error!", e); |
| try { |
| shutdown(); |
| } catch (Exception ex) { |
| log.error("HTTPServer shutdown error!", ex); |
| } |
| } |
| }; |
| |
| final Thread thread = new Thread(runnable, "EventMesh-http-server"); |
| thread.setDaemon(true); |
| thread.start(); |
| started.compareAndSet(false, true); |
| } |
| |
| @Override |
| public void shutdown() throws Exception { |
| super.shutdown(); |
| started.compareAndSet(true, false); |
| } |
| |
| public void registerProcessor(final Integer requestCode, final HttpRequestProcessor processor, |
| final ThreadPoolExecutor executor) { |
| AssertUtils.notNull(requestCode, "requestCode can't be null"); |
| AssertUtils.notNull(processor, "processor can't be null"); |
| AssertUtils.notNull(executor, "executor can't be null"); |
| this.processorTable.put(requestCode.toString(), new Pair<>(processor, executor)); |
| } |
| |
| public void registerProcessor(final String requestURI, final EventProcessor processor, |
| final ThreadPoolExecutor executor) { |
| AssertUtils.notNull(requestURI, "requestURI can't be null"); |
| AssertUtils.notNull(processor, "processor can't be null"); |
| AssertUtils.notNull(executor, "executor can't be null"); |
| this.eventProcessorTable.put(requestURI, new Pair<>(processor, executor)); |
| } |
| |
| /** |
| * Validate request, return error status. |
| * |
| * @param httpRequest |
| * @return if request is validated return null else return error status |
| */ |
| private HttpResponseStatus validateHttpRequest(final HttpRequest httpRequest) { |
| if (!started.get()) { |
| return HttpResponseStatus.SERVICE_UNAVAILABLE; |
| } |
| |
| if (!httpRequest.decoderResult().isSuccess()) { |
| return HttpResponseStatus.BAD_REQUEST; |
| } |
| |
| if (!HttpMethod.GET.equals(httpRequest.method()) && !HttpMethod.POST.equals(httpRequest.method())) { |
| return HttpResponseStatus.METHOD_NOT_ALLOWED; |
| } |
| |
| if (!ProtocolVersion.contains(httpRequest.headers().get(ProtocolKey.VERSION))) { |
| return HttpResponseStatus.BAD_REQUEST; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Inject ip and protocol version, if the protocol version is empty, set default to {@link ProtocolVersion#V1}. |
| * |
| * @param ctx |
| * @param httpRequest |
| */ |
| private void preProcessHttpRequestHeader(final ChannelHandlerContext ctx, final HttpRequest httpRequest) { |
| final long startTime = System.currentTimeMillis(); |
| final HttpHeaders requestHeaders = httpRequest.headers(); |
| |
| requestHeaders.set(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime); |
| if (StringUtils.isBlank(requestHeaders.get(ProtocolKey.VERSION))) { |
| requestHeaders.set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()); |
| } |
| requestHeaders.set(ProtocolKey.ClientInstanceKey.IP, |
| RemotingHelper.parseChannelRemoteAddr(ctx.channel())); |
| requestHeaders.set(EventMeshConstants.REQ_SEND_EVENTMESH_IP, eventMeshHttpConfiguration.getEventMeshServerIp()); |
| } |
| |
| /** |
| * Parse request body to map |
| * |
| * @param httpRequest |
| * @return |
| */ |
| private Map<String, Object> parseHttpRequestBody(final HttpRequest httpRequest) throws IOException { |
| final long bodyDecodeStart = System.currentTimeMillis(); |
| final Map<String, Object> httpRequestBody = new HashMap<>(); |
| |
| if (HttpMethod.GET.equals(httpRequest.method())) { |
| new QueryStringDecoder(httpRequest.uri()) |
| .parameters() |
| .forEach((key, value) -> httpRequestBody.put(key, value.get(0))); |
| } else if (HttpMethod.POST.equals(httpRequest.method())) { |
| final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest); |
| for (final InterfaceHttpData parm : decoder.getBodyHttpDatas()) { |
| if (InterfaceHttpData.HttpDataType.Attribute == parm.getHttpDataType()) { |
| final Attribute data = (Attribute) parm; |
| httpRequestBody.put(data.getName(), data.getValue()); |
| } |
| } |
| decoder.destroy(); |
| } |
| metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart); |
| return httpRequestBody; |
| } |
| |
| @Sharable |
| private class HTTPHandler extends ChannelInboundHandlerAdapter { |
| |
| /** |
| * Is called for each message of type {@link HttpRequest}. |
| * |
| * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelInboundHandlerAdapter} belongs to |
| * @param msg the message to handle |
| * @throws Exception is thrown if an error occurred |
| */ |
| @Override |
| public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| if (!(msg instanceof HttpRequest)) { |
| return; |
| } |
| |
| HttpRequest httpRequest = (HttpRequest) msg; |
| |
| if (Objects.nonNull(handlerService) && handlerService.isProcessorWrapper(httpRequest)) { |
| handlerService.handler(ctx, httpRequest, asyncContextCompleteHandler); |
| return; |
| } |
| |
| try { |
| |
| Span span = null; |
| preProcessHttpRequestHeader(ctx, httpRequest); |
| |
| final Map<String, Object> headerMap = Utils.parseHttpHeader(httpRequest); |
| final HttpResponseStatus errorStatus = validateHttpRequest(httpRequest); |
| if (errorStatus != null) { |
| sendError(ctx, errorStatus); |
| span = TraceUtils.prepareServerSpan(headerMap, |
| EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); |
| TraceUtils.finishSpanWithException(span, headerMap, errorStatus.reasonPhrase(), null); |
| return; |
| } |
| metrics.getSummaryMetrics().recordHTTPRequest(); |
| |
| boolean useRequestURI = false; |
| for (final String processURI : eventProcessorTable.keySet()) { |
| if (httpRequest.uri().startsWith(processURI)) { |
| useRequestURI = true; |
| break; |
| } |
| } |
| |
| if (useRequestURI) { |
| if (useTrace) { |
| span.setAttribute(SemanticAttributes.HTTP_METHOD, |
| httpRequest.method() == null ? "" : httpRequest.method().name()); |
| span.setAttribute(SemanticAttributes.HTTP_FLAVOR, |
| httpRequest.protocolVersion() == null ? "" : httpRequest.protocolVersion().protocolName()); |
| span.setAttribute(SemanticAttributes.HTTP_URL, |
| httpRequest.uri()); |
| } |
| final HttpEventWrapper httpEventWrapper = parseHttpRequest(httpRequest); |
| |
| final AsyncContext<HttpEventWrapper> asyncContext = |
| new AsyncContext<>(httpEventWrapper, null, asyncContextCompleteHandler); |
| processHttpRequest(ctx, asyncContext); |
| |
| } else { |
| final HttpCommand requestCommand = new HttpCommand(); |
| final Map<String, Object> bodyMap = parseHttpRequestBody(httpRequest); |
| |
| final String requestCode = HttpMethod.POST.equals(httpRequest.method()) |
| ? httpRequest.headers().get(ProtocolKey.REQUEST_CODE) |
| : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), ""); |
| |
| requestCommand.setHttpMethod(httpRequest.method().name()); |
| requestCommand.setHttpVersion(httpRequest.protocolVersion() == null ? "" |
| : httpRequest.protocolVersion().protocolName()); |
| requestCommand.setRequestCode(requestCode); |
| |
| HttpCommand responseCommand = null; |
| |
| if (StringUtils.isBlank(requestCode) |
| || !processorTable.containsKey(requestCode) |
| || !RequestCode.contains(Integer.valueOf(requestCode))) { |
| responseCommand = |
| requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID); |
| sendResponse(ctx, responseCommand.httpResponse()); |
| |
| span = TraceUtils.prepareServerSpan(headerMap, |
| EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); |
| TraceUtils.finishSpanWithException(span, headerMap, |
| EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg(), null); |
| return; |
| } |
| |
| try { |
| requestCommand.setHeader(Header.buildHeader(requestCode, headerMap)); |
| requestCommand.setBody(Body.buildBody(requestCode, bodyMap)); |
| } catch (Exception e) { |
| responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR); |
| sendResponse(ctx, responseCommand.httpResponse()); |
| |
| span = TraceUtils.prepareServerSpan(headerMap, |
| EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); |
| TraceUtils.finishSpanWithException(span, headerMap, |
| EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), e); |
| return; |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("{}", requestCommand); |
| } |
| |
| final AsyncContext<HttpCommand> asyncContext = |
| new AsyncContext<>(requestCommand, responseCommand, asyncContextCompleteHandler); |
| processEventMeshRequest(ctx, asyncContext); |
| } |
| |
| } catch (Exception ex) { |
| log.error("AbstractHTTPServer.HTTPHandler.channelRead error", ex); |
| } finally { |
| ReferenceCountUtil.release(httpRequest); |
| } |
| } |
| |
| public void processHttpRequest(final ChannelHandlerContext ctx, |
| final AsyncContext<HttpEventWrapper> asyncContext) { |
| final HttpEventWrapper requestWrapper = asyncContext.getRequest(); |
| final String requestURI = requestWrapper.getRequestURI(); |
| String processorKey = "/"; |
| for (final String eventProcessorKey : eventProcessorTable.keySet()) { |
| if (requestURI.startsWith(eventProcessorKey)) { |
| processorKey = eventProcessorKey; |
| break; |
| } |
| } |
| |
| final Pair<EventProcessor, ThreadPoolExecutor> choosed = eventProcessorTable.get(processorKey); |
| try { |
| choosed.getObject2().submit(() -> { |
| try { |
| final EventProcessor processor = choosed.getObject1(); |
| if (processor.rejectRequest()) { |
| final HttpEventWrapper responseWrapper = |
| requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); |
| |
| asyncContext.onComplete(responseWrapper); |
| |
| if (asyncContext.isComplete()) { |
| if (log.isDebugEnabled()) { |
| log.debug("{}", asyncContext.getResponse()); |
| } |
| sendResponse(ctx, asyncContext.getResponse().httpResponse()); |
| } |
| return; |
| } |
| |
| processor.processRequest(ctx, asyncContext); |
| if (!asyncContext.isComplete()) { |
| return; |
| } |
| |
| metrics.getSummaryMetrics() |
| .recordHTTPReqResTimeCost(System.currentTimeMillis() - requestWrapper.getReqTime()); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("{}", asyncContext.getResponse()); |
| } |
| |
| sendResponse(ctx, asyncContext.getResponse().httpResponse()); |
| } catch (Exception e) { |
| log.error("process error", e); |
| } |
| }); |
| } catch (RejectedExecutionException re) { |
| final HttpEventWrapper responseWrapper = requestWrapper.createHttpResponse(EventMeshRetCode.OVERLOAD); |
| asyncContext.onComplete(responseWrapper); |
| metrics.getSummaryMetrics().recordHTTPDiscard(); |
| metrics.getSummaryMetrics().recordHTTPReqResTimeCost( |
| System.currentTimeMillis() - requestWrapper.getReqTime()); |
| try { |
| sendResponse(ctx, asyncContext.getResponse().httpResponse()); |
| } catch (Exception e) { |
| log.error("sendResponse error", e); |
| } |
| } |
| |
| } |
| |
| public void processEventMeshRequest(final ChannelHandlerContext ctx, |
| final AsyncContext<HttpCommand> asyncContext) { |
| final HttpCommand request = asyncContext.getRequest(); |
| final Pair<HttpRequestProcessor, ThreadPoolExecutor> choosed = processorTable.get(request.getRequestCode()); |
| try { |
| choosed.getObject2().submit(() -> { |
| try { |
| final HttpRequestProcessor processor = choosed.getObject1(); |
| if (processor.rejectRequest()) { |
| final HttpCommand responseCommand = |
| request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); |
| asyncContext.onComplete(responseCommand); |
| |
| if (asyncContext.isComplete()) { |
| sendResponse(ctx, responseCommand.httpResponse()); |
| if (log.isDebugEnabled()) { |
| log.debug("{}", asyncContext.getResponse()); |
| } |
| final Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap(); |
| TraceUtils.finishSpanWithException(TraceUtils.prepareServerSpan(traceMap, |
| EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, |
| false), |
| traceMap, |
| EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null); |
| } |
| |
| return; |
| } |
| |
| processor.processRequest(ctx, asyncContext); |
| if (!asyncContext.isComplete()) { |
| return; |
| } |
| |
| metrics.getSummaryMetrics() |
| .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime()); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("{}", asyncContext.getResponse()); |
| } |
| |
| sendResponse(ctx, asyncContext.getResponse().httpResponse()); |
| |
| } catch (Exception e) { |
| log.error("process error", e); |
| } |
| }); |
| } catch (RejectedExecutionException re) { |
| asyncContext.onComplete(request.createHttpCommandResponse(EventMeshRetCode.OVERLOAD)); |
| metrics.getSummaryMetrics().recordHTTPDiscard(); |
| metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime()); |
| try { |
| sendResponse(ctx, asyncContext.getResponse().httpResponse()); |
| |
| final Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap(); |
| |
| TraceUtils.finishSpanWithException( |
| TraceUtils.prepareServerSpan(traceMap, |
| EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, |
| false), |
| traceMap, |
| EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), |
| re); |
| } catch (Exception e) { |
| log.error("processEventMeshRequest fail", re); |
| } |
| } |
| } |
| |
| @Override |
| public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { |
| super.channelReadComplete(ctx); |
| ctx.flush(); |
| } |
| |
| @Override |
| public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { |
| if (null != cause) { |
| log.error("", cause); |
| } |
| |
| if (null != ctx) { |
| ctx.close(); |
| } |
| } |
| |
| Map<String, String> extractFromRequest(final HttpRequest httpRequest) { |
| return new HashMap<>(); |
| } |
| } |
| |
| private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws IOException, MethodNotSupportedException { |
| final HttpEventWrapper httpEventWrapper = new HttpEventWrapper(); |
| httpEventWrapper.setHttpMethod(httpRequest.method().name()); |
| httpEventWrapper.setHttpVersion(httpRequest.protocolVersion().protocolName()); |
| httpEventWrapper.setRequestURI(httpRequest.uri()); |
| |
| //parse http header |
| for (final String key : httpRequest.headers().names()) { |
| httpEventWrapper.getHeaderMap().put(key, httpRequest.headers().get(key)); |
| } |
| |
| final long bodyDecodeStart = System.currentTimeMillis(); |
| |
| final FullHttpRequest fullHttpRequest = (FullHttpRequest) httpRequest; |
| final Map<String, Object> bodyMap = new HashMap<>(); |
| |
| if (HttpMethod.GET.equals(fullHttpRequest.method())) { |
| new QueryStringDecoder(fullHttpRequest.uri()).parameters().forEach((key, value) -> bodyMap.put(key, value.get(0))); |
| } else if (HttpMethod.POST.equals(fullHttpRequest.method())) { |
| if (StringUtils.contains(httpRequest.headers().get("Content-Type"), |
| ContentType.APPLICATION_JSON.getMimeType())) { |
| final int length = fullHttpRequest.content().readableBytes(); |
| if (length > 0) { |
| final byte[] body = new byte[length]; |
| fullHttpRequest.content().readBytes(body); |
| bodyMap.putAll(Objects.requireNonNull(JsonUtils.parseTypeReferenceObject(new String(body, Constants.DEFAULT_CHARSET), |
| new TypeReference<Map<String, Object>>() { |
| }))); |
| } |
| } else { |
| final HttpPostRequestDecoder decoder = |
| new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest); |
| for (final InterfaceHttpData parm : decoder.getBodyHttpDatas()) { |
| if (InterfaceHttpData.HttpDataType.Attribute == parm.getHttpDataType()) { |
| final Attribute data = (Attribute) parm; |
| bodyMap.put(data.getName(), data.getValue()); |
| } |
| } |
| decoder.destroy(); |
| } |
| |
| } else { |
| throw new MethodNotSupportedException("UnSupported Method " + fullHttpRequest.method()); |
| } |
| |
| httpEventWrapper.setBody(Objects.requireNonNull(JsonUtils.toJSONString(bodyMap)).getBytes(StandardCharsets.UTF_8)); |
| |
| metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart); |
| |
| return httpEventWrapper; |
| } |
| |
| private void initSharableHandlers() { |
| httpConnectionHandler = new HttpConnectionHandler(); |
| httpHandler = new HTTPHandler(); |
| } |
| |
| @Sharable |
| private class HttpConnectionHandler extends ChannelDuplexHandler { |
| |
| public final transient AtomicInteger connections = new AtomicInteger(0); |
| |
| @Override |
| public void channelActive(final ChannelHandlerContext ctx) throws Exception { |
| if (connections.incrementAndGet() > MAX_CONNECTIONS) { |
| if (log.isWarnEnabled()) { |
| log.warn("client|http|channelActive|remoteAddress={}|msg=too many client({}) connect this eventMesh server", |
| RemotingHelper.parseChannelRemoteAddr(ctx.channel()), MAX_CONNECTIONS); |
| } |
| ctx.close(); |
| return; |
| } |
| super.channelActive(ctx); |
| } |
| |
| @Override |
| public void channelInactive(final ChannelHandlerContext ctx) throws Exception { |
| connections.decrementAndGet(); |
| super.channelInactive(ctx); |
| } |
| |
| @Override |
| public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) { |
| if (evt instanceof IdleStateEvent) { |
| final IdleStateEvent event = (IdleStateEvent) evt; |
| if (event.state().equals(IdleState.ALL_IDLE)) { |
| final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); |
| if (log.isInfoEnabled()) { |
| log.info("client|http|userEventTriggered|remoteAddress={}|msg={}", |
| remoteAddress, evt.getClass().getName()); |
| } |
| ctx.close(); |
| } |
| } |
| |
| ctx.fireUserEventTriggered(evt); |
| } |
| } |
| |
| private class HttpsServerInitializer extends ChannelInitializer<SocketChannel> { |
| |
| private final transient SSLContext sslContext; |
| |
| public HttpsServerInitializer(final SSLContext sslContext) { |
| this.sslContext = sslContext; |
| } |
| |
| @Override |
| protected void initChannel(final SocketChannel channel) { |
| final ChannelPipeline pipeline = channel.pipeline(); |
| |
| if (sslContext != null && useTLS) { |
| final SSLEngine sslEngine = sslContext.createSSLEngine(); |
| sslEngine.setUseClientMode(false); |
| pipeline.addFirst(getWorkerGroup(), "ssl", new SslHandler(sslEngine)); |
| } |
| |
| pipeline.addLast(getWorkerGroup(), |
| new HttpRequestDecoder(), |
| new HttpResponseEncoder(), |
| httpConnectionHandler, |
| new HttpObjectAggregator(Integer.MAX_VALUE), |
| httpHandler); |
| } |
| } |
| |
| } |