blob: 655eb68f3dc23e5c3597faf4f12dbc2d3d4863cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.agent.http;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.CharsetUtil;
import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
 * Netty inbound handler that serves agent-side HTTP download requests.
 *
 * <p>For every aggregated {@link FullHttpRequest} the handler takes the last path segment of the
 * request URI, looks it up in the {@link HttpTransferRequestsStore}, and streams the matching
 * resource back to the client as an {@code application/octet-stream} attachment.
 *
 * <p>NOTE(review): the body is written as an {@link HttpChunkedInput}; this presumably relies on a
 * {@code ChunkedWriteHandler} being installed in the pipeline. The pipeline setup is not visible
 * from this class - confirm against the server bootstrap.
 */
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);

    private final HttpTransferRequestsStore transferRequestsStore;

    /**
     * @param transferRequestsStore store used to resolve the download registered for a request URI
     */
    public HttpServerHandler(HttpTransferRequestsStore transferRequestsStore) {
        this.transferRequestsStore = transferRequestsStore;
    }

    /**
     * Handles one HTTP request: validates it, resolves the download, and streams the resource.
     *
     * <p>Everything that can fail (connector lookup, connector initialisation, opening the input
     * stream, building headers) happens before the first byte of the response is written, so that
     * a failure can still be reported with a clean error response. Once the response has started,
     * a failure closes the connection instead of emitting a second response.
     *
     * @param ctx     channel context used to write the response
     * @param request the aggregated HTTP request
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // True once the 200 response head has been handed to the pipeline.
        boolean responseStarted = false;
        InputStream inputStream = null;
        try {
            if (!request.decoderResult().isSuccess()) {
                sendError(ctx, BAD_REQUEST);
                return;
            }

            // Compare by value rather than by reference.
            if (!GET.equals(request.method())) {
                sendError(ctx, METHOD_NOT_ALLOWED);
                return;
            }

            // The transfer is identified by whatever follows the last '/' of the request URI.
            final String uri = request.uri().substring(request.uri().lastIndexOf("/") + 1);
            logger.info("Received download request through url {}", uri);

            AgentHttpDownloadData downloadData = transferRequestsStore.getDownloadRequest(uri);
            if (downloadData == null) {
                logger.error("Couldn't find transfer request for uri {}", uri);
                sendError(ctx, NOT_FOUND);
                return;
            }

            // TODO: Support chunked streaming
            IncomingStreamingConnector incomingStreamingConnector = downloadData.getIncomingStreamingConnector();
            if (incomingStreamingConnector == null) {
                if (downloadData.getIncomingChunkedConnector() != null) {
                    logger.error("Chunked data download is not yet supported in Http transport");
                } else {
                    logger.error("No incoming connector is available for uri {}", uri);
                }
                sendError(ctx, INTERNAL_SERVER_ERROR);
                return;
            }

            // Read keep-alive once; it decides both the response header and the close policy.
            final boolean keepAlive = HttpUtil.isKeepAlive(request);

            // Build the response head first: header validation may throw, and nothing has been
            // opened yet that would need cleaning up.
            long fileLength = downloadData.getConnectorConfig().getMetadata().getResourceSize();
            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
            HttpUtil.setContentLength(response, fileLength);
            setContentTypeHeader(response, downloadData.getConnectorConfig().getMetadata().getFriendlyName());
            if (keepAlive) {
                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }

            // Open the source stream before anything is written to the client.
            incomingStreamingConnector.init(downloadData.getConnectorConfig());
            String childResourcePath = downloadData.getChildResourcePath();
            inputStream = (childResourcePath == null || childResourcePath.isEmpty())
                    ? incomingStreamingConnector.fetchInputStream()
                    : incomingStreamingConnector.fetchInputStream(childResourcePath);
            if (inputStream == null) {
                logger.error("Connector returned no input stream for uri {}", uri);
                sendError(ctx, INTERNAL_SERVER_ERROR);
                return;
            }

            // HttpChunkedInput will write the end marker (LastHttpContent) for us.
            HttpChunkedInput body = new HttpChunkedInput(new ChunkedStream(inputStream));

            // From here on an error response can no longer be sent safely.
            responseStarted = true;

            // Write the initial line and the header, then the content.
            ctx.write(response);
            ChannelFuture sendFileFuture = ctx.writeAndFlush(body, ctx.newProgressivePromise());

            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
                @Override
                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                    if (total < 0) { // total unknown
                        logger.debug("{} Transfer progress: {}", future.channel(), progress);
                    } else {
                        logger.debug("{} Transfer progress: {} / {}", future.channel(), progress, total);
                    }
                }

                @Override
                public void operationComplete(ChannelProgressiveFuture future) {
                    if (future.isSuccess()) {
                        logger.info("{} Transfer complete.", future.channel());
                    } else {
                        // A partially written body leaves the connection unusable for further
                        // responses, so drop it even when keep-alive was requested.
                        logger.error("{} Transfer failed", future.channel(), future.cause());
                        future.channel().close();
                    }
                }
            });

            // Decide whether to close the connection or not.
            if (!keepAlive) {
                // Close the connection when the whole content is written out.
                sendFileFuture.addListener(ChannelFutureListener.CLOSE);
            }
        } catch (Exception e) {
            logger.error("Errored while processing HTTP download request", e);
            // Closing an already closed stream has no effect, so this is safe in either state.
            closeQuietly(inputStream);
            if (responseStarted) {
                // A 200 head is already queued or sent; a second response would corrupt the
                // HTTP stream, so the only safe signal to the client is closing the connection.
                ctx.close();
            } else {
                sendError(ctx, INTERNAL_SERVER_ERROR);
            }
        }
    }

    /**
     * Logs an unexpected pipeline failure and, if the channel is still active, answers with a
     * 500 response (which also closes the connection).
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("Unexpected error in HTTP download handler for channel {}", ctx.channel(), cause);
        if (ctx.channel().isActive()) {
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Sends a 302 redirect to {@code newUri} and closes the connection afterwards.
     * Currently not referenced from within this class.
     */
    private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
        response.headers().set(HttpHeaderNames.LOCATION, newUri);
        // Close the connection as soon as the redirect is sent.
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Sends a plain-text error response with the given status and closes the connection once it
     * has been written.
     */
    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        // Tell the client how long the body is and that the connection will not be reused.
        HttpUtil.setContentLength(response, response.content().readableBytes());
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        // Close the connection as soon as the error message is sent.
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Marks the response as a binary attachment download.
     *
     * <p>The file name is emitted as a quoted string: quotes and backslashes are escaped and
     * control characters are replaced with {@code '_'} so the name cannot break out of the
     * header value. A {@code null} or empty name yields a bare {@code attachment} disposition.
     *
     * @param response response whose headers are populated
     * @param path     file name to suggest to the client; may be {@code null}
     */
    private static void setContentTypeHeader(HttpResponse response, String path) {
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");

        StringBuilder disposition = new StringBuilder("attachment");
        if (path != null && !path.isEmpty()) {
            disposition.append("; filename=\"");
            for (int i = 0; i < path.length(); i++) {
                char c = path.charAt(i);
                if (Character.isISOControl(c)) {
                    disposition.append('_');
                } else if (c == '"' || c == '\\') {
                    disposition.append('\\').append(c);
                } else {
                    disposition.append(c);
                }
            }
            disposition.append('"');
        }
        response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, disposition.toString());
    }

    /** Closes {@code stream} if it is non-null, logging (not propagating) any failure. */
    private static void closeQuietly(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception e) {
            logger.warn("Failed to close download input stream", e);
        }
    }
}