Bind HTTP response lifecycle to request scope (#1759)
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java
index d2adee5..5496d1c 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/handler/HandleContext.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.restful.handler;
import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@@ -35,7 +36,9 @@
private final FullHttpRequest httpRequest;
- private final MappingContext<T> mappingContext;
+ private final FullHttpResponse httpResponse;
+
+ private MappingContext<T> mappingContext;
private Object[] args = new Object[0];
}
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/ContextInitializationInboundHandler.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/ContextInitializationInboundHandler.java
new file mode 100644
index 0000000..bcd435d
--- /dev/null
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/ContextInitializationInboundHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.restful.pipeline;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.restful.handler.HandleContext;
+import org.apache.shardingsphere.elasticjob.restful.handler.Handler;
+
+
+/**
+ * Create an instance of {@link FullHttpResponse} and initialize {@link HandleContext}.
+ */
+@Slf4j
+@Sharable
+public final class ContextInitializationInboundHandler extends ChannelInboundHandlerAdapter {
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ log.debug("{}", msg);
+ FullHttpRequest httpRequest = (FullHttpRequest) msg;
+ FullHttpResponse httpResponse = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.NOT_FOUND, ctx.alloc().buffer());
+ ctx.fireChannelRead(new HandleContext<Handler>(httpRequest, httpResponse));
+ }
+}
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java
index c43d75d..42f581a 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandleMethodExecutor.java
@@ -17,16 +17,13 @@
package org.apache.shardingsphere.elasticjob.restful.pipeline;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;
import org.apache.shardingsphere.elasticjob.restful.handler.HandleContext;
import org.apache.shardingsphere.elasticjob.restful.handler.Handler;
@@ -42,6 +39,7 @@
@Sharable
public final class HandleMethodExecutor extends ChannelInboundHandlerAdapter {
+ @SuppressWarnings({"unchecked", "NullableProblems"})
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
HandleContext<Handler> handleContext = (HandleContext<Handler>) msg;
@@ -49,28 +47,28 @@
Handler handler = handleContext.getMappingContext().payload();
Object[] args = handleContext.getArgs();
Object handleResult = handler.execute(args);
- FullHttpResponse response;
+ FullHttpResponse httpResponse = handleContext.getHttpResponse();
if (null != handleResult) {
String mimeType = HttpUtil.getMimeType(handler.getProducing()).toString();
ResponseBodySerializer serializer = ResponseBodySerializerFactory.getResponseBodySerializer(mimeType);
byte[] bodyBytes = serializer.serialize(handleResult);
- response = createHttpResponse(handler.getProducing(), bodyBytes, handler.getHttpStatusCode());
+ populateHttpResponse(httpResponse, handler.getProducing(), bodyBytes, handler.getHttpStatusCode());
} else {
- response = createHttpResponse(handler.getProducing(), new byte[0], handler.getHttpStatusCode());
+ populateHttpResponse(httpResponse, handler.getProducing(), new byte[0], handler.getHttpStatusCode());
}
- ctx.writeAndFlush(response);
+ ctx.writeAndFlush(httpResponse);
} finally {
ReferenceCountUtil.release(handleContext.getHttpRequest());
}
}
- private FullHttpResponse createHttpResponse(final String producingContentType, final byte[] bodyBytes, final int statusCode) {
+ private void populateHttpResponse(final FullHttpResponse httpResponse, final String producingContentType, final byte[] bodyBytes, final int statusCode) {
HttpResponseStatus httpResponseStatus = HttpResponseStatus.valueOf(statusCode);
- FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, Unpooled.wrappedBuffer(bodyBytes));
- response.headers().set(HttpHeaderNames.CONTENT_TYPE, producingContentType);
- HttpUtil.setContentLength(response, bodyBytes.length);
- HttpUtil.setKeepAlive(response, true);
- return response;
+ httpResponse.setStatus(httpResponseStatus);
+ httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, producingContentType);
+ httpResponse.content().writeBytes(bodyBytes);
+ HttpUtil.setContentLength(httpResponse, httpResponse.content().readableBytes());
+ HttpUtil.setKeepAlive(httpResponse, true);
}
@Override
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java
index c5a4673..0c74157 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoder.java
@@ -53,6 +53,7 @@
private final PathMatcher pathMatcher = new RegexPathMatcher();
+ @SuppressWarnings({"unchecked", "NullableProblems"})
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
HandleContext<Handler> handleContext = (HandleContext<Handler>) msg;
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java
index b862c91..23c1a9c 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcher.java
@@ -23,7 +23,6 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.ReferenceCountUtil;
-import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.restful.RestfulController;
import org.apache.shardingsphere.elasticjob.restful.annotation.ContextPath;
import org.apache.shardingsphere.elasticjob.restful.annotation.Mapping;
@@ -42,7 +41,6 @@
* Assemble a {@link HandleContext} with HTTP request and {@link MappingContext}, then pass it to the next in-bound handler.
*/
@Sharable
-@Slf4j
public final class HttpRequestDispatcher extends ChannelInboundHandlerAdapter {
private static final String TRAILING_SLASH = "/";
@@ -56,16 +54,17 @@
initMappingRegistry(restfulControllers);
}
+ @SuppressWarnings({"unchecked", "NullableProblems"})
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
- log.debug("{}", msg);
- FullHttpRequest request = (FullHttpRequest) msg;
+ HandleContext<Handler> handleContext = (HandleContext<Handler>) msg;
+ FullHttpRequest request = handleContext.getHttpRequest();
if (!trailingSlashSensitive) {
request.setUri(appendTrailingSlashIfAbsent(request.uri()));
}
Optional<MappingContext<Handler>> mappingContext = mappingRegistry.getMappingContext(request);
if (mappingContext.isPresent()) {
- HandleContext<Handler> handleContext = new HandleContext<>(request, mappingContext.get());
+ handleContext.setMappingContext(mappingContext.get());
ctx.fireChannelRead(handleContext);
} else {
ReferenceCountUtil.release(request);
diff --git a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java
index 019e8f1..b0bd744 100644
--- a/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java
+++ b/elasticjob-infra/elasticjob-restful/src/main/java/org/apache/shardingsphere/elasticjob/restful/pipeline/RestfulServiceChannelInitializer.java
@@ -29,6 +29,8 @@
*/
public final class RestfulServiceChannelInitializer extends ChannelInitializer<Channel> {
+ private final ContextInitializationInboundHandler contextInitializationInboundHandler;
+
private final HttpRequestDispatcher httpRequestDispatcher;
private final HandlerParameterDecoder handlerParameterDecoder;
@@ -38,6 +40,7 @@
private final ExceptionHandling exceptionHandling;
public RestfulServiceChannelInitializer(final NettyRestfulServiceConfiguration configuration) {
+ contextInitializationInboundHandler = new ContextInitializationInboundHandler();
httpRequestDispatcher = new HttpRequestDispatcher(configuration.getControllerInstances(), configuration.isTrailingSlashSensitive());
handlerParameterDecoder = new HandlerParameterDecoder();
handleMethodExecutor = new HandleMethodExecutor();
@@ -49,6 +52,7 @@
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(1024 * 1024));
+ pipeline.addLast("contextInitialization", contextInitializationInboundHandler);
pipeline.addLast("dispatcher", httpRequestDispatcher);
pipeline.addLast("handlerParameterDecoder", handlerParameterDecoder);
pipeline.addLast("handleMethodExecutor", handleMethodExecutor);
diff --git a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java
index 8f5937a..85f6d82 100644
--- a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java
+++ b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HandlerParameterDecoderTest.java
@@ -49,10 +49,11 @@
@Before
public void setUp() {
+ ContextInitializationInboundHandler contextInitializationInboundHandler = new ContextInitializationInboundHandler();
HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(Collections.singletonList(new DecoderTestController()), false);
HandlerParameterDecoder handlerParameterDecoder = new HandlerParameterDecoder();
HandleMethodExecutor handleMethodExecutor = new HandleMethodExecutor();
- channel = new EmbeddedChannel(httpRequestDispatcher, handlerParameterDecoder, handleMethodExecutor);
+ channel = new EmbeddedChannel(contextInitializationInboundHandler, httpRequestDispatcher, handlerParameterDecoder, handleMethodExecutor);
}
@Test
diff --git a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java
index 12c6f48..a893325 100644
--- a/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java
+++ b/elasticjob-infra/elasticjob-restful/src/test/java/org/apache/shardingsphere/elasticjob/restful/pipeline/HttpRequestDispatcherTest.java
@@ -23,8 +23,9 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
-import org.apache.shardingsphere.elasticjob.restful.handler.HandlerNotFoundException;
import org.apache.shardingsphere.elasticjob.restful.controller.JobController;
+import org.apache.shardingsphere.elasticjob.restful.handler.HandleContext;
+import org.apache.shardingsphere.elasticjob.restful.handler.HandlerNotFoundException;
import org.junit.Test;
public final class HttpRequestDispatcherTest {
@@ -33,6 +34,6 @@
public void assertDispatcherHandlerNotFound() {
EmbeddedChannel channel = new EmbeddedChannel(new HttpRequestDispatcher(Lists.newArrayList(new JobController()), false));
FullHttpRequest fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/hello/myJob/myCron");
- channel.writeInbound(fullHttpRequest);
+ channel.writeInbound(new HandleContext<>(fullHttpRequest, null));
}
}