Bind HTTP response lifecycle to request scope (#1759)

diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java
index d2adee5..5496d1c 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.restful.handler;
 
 import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
@@ -35,7 +36,9 @@
     
     private final FullHttpRequest httpRequest;
     
-    private final MappingContext<T> mappingContext;
+    private final FullHttpResponse httpResponse;
+    
+    private MappingContext<T> mappingContext;
     
     private Object[] args = new Object[0];
 }
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/ContextInitializationInboundHandler.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/ContextInitializationInboundHandler.java
new file mode 100644
index 0000000..bcd435d
--- /dev/null
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/ContextInitializationInboundHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.restful.pipeline;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.restful.handler.HandleContext;
+import org.apache.shardingsphere.elasticjob.restful.handler.Handler;
+
+
+/**
+ * Create an instance of {@link FullHttpResponse} and initialize {@link HandleContext}.
+ */
+@Slf4j
+@Sharable
+public final class ContextInitializationInboundHandler extends ChannelInboundHandlerAdapter {
+    
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        log.debug("{}", msg);
+        FullHttpRequest httpRequest = (FullHttpRequest) msg;
+        FullHttpResponse httpResponse = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.NOT_FOUND, ctx.alloc().buffer());
+        ctx.fireChannelRead(new HandleContext<Handler>(httpRequest, httpResponse));
+    }
+}
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java
index c43d75d..42f581a 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java
@@ -17,16 +17,13 @@
 
 package org.apache.shardingsphere.elasticjob.restful.pipeline;
 
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.shardingsphere.elasticjob.restful.handler.HandleContext;
 import org.apache.shardingsphere.elasticjob.restful.handler.Handler;
@@ -42,6 +39,7 @@
 @Sharable
 public final class HandleMethodExecutor extends ChannelInboundHandlerAdapter {
     
+    @SuppressWarnings({"unchecked", "NullableProblems"})
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
         HandleContext<Handler> handleContext = (HandleContext<Handler>) msg;
@@ -49,28 +47,28 @@
             Handler handler = handleContext.getMappingContext().payload();
             Object[] args = handleContext.getArgs();
             Object handleResult = handler.execute(args);
-            FullHttpResponse response;
+            FullHttpResponse httpResponse = handleContext.getHttpResponse();
             if (null != handleResult) {
                 String mimeType = HttpUtil.getMimeType(handler.getProducing()).toString();
                 ResponseBodySerializer serializer = ResponseBodySerializerFactory.getResponseBodySerializer(mimeType);
                 byte[] bodyBytes = serializer.serialize(handleResult);
-                response = createHttpResponse(handler.getProducing(), bodyBytes, handler.getHttpStatusCode());
+                populateHttpResponse(httpResponse, handler.getProducing(), bodyBytes, handler.getHttpStatusCode());
             } else {
-                response = createHttpResponse(handler.getProducing(), new byte[0], handler.getHttpStatusCode());
+                populateHttpResponse(httpResponse, handler.getProducing(), new byte[0], handler.getHttpStatusCode());
             }
-            ctx.writeAndFlush(response);
+            ctx.writeAndFlush(httpResponse);
         } finally {
             ReferenceCountUtil.release(handleContext.getHttpRequest());
         }
     }
     
-    private FullHttpResponse createHttpResponse(final String producingContentType, final byte[] bodyBytes, final int statusCode) {
+    private void populateHttpResponse(final FullHttpResponse httpResponse, final String producingContentType, final byte[] bodyBytes, final int statusCode) {
         HttpResponseStatus httpResponseStatus = HttpResponseStatus.valueOf(statusCode);
-        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, Unpooled.wrappedBuffer(bodyBytes));
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, producingContentType);
-        HttpUtil.setContentLength(response, bodyBytes.length);
-        HttpUtil.setKeepAlive(response, true);
-        return response;
+        httpResponse.setStatus(httpResponseStatus);
+        httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, producingContentType);
+        httpResponse.content().writeBytes(bodyBytes);
+        HttpUtil.setContentLength(httpResponse, httpResponse.content().readableBytes());
+        HttpUtil.setKeepAlive(httpResponse, true);
     }
     
     @Override
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java
index c5a4673..0c74157 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java
@@ -53,6 +53,7 @@
     
     private final PathMatcher pathMatcher = new RegexPathMatcher();
     
+    @SuppressWarnings({"unchecked", "NullableProblems"})
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
         HandleContext<Handler> handleContext = (HandleContext<Handler>) msg;
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java
index b862c91..23c1a9c 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java
@@ -23,7 +23,6 @@
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.util.ReferenceCountUtil;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.restful.RestfulController;
 import org.apache.shardingsphere.elasticjob.restful.annotation.ContextPath;
 import org.apache.shardingsphere.elasticjob.restful.annotation.Mapping;
@@ -42,7 +41,6 @@
  * Assemble a {@link HandleContext} with HTTP request and {@link MappingContext}, then pass it to the next in-bound handler.
  */
 @Sharable
-@Slf4j
 public final class HttpRequestDispatcher extends ChannelInboundHandlerAdapter {
     
     private static final String TRAILING_SLASH = "/";
@@ -56,16 +54,17 @@
         initMappingRegistry(restfulControllers);
     }
     
+    @SuppressWarnings({"unchecked", "NullableProblems"})
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
-        log.debug("{}", msg);
-        FullHttpRequest request = (FullHttpRequest) msg;
+        HandleContext<Handler> handleContext = (HandleContext<Handler>) msg;
+        FullHttpRequest request = handleContext.getHttpRequest();
         if (!trailingSlashSensitive) {
             request.setUri(appendTrailingSlashIfAbsent(request.uri()));
         }
         Optional<MappingContext<Handler>> mappingContext = mappingRegistry.getMappingContext(request);
         if (mappingContext.isPresent()) {
-            HandleContext<Handler> handleContext = new HandleContext<>(request, mappingContext.get());
+            handleContext.setMappingContext(mappingContext.get());
             ctx.fireChannelRead(handleContext);
         } else {
             ReferenceCountUtil.release(request);
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java
index 019e8f1..b0bd744 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java
@@ -29,6 +29,8 @@
  */
 public final class RestfulServiceChannelInitializer extends ChannelInitializer<Channel> {
     
+    private final ContextInitializationInboundHandler contextInitializationInboundHandler;
+    
     private final HttpRequestDispatcher httpRequestDispatcher;
     
     private final HandlerParameterDecoder handlerParameterDecoder;
@@ -38,6 +40,7 @@
     private final ExceptionHandling exceptionHandling;
     
     public RestfulServiceChannelInitializer(final NettyRestfulServiceConfiguration configuration) {
+        contextInitializationInboundHandler = new ContextInitializationInboundHandler();
         httpRequestDispatcher = new HttpRequestDispatcher(configuration.getControllerInstances(), configuration.isTrailingSlashSensitive());
         handlerParameterDecoder = new HandlerParameterDecoder();
         handleMethodExecutor = new HandleMethodExecutor();
@@ -49,6 +52,7 @@
         ChannelPipeline pipeline = channel.pipeline();
         pipeline.addLast("codec", new HttpServerCodec());
         pipeline.addLast("aggregator", new HttpObjectAggregator(1024 * 1024));
+        pipeline.addLast("contextInitialization", contextInitializationInboundHandler);
         pipeline.addLast("dispatcher", httpRequestDispatcher);
         pipeline.addLast("handlerParameterDecoder", handlerParameterDecoder);
         pipeline.addLast("handleMethodExecutor", handleMethodExecutor);
diff --git a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java
index 8f5937a..85f6d82 100644
--- a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java
+++ b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java
@@ -49,10 +49,11 @@
     
     @Before
     public void setUp() {
+        ContextInitializationInboundHandler contextInitializationInboundHandler = new ContextInitializationInboundHandler();
         HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(Collections.singletonList(new DecoderTestController()), false);
         HandlerParameterDecoder handlerParameterDecoder = new HandlerParameterDecoder();
         HandleMethodExecutor handleMethodExecutor = new HandleMethodExecutor();
-        channel = new EmbeddedChannel(httpRequestDispatcher, handlerParameterDecoder, handleMethodExecutor);
+        channel = new EmbeddedChannel(contextInitializationInboundHandler, httpRequestDispatcher, handlerParameterDecoder, handleMethodExecutor);
     }
     
     @Test
diff --git a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java
index 12c6f48..a893325 100644
--- a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java
+++ b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java
@@ -23,8 +23,9 @@
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpVersion;
-import org.apache.shardingsphere.elasticjob.restful.handler.HandlerNotFoundException;
 import org.apache.shardingsphere.elasticjob.restful.controller.JobController;
+import org.apache.shardingsphere.elasticjob.restful.handler.HandleContext;
+import org.apache.shardingsphere.elasticjob.restful.handler.HandlerNotFoundException;
 import org.junit.Test;
 
 public final class HttpRequestDispatcherTest {
@@ -33,6 +34,6 @@
     public void assertDispatcherHandlerNotFound() {
         EmbeddedChannel channel = new EmbeddedChannel(new HttpRequestDispatcher(Lists.newArrayList(new JobController()), false));
         FullHttpRequest fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/hello/myJob/myCron");
-        channel.writeInbound(fullHttpRequest);
+        channel.writeInbound(new HandleContext<>(fullHttpRequest, null));
     }
 }