blob: b0c02b8bc134f8222c1293a4226bd162819d4093 [file] [log] [blame]
package org.apache.cassandra.sidecar.routes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.file.FileProps;
import io.vertx.core.file.FileSystem;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
import org.apache.cassandra.sidecar.models.HttpResponse;
import org.apache.cassandra.sidecar.utils.FileStreamer;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
/**
* Handler for sending out files.
*/
public class FileStreamHandler implements Handler<RoutingContext>
{
private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamHandler.class);
public static final String FILE_PATH_CONTEXT_KEY = "fileToTransfer";
private final FileStreamer fileStreamer;
@Inject
public FileStreamHandler(final FileStreamer fileStreamer)
{
this.fileStreamer = fileStreamer;
}
@Override
public void handle(RoutingContext context)
{
final String localFile = context.get(FILE_PATH_CONTEXT_KEY);
final HttpServerRequest request = context.request();
final String host = extractHostAddressWithoutPort(request.host());
LOGGER.debug("FileStreamHandler handle file transfer '{}' for client: {}. Instance: {}", localFile,
request.remoteAddress(), host);
FileSystem fs = context.vertx().fileSystem();
fs.exists(localFile)
.compose(exists -> ensureValidFile(fs, localFile, exists))
.compose(fileProps -> fileStreamer.stream(new HttpResponse(request, context.response()), localFile,
fileProps.size(), request.getHeader(HttpHeaderNames.RANGE)))
.onSuccess(v -> LOGGER.debug("Completed streaming file '{}'", localFile))
.onFailure(context::fail);
}
/**
* Ensures that the file exists and is a non-empty regular file
*
* @param fs The underlying filesystem
* @param localFile The path the file in the filesystem
* @param exists Whether the file exists or not
* @return a succeeded future with the {@link FileProps}, or a failed future if the file does not exist;
* is not a regular file; or if the file is empty
*/
private Future<FileProps> ensureValidFile(FileSystem fs, String localFile, Boolean exists)
{
if (!exists)
{
LOGGER.error("The requested file '{}' does not exist", localFile);
return Future.failedFuture(new HttpException(NOT_FOUND.code(), "The requested file does not exist"));
}
return fs.props(localFile)
.compose(fileProps ->
{
if (fileProps == null || !fileProps.isRegularFile())
{
// File is not a regular file
LOGGER.error("The requested file '{}' does not exist", localFile);
return Future.failedFuture(new HttpException(NOT_FOUND.code(),
"The requested file does not exist"));
}
if (fileProps.size() <= 0)
{
LOGGER.error("The requested file '{}' has 0 size", localFile);
return Future.failedFuture(new HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code(),
"The requested file is empty"));
}
return Future.succeededFuture(fileProps);
});
}
}