blob: 969da0448fe0918131d5666a6d0369917b5607e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.jarstore
import java.io.File
import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.Materializer
import akka.stream.scaladsl.{StreamConverters, FileIO}
import akka.util.ByteString
/**
* FileDirective is a set of Akka-http directives to upload/download
* huge binary files to/from an Akka-Http server.
*/
object FileDirective {
// Name of a field inside a submitted form
type Name = String

// Chunk size in bytes (256 KiB) used when streaming a file back to the client
val CHUNK_SIZE: Int = 256 * 1024
/**
 * Describes a file once its upload to the server has completed.
 *
 * @param originFileName the file name supplied by the client for the upload.
 * @param file the file on the server that holds the uploaded content.
 * @param length the length of the uploaded file
 */
case class FileInfo(
    originFileName: String,
    file: File,
    length: Long)
// A form field is either an uploaded file (Left) or a plain string value (Right)
type FormField = Either[FileInfo, String]

/**
 * The fields of a submitted form, keyed by form field name.
 */
class Form(val fields: Map[Name, FormField]) {

  /** Looks up the uploaded file of a field; None if the field is absent or is not a file. */
  def getFileInfo(fieldName: String): Option[FileInfo] =
    fields.get(fieldName) match {
      case Some(Left(info)) => Option(info)
      case Some(Right(_)) => None
      case None => None
    }

  /** Looks up the string value of a field; None if the field is absent or is a file. */
  def getValue(fieldName: String): Option[String] =
    fields.get(fieldName) match {
      case Some(Right(text)) => Option(text)
      case Some(Left(_)) => None
      case None => None
    }
}
/**
 * Directive that stores the uploaded files in the temporary directory and provides the
 * resulting Form (uploaded files and plain values, keyed by form field name) to the
 * inner route.
 */
def uploadFile: Directive1[Form] = {
  Directive[Tuple1[Form]] { inner =>
    extractMaterializer { implicit mat =>
      extractExecutionContext { implicit ec =>
        uploadFileImpl(mat, ec) { pendingForm => ctx =>
          // Run the inner route only after the whole form has been processed
          for {
            form <- pendingForm
            result <- inner(Tuple1(form))(ctx)
          } yield result
        }
      }
    }
  }
}
/**
 * Directive that stores the uploaded files in the given JarStore and provides a Map from
 * form field name to the FilePath in the JarStore to the inner route.
 */
def uploadFileTo(jarStore: JarStore): Directive1[Map[Name, FilePath]] = {
  Directive[Tuple1[Map[Name, FilePath]]] { inner =>
    extractMaterializer { implicit mat =>
      extractExecutionContext { implicit ec =>
        uploadFileImpl(jarStore)(mat, ec) { pendingPaths => ctx =>
          // Run the inner route only after every file has been stored
          pendingPaths.flatMap(storedPaths => inner(Tuple1(storedPaths))(ctx))
        }
      }
    }
  }
}
// Completes the request with the content of a JarStore file, sent as a binary stream
def downloadFileFrom(jarStore: JarStore, filePath: String): Route = {
  // The input stream is only opened when the response entity is actually streamed
  val fileBytes = StreamConverters.fromInputStream(() => jarStore.getFile(filePath), CHUNK_SIZE)
  val responseEntity = HttpEntity(MediaTypes.`application/octet-stream`, fileBytes)
  complete(responseEntity)
}
/**
 * Streams every file part of a multipart form into the given JarStore.
 *
 * Parts are handled one at a time (mapAsync(1)), in the order they arrive:
 *  - a part carrying a file name is written to the JarStore under a path made of the
 *    current epoch milliseconds followed by the file name; it is reported in the result
 *    only when at least one byte was written.
 *  - any other part is drained and ignored.
 *
 * @param jarStore the store that receives the uploaded files
 * @return a directive providing a Future which completes with a Map from form field name
 *         to the FilePath of the stored file
 */
private def uploadFileImpl(jarStore: JarStore)
  (implicit mat: Materializer, ec: ExecutionContext): Directive1[Future[Map[Name, FilePath]]] = {
  import akka.stream.scaladsl.Sink

  Directive[Tuple1[Future[Map[Name, FilePath]]]] { inner =>
    entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
      val fileNameMap = formdata.parts.mapAsync(1) { part =>
        part.filename match {
          case Some(fileName) =>
            // The file name is chosen by the client. Keep only its last path segment so
            // that it cannot address a directory outside of the intended location.
            val lastSeparator = math.max(fileName.lastIndexOf('/'), fileName.lastIndexOf('\\'))
            val baseName = fileName.substring(lastSeparator + 1)
            val path = s"${Instant.now().toEpochMilli}$baseName"
            val sink = StreamConverters.fromOutputStream(
              () => jarStore.createFile(path), autoFlush = true)
            part.entity.dataBytes.runWith(sink).map { written =>
              if (written.count > 0) {
                Map[Name, FilePath](part.name -> FilePath(path))
              } else {
                Map.empty[Name, FilePath]
              }
            }
          case None =>
            // All parts share one underlying request stream, so a part that is not
            // consumed would block the parts that follow it. Drain it explicitly.
            part.entity.dataBytes.runWith(Sink.ignore).map(_ => Map.empty[Name, FilePath])
        }
      }.runFold(Map.empty[Name, FilePath])((collected, entry) => collected ++ entry)
      inner(Tuple1(fileNameMap))
    }
  }
}
/**
 * Saves every file part of a multipart form to a temporary file and collects all parts
 * into a Form.
 *
 * Parts are handled one at a time (mapAsync(1)), in the order they arrive:
 *  - a part carrying a file name is streamed to a fresh temporary file and recorded as a
 *    FileInfo; a part for which no byte was written is skipped and its temporary file
 *    is removed.
 *  - any other part is read completely and recorded as a UTF-8 decoded string value.
 *
 * @return a directive providing a Future which completes with the Form once all parts
 *         have been processed
 */
private def uploadFileImpl(implicit mat: Materializer, ec: ExecutionContext)
  : Directive1[Future[Form]] = {
  // Field names and file names are chosen by the client. Keep only the last path segment
  // so that they can never address a directory when building the temporary file name.
  def lastSegment(name: String): String =
    name.substring(math.max(name.lastIndexOf('/'), name.lastIndexOf('\\')) + 1)

  Directive[Tuple1[Future[Form]]] { inner =>
    entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
      val form = formdata.parts.mapAsync(1) { part =>
        part.filename match {
          case Some(fileName) =>
            val targetPath = File.createTempFile(
              s"userfile_${lastSegment(part.name)}_", lastSegment(fileName))
            part.entity.dataBytes.runWith(FileIO.toFile(targetPath)).map { written =>
              if (written.count > 0) {
                Map[Name, FormField](
                  part.name -> Left(FileInfo(fileName, targetPath, written.count)))
              } else {
                // Nothing was uploaded. The empty temporary file is not reported to the
                // caller, so remove it (best effort) instead of leaving it behind.
                targetPath.delete()
                Map.empty[Name, FormField]
              }
            }
          case None =>
            // A plain form value: gather all of its bytes and decode them as UTF-8
            part.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map { value =>
              Map[Name, FormField](part.name -> Right(value.utf8String))
            }
        }
      }.runFold(new Form(Map.empty[Name, FormField])) { (collected, entry) =>
        new Form(collected.fields ++ entry)
      }
      inner(Tuple1(form))
    }
  }
}
}