blob: a700cd1e6aefb02a1735313ac3c5b8eb80418179 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.rest.netty;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.extension.rest.api.media.MediaType;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.protocol.rest.RestHeaderEnum;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* netty http response
*/
public class NettyHttpResponse implements HttpResponse {
private static final int EMPTY_CONTENT_LENGTH = 0;
private int status = 200;
private OutputStream os;
private Map<String, List<String>> outputHeaders;
private final ChannelHandlerContext ctx;
private boolean committed;
private boolean keepAlive;
private HttpMethod method;
// raw response body
private Object responseBody;
// raw response class
private Class<?> entityClass;
public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, URL url) {
this(ctx, keepAlive, null, url);
}
public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAlive, HttpMethod method, URL url) {
outputHeaders = new HashMap<>();
this.method = method;
os = new ChunkOutputStream(this, ctx, url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD));
this.ctx = ctx;
this.keepAlive = keepAlive;
}
public void setOutputStream(OutputStream os) {
this.os = os;
}
@Override
public int getStatus() {
return status;
}
@Override
public void setStatus(int status) {
if (status > 200) {
addOutputHeaders(RestHeaderEnum.CONTENT_TYPE.getHeader(), MediaType.TEXT_PLAIN.value);
}
this.status = status;
}
@Override
public Map<String, List<String>> getOutputHeaders() {
return outputHeaders;
}
@Override
public OutputStream getOutputStream() throws IOException {
return os;
}
@Override
public void sendError(int status) throws IOException {
sendError(status, null);
}
@Override
public void sendError(int status, String message) throws IOException {
setStatus(status);
setResponseBody(message);
if (message != null) {
getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
}
}
@Override
public boolean isCommitted() {
return committed;
}
@Override
public void reset() {
if (committed) {
throw new IllegalStateException("Messages.MESSAGES.alreadyCommitted()");
}
outputHeaders.clear();
outputHeaders.clear();
}
public boolean isKeepAlive() {
return keepAlive;
}
public DefaultHttpResponse getDefaultHttpResponse() {
DefaultHttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(getStatus()));
transformResponseHeaders(res);
return res;
}
public DefaultHttpResponse getEmptyHttpResponse() {
DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(getStatus()));
if (method == null || !method.equals(HttpMethod.HEAD)) {
res.headers().add(Names.CONTENT_LENGTH, EMPTY_CONTENT_LENGTH);
}
transformResponseHeaders(res);
return res;
}
private void transformResponseHeaders(io.netty.handler.codec.http.HttpResponse res) {
transformHeaders(this, res);
}
public void prepareChunkStream() {
committed = true;
DefaultHttpResponse response = getDefaultHttpResponse();
HttpHeaders.setTransferEncodingChunked(response);
ctx.write(response);
}
public void finish() throws IOException {
if (os != null) os.flush();
ChannelFuture future;
if (isCommitted()) {
// if committed this means the output stream was used.
future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} else {
future = ctx.writeAndFlush(getEmptyHttpResponse());
}
if (!isKeepAlive()) {
future.addListener(ChannelFutureListener.CLOSE);
}
getOutputStream().close();
}
@Override
public void flushBuffer() throws IOException {
if (os != null) os.flush();
ctx.flush();
}
@Override
public void addOutputHeaders(String name, String value) {
List<String> values = outputHeaders.get(name);
if (values == null) {
values = new ArrayList<>();
outputHeaders.put(name, values);
}
if (values.contains(value)) {
return;
}
values.add(value);
}
@SuppressWarnings({"rawtypes", "unchecked"})
public static void transformHeaders(
NettyHttpResponse nettyResponse, io.netty.handler.codec.http.HttpResponse response) {
// if (nettyResponse.isKeepAlive()) {
// response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
// } else {
// response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
// }
for (Map.Entry<String, List<String>> entry :
nettyResponse.getOutputHeaders().entrySet()) {
String key = entry.getKey();
for (String value : entry.getValue()) {
response.headers().set(key, value);
}
}
}
public Object getResponseBody() {
return responseBody;
}
public void setResponseBody(Object responseBody) {
this.responseBody = responseBody;
if (responseBody != null) {
this.entityClass = responseBody.getClass();
}
}
public Class<?> getEntityClass() {
return entityClass;
}
}