blob: 271635ca22af1acdc1e88091035544441e5a623d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.database.memory
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentType
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.{ByteString, ByteStringBuilder}
import org.apache.openwhisk.common.LoggingMarkers.{
DATABASE_ATTS_DELETE,
DATABASE_ATT_DELETE,
DATABASE_ATT_GET,
DATABASE_ATT_SAVE
}
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.database.StoreUtils._
import org.apache.openwhisk.core.database._
import org.apache.openwhisk.core.entity.DocId
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag
object MemoryAttachmentStoreProvider extends AttachmentStoreProvider {
override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
logging: Logging): AttachmentStore =
new MemoryAttachmentStore(implicitly[ClassTag[D]].runtimeClass.getSimpleName.toLowerCase)
}
/**
* Basic in-memory AttachmentStore implementation. Useful for testing.
*/
class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem, logging: Logging) extends AttachmentStore {
override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
private case class Attachment(bytes: ByteString)
private val attachments = new TrieMap[String, Attachment]
private var closed = false
override val scheme = "mems"
override protected[core] def attach(
docId: DocId,
name: String,
contentType: ContentType,
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
require(name != null, "name undefined")
val start =
transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")
val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
val f = docStream.runWith(combinedSink(uploadSink))
val g = f.map { r =>
attachments += (attachmentKey(docId, name) -> Attachment(r.uploadResult.result().compact))
transid
.finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$docId'")
AttachResult(r.digest, r.length)
}
reportFailure(
g,
start,
failure =>
s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
}
/**
* Retrieves a saved attachment, streaming it into the provided Sink.
*/
override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
implicit transid: TransactionId): Future[T] = {
val start =
transid.started(
this,
DATABASE_ATT_GET,
s"[ATT_GET] '$dbName' finding attachment '$name' of document 'id: $docId'")
val f = attachments.get(attachmentKey(docId, name)) match {
case Some(Attachment(bytes)) =>
val r = Source.single(bytes).toMat(sink)(Keep.right).run
r.map(t => {
transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$docId'")
t
})
case None =>
transid.finished(
this,
start,
s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$docId'; not found.")
Future.failed(NoDocumentException("Not found on 'readAttachment'."))
}
reportFailure(
f,
start,
failure => s"[ATT_GET] '$dbName' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
}
override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
val start = transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] uploading attachment of document '$docId'")
val prefix = docId + "/"
attachments --= attachments.keySet.filter(_.startsWith(prefix))
transid.finished(this, start, s"[ATTS_DELETE] completed: delete attachment of document '$docId'")
Future.successful(true)
}
override protected[core] def deleteAttachment(docId: DocId, name: String)(
implicit transid: TransactionId): Future[Boolean] = {
val start = transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] uploading attachment of document '$docId'")
attachments.remove(attachmentKey(docId, name))
transid.finished(this, start, s"[ATT_DELETE] completed: delete attachment of document '$docId'")
Future.successful(true)
}
def attachmentCount: Int = attachments.size
def isClosed = closed
override def shutdown(): Unit = {
closed = true
}
private def attachmentKey(docId: DocId, name: String) = s"${docId.id}/$name"
}