blob: afd1187ad720ca763c8050f8919bed5bad47e6b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.database
import java.util.Base64
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentType, Uri}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import spray.json.DefaultJsonProtocol
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.AttachmentSupport.MemScheme
import org.apache.openwhisk.core.entity.Attachments.Attached
import org.apache.openwhisk.core.entity.{ByteSize, DocId, DocInfo, UUID}
import scala.concurrent.{ExecutionContext, Future}
object AttachmentSupport {

  /**
   * Name of the URI scheme that marks an attachment whose content is inlined,
   * i.e. encoded directly into the attachment name instead of being stored externally.
   */
  final val MemScheme: String = "mem"
}
/**
 * Configuration for inlining small attachments into the attachment name.
 *
 * @param maxInlineSize attachments whose total size is strictly less than this are inlined
 */
case class InliningConfig(maxInlineSize: ByteSize)
/**
 * Provides support for inlining small attachments. Inlined attachment contents are encoded as part of the
 * attachment name itself: the name is a URI with scheme `mem` (`AttachmentSupport.MemScheme`) whose path is
 * the URL-safe Base64 encoding of the content. Larger attachments are uploaded to an [[AttachmentStore]].
 *
 * @tparam DocumentAbstraction document type persisted via `put`
 */
trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends DefaultJsonProtocol {

  /** Execution context used to compose the futures created by this trait. */
  protected def executionContext: ExecutionContext

  /**
   * Attachment scheme name to use for non inlined attachments
   */
  protected def attachmentScheme: String

  /** Inlining configuration; supplies [[maxInlineSize]]. */
  protected def inliningConfig: InliningConfig

  /**
   * Attachments whose total size is strictly less than this are inlined.
   */
  def maxInlineSize: ByteSize = inliningConfig.maxInlineSize

  /**
   * See [[ArtifactStore#put]]
   */
  protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo]

  /**
   * Determines whether the content of a ByteString source can be inlined.
   *
   * Elements are pulled one at a time and accumulated until either the source completes or the accumulated
   * size reaches [[maxInlineSize]]. The buffer therefore holds less than `maxInlineSize` bytes plus at most
   * one further element.
   *
   * @param docStream      attachment content
   * @param previousPrefix bytes already consumed from the original stream by earlier recursive calls
   * @return `Left(bytes)` holding all bytes of the source when its total size is less than [[maxInlineSize]],
   *         otherwise `Right(source)` that replays the already consumed bytes followed by the remaining stream
   */
  protected[database] def inlineOrAttach(docStream: Source[ByteString, _],
                                         previousPrefix: ByteString = ByteString.empty)(
    implicit system: ActorSystem): Future[Either[ByteString, Source[ByteString, _]]] = {
    implicit val ec: ExecutionContext = executionContext
    docStream.prefixAndTail(1).runWith(Sink.head).flatMap {
      case (Nil, _) =>
        // Source exhausted while still below the limit: everything read so far is inlined
        Future.successful(Left(previousPrefix))
      case (Seq(prefix), tail) =>
        val completePrefix = previousPrefix ++ prefix
        if (completePrefix.size < maxInlineSize.toBytes) {
          inlineOrAttach(tail, completePrefix)
        } else {
          // Too large to inline: emit the bytes consumed so far ahead of the rest of the stream
          Future.successful(Right(tail.prepend(Source.single(completePrefix))))
        }
    }
  }

  /**
   * Constructs a URI for the attachment.
   *
   * @param bytesOrSource either the complete attachment bytes (inlined case) or the attachment source
   * @param path          generates the attachment name for the non inlined case; evaluated only for `Right`
   * @return constructed uri. For an inlined attachment the uri has scheme `mem` and its path is the URL-safe
   *         Base64 encoded attachment content; otherwise it is `attachmentScheme:path`
   */
  protected[database] def uriOf(bytesOrSource: Either[ByteString, Source[ByteString, _]], path: => String): Uri =
    bytesOrSource match {
      // NOTE(review): this branch still uses Uri.from although uriFrom exists to work around akka-http#2080;
      // presumably safe because Base64url output contains no '/' characters — confirm before changing.
      case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
      case Right(_)    => uriFrom(scheme = attachmentScheme, path = path)
    }

  /**
   * Builds a `scheme:path` URI by parsing the concatenated string.
   */
  //Not using Uri.from due to https://github.com/akka/akka-http/issues/2080
  protected[database] def uriFrom(scheme: String, path: String): Uri = Uri(s"$scheme:$path")

  /**
   * Constructs a single-element source from inlined attachment contents.
   *
   * @throws IllegalArgumentException if `uri` does not use the `mem` scheme
   */
  protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed] = {
    require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
    Source.single(ByteString(decode(uri)))
  }

  /** Returns true if the attachment URI denotes inlined content (scheme `mem`). */
  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme

  /**
   * Computes the digest of the passed bytes, encoded as a string by `StoreUtils.encodeDigest`.
   */
  protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
    val digester = StoreUtils.emptyDigest()
    digester.update(bytes.toArray)
    StoreUtils.encodeDigest(digester.digest())
  }

  /**
   * Attaches the passed source content to the document. Small content (see [[maxInlineSize]]) is inlined into
   * the attachment name; larger content is uploaded to `attachmentStore` under a freshly generated UUID name.
   * The document is then updated via `put`. After that, the old attachment is deleted if its scheme matches
   * `attachmentStore.scheme`.
   *
   * Note: the returned future fails if deleting the old attachment fails, even though the document has
   * already been updated at that point.
   *
   * @tparam A concrete document type
   * @param doc             document with attachment
   * @param update          function to update the `Attached` state with attachment metadata
   * @param contentType     contentType of the attachment
   * @param docStream       attachment source
   * @param oldAttachment   old attachment in case of update. Required for deleting the old attachment
   * @param attachmentStore attachmentStore where attachment needs to be stored
   * @return a tuple of updated document info and attachment metadata
   */
  protected[database] def attachToExternalStore[A <: DocumentAbstraction](doc: A,
                                                                          update: (A, Attached) => A,
                                                                          contentType: ContentType,
                                                                          docStream: Source[ByteString, _],
                                                                          oldAttachment: Option[Attached],
                                                                          attachmentStore: AttachmentStore)(
    implicit transid: TransactionId,
    actorSystem: ActorSystem): Future[(DocInfo, Attached)] = {
    val asJson = doc.toDocumentRecord
    val id = asJson.fields("_id").convertTo[String].trim
    implicit val ec: ExecutionContext = executionContext

    for {
      bytesOrSource <- inlineOrAttach(docStream)
      uri = uriOf(bytesOrSource, UUID().asString)
      attached <- {
        // Upload if cannot be inlined
        bytesOrSource match {
          case Left(bytes) =>
            Future.successful(Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes))))
          case Right(source) =>
            attachmentStore
              .attach(DocId(id), uri.path.toString, contentType, source)
              .map(r => Attached(uri.toString, contentType, Some(r.length), Some(r.digest)))
        }
      }
      i1 <- put(update(doc, attached))
      // Remove old attachment if it was part of attachmentStore; inlined ones need no cleanup
      _ <- oldAttachment.fold(Future.successful(true)) { old =>
        val oldUri = Uri(old.attachmentName)
        if (oldUri.scheme == attachmentStore.scheme) {
          attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
        } else {
          Future.successful(true)
        }
      }
    } yield (i1, attached)
  }

  /** URL-safe Base64 encoding (with padding) used for inlined attachment names. */
  private def encode(bytes: Seq[Byte]): String = {
    Base64.getUrlEncoder.encodeToString(bytes.toArray)
  }

  /** Inverse of `encode`: decodes the URI path as URL-safe Base64. */
  private def decode(uri: Uri): Array[Byte] = {
    Base64.getUrlDecoder.decode(uri.path.toString())
  }
}