* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.openwhisk.core.database.test
import akka.http.scaladsl.model.ContentTypes
import{Sink, Source, StreamConverters}
import akka.util.{ByteString, ByteStringBuilder}
import common.{StreamLogging, WskActorSystem}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.{AttachmentStore, NoDocumentException}
import org.apache.openwhisk.core.entity.DocId
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.util.Random
trait AttachmentStoreBehaviors
extends ScalaFutures
with DbUtils
with Matchers
with StreamLogging
with WskActorSystem
with BeforeAndAfterAll {
this: FlatSpec =>
//Bring in sync the timeout used by ScalaFutures and DBUtils
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
protected val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
private val attachmentsToDelete = ListBuffer[String]()
def store: AttachmentStore
def storeType: String
def garbageCollectAttachments: Boolean = true
* In some cases like when CloudFront CDN is used then deletes are not immediately reflected in reads
* as the objects are still present in cache. For such cases we would relax some of the test assertions
protected def lazyDeletes: Boolean = false
behavior of s"$storeType AttachmentStore"
it should "add and read attachment" in {
implicit val tid: TransactionId = transid()
val bytes = randomBytes(16023)
val docId = newDocId()
val result = store.attach(docId, "code", ContentTypes.`application/octet-stream`, chunkedSource(bytes)).futureValue
result.length shouldBe 16023
val byteBuilder = store.readAttachment(docId, "code", byteStringSink()).futureValue
byteBuilder.result() shouldBe ByteString(bytes)
it should "add and delete attachments" in {
implicit val tid: TransactionId = transid()
val b1 = randomBytes(1000)
val b2 = randomBytes(2000)
val b3 = randomBytes(3000)
val docId = newDocId()
//create another doc with similar name to verify it is unaffected by deletes of the first docs attachments
val docId2 = DocId( + "2")
val r1 = store.attach(docId, "c1", ContentTypes.`application/octet-stream`, chunkedSource(b1)).futureValue
val r2 = store.attach(docId, "c2", ContentTypes.`application/json`, chunkedSource(b2)).futureValue
val r3 = store.attach(docId, "c3", ContentTypes.`application/json`, chunkedSource(b3)).futureValue
//create attachments for the other doc
val r21 = store.attach(docId2, "c21", ContentTypes.`application/octet-stream`, chunkedSource(b1)).futureValue
val r22 = store.attach(docId2, "c22", ContentTypes.`application/json`, chunkedSource(b2)).futureValue
r1.length shouldBe 1000
r2.length shouldBe 2000
r3.length shouldBe 3000
r21.length shouldBe 1000
r22.length shouldBe 2000
println(s"creating doc ${docId}")
attachmentBytes(docId, "c1").futureValue.result() shouldBe ByteString(b1)
attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
attachmentBytes(docId2, "c21").futureValue.result() shouldBe ByteString(b1)
attachmentBytes(docId2, "c22").futureValue.result() shouldBe ByteString(b2)
//Delete single attachment
store.deleteAttachment(docId, "c1").futureValue shouldBe true
//Non deleted attachments related to same docId must still be accessible
println(s"read missing doc ${docId}")
if (!lazyDeletes) attachmentBytes(docId, "c1").failed.futureValue shouldBe a[NoDocumentException]
attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
//Delete all attachments
store.deleteAttachments(docId).futureValue shouldBe true
if (!lazyDeletes) attachmentBytes(docId, "c2").failed.futureValue shouldBe a[NoDocumentException]
if (!lazyDeletes) attachmentBytes(docId, "c3").failed.futureValue shouldBe a[NoDocumentException]
//Make sure doc2 attachments are left untouched
if (!lazyDeletes) attachmentBytes(docId2, "c21").futureValue.result() shouldBe ByteString(b1)
if (!lazyDeletes) attachmentBytes(docId2, "c22").futureValue.result() shouldBe ByteString(b2)
attachmentsToDelete += docId2.asString
it should "throw NoDocumentException on reading non existing attachment" in {
implicit val tid: TransactionId = transid()
val docId = DocId("no-existing-id")
val f = store.readAttachment(docId, "code", byteStringSink())
f.failed.futureValue shouldBe a[NoDocumentException]
it should "not write an attachment when there is error in Source" in {
implicit val tid: TransactionId = transid()
val docId = newDocId()
val error = new Error("boom!")
val faultySource = Source(1 to 10)
.map { n =>
if (n == 7) throw error
val writeResult = store.attach(docId, "code", ContentTypes.`application/octet-stream`, faultySource)
writeResult.failed.futureValue.getCause should be theSameInstanceAs error
val readResult = store.readAttachment(docId, "code", byteStringSink())
readResult.failed.futureValue shouldBe a[NoDocumentException]
override def afterAll(): Unit = {
if (garbageCollectAttachments) {
implicit val tid: TransactionId = transid()
val f =
.mapAsync(2)(id => store.deleteAttachments(DocId(id)))
Await.result(f, 1.minute)
protected def garbageCollect(docId: DocId): Unit = {}
protected def newDocId(): DocId = {
//By default create an info with dummy revision
//as apart from CouchDB other stores do not support the revision property
//for blobs
counter = counter + 1
val docId = s"${prefix}_$counter"
attachmentsToDelete += docId
@volatile var counter = 0
private def attachmentBytes(id: DocId, name: String) = {
implicit val tid: TransactionId = transid()
store.readAttachment(id, name, byteStringSink())
private def chunkedSource(bytes: Array[Byte]): Source[ByteString, _] = {
StreamConverters.fromInputStream(() => new ByteArrayInputStream(bytes), 42)
private def byteStringSink() = {
Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)