package whisk.core.database.test
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.common.TransactionCounter
import whisk.common.TransactionId
import whisk.core.database.ArtifactStore
import whisk.core.database.CouchDbRestClient
import whisk.core.database.DocumentFactory
import whisk.core.database.NoDocumentException
import whisk.core.entity.DocId
import whisk.core.entity.DocInfo
import whisk.core.entity.EntityPath
import whisk.core.entity.Identity
import whisk.core.entity.WhiskDocument
import whisk.core.entity.WhiskEntityQueries
import whisk.core.entity.types.AuthStore
import whisk.core.entity.types.EntityStore
import whisk.core.entity.AuthKey
* WARNING: the put/get/del operations in this trait operate directly on the datastore,
* and in the presence of a cache, there will be inconsistencies if one mixes these
* operations with those that flow through the cache. To mitigate this, use unique asset
* names in tests, and defer all cleanup to the end of a test suite.
trait DbUtils extends TransactionCounter {
implicit val dbOpTimeout = 15 seconds
val docsToDelete = ListBuffer[(ArtifactStore[_], DocInfo)]()
case class RetryOp() extends Throwable
* Retry an operation 'step()' awaiting its result up to 'timeout'.
* Attempt the operation up to 'count' times. The future from the
* step is not aborted --- TODO fix this.
def retry[T](step: () => Future[T], timeout: Duration, count: Int = 5): Try[T] = {
val future = step()
if (count > 0) try {
val result = Await.result(future, timeout)
} catch {
case n: NoDocumentException =>
println("no document exception, retrying")
retry(step, timeout, count - 1)
case RetryOp() =>
println("condition not met, retrying")
retry(step, timeout, count - 1)
case t: TimeoutException =>
println("timed out, retrying")
retry(step, timeout, count - 1)
case t: Throwable =>
println(s"unexpected failure $t")
else Failure(new NoDocumentException("timed out"))
* Wait on a view to update with documents added to namespace. This uses retry above,
* where the step performs a direct db query to retrieve the view and check the count
* matches the given value.
def waitOnView[Au](db: ArtifactStore[Au], namespace: EntityPath, count: Int)(
implicit context: ExecutionContext, transid: TransactionId, timeout: Duration) = {
val success = retry(() => {
val startKey = List(namespace.toString)
val endKey = List(namespace.toString, WhiskEntityQueries.TOP)
db.query(WhiskEntityQueries.viewname(WhiskEntityQueries.ALL), startKey, endKey, 0, 0, false, true, false) map { l =>
if (l.length != count) {
throw RetryOp()
} else true
}, timeout)
assert(success.isSuccess, "wait aborted")
* Wait on a view specific to a collection to update with documents added to that collection in namespace.
* This uses retry above, where the step performs a collection-specific view query using the collection
* factory. The result count from the view is checked against the given value.
def waitOnView(db: EntityStore, factory: WhiskEntityQueries[_], namespace: EntityPath, count: Int)(
implicit context: ExecutionContext, transid: TransactionId, timeout: Duration) = {
val success = retry(() => {
factory.listCollectionInNamespace(db, namespace, 0, 0) map { l =>
if (l.left.get.length < count) {
throw RetryOp()
} else true
}, timeout)
assert(success.isSuccess, "wait aborted")
* Wait on view for the authentication table. This is like the other waitOnViews but
* specific to the WhiskAuth records.
def waitOnView(db: AuthStore, authkey: AuthKey, count: Int)(
implicit context: ExecutionContext, transid: TransactionId, timeout: Duration) = {
val success = retry(() => {
Identity.list(db, authkey) map { l =>
if (l.length != count) {
throw RetryOp()
} else true
}, timeout)
assert(success.isSuccess, "wait aborted after: " + timeout + ": " + success)
* Wait on view using the CouchDbRestClient. This is like the other waitOnViews.
def waitOnView(db: CouchDbRestClient, designDocName: String, viewName: String, count: Int)(
implicit context: ExecutionContext, timeout: Duration) = {
val success = retry(() => {
db.executeView(designDocName, viewName)().map {
case Right(doc) =>
val length = doc.fields("rows").convertTo[List[JsObject]].length
if (length != count) {
throw RetryOp()
} else true
case Left(_) =>
throw RetryOp()
}, timeout)
assert(success.isSuccess, "wait aborted after: " + timeout + ": " + success)
* Puts document 'w' in datastore, and add it to gc queue to delete after the test completes.
def put[A, Au >: A](db: ArtifactStore[Au], w: A, garbageCollect: Boolean = true)(
implicit transid: TransactionId, timeout: Duration = 10 seconds): DocInfo = {
val docFuture = db.put(w)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
if (garbageCollect) docsToDelete += ((db, doc))
* Gets document by id from datastore, and add it to gc queue to delete after the test completes.
def get[A, Au >: A](db: ArtifactStore[Au], docid: DocId, factory: DocumentFactory[A], garbageCollect: Boolean = true)(
implicit transid: TransactionId, timeout: Duration = 10 seconds, ma: Manifest[A]): A = {
val docFuture = factory.get(db, docid)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
if (garbageCollect) docsToDelete += ((db, docid.asDocInfo))
* Deletes document by id from datastore.
def del[A <: WhiskDocument, Au >: A](db: ArtifactStore[Au], docid: DocId, factory: DocumentFactory[A])(
implicit transid: TransactionId, timeout: Duration = 10 seconds, ma: Manifest[A]) = {
val docFuture = factory.get(db, docid)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
Await.result(db.del(doc.docinfo), timeout)
* Puts a document 'entity' into the datastore, then do a get to retrieve it and confirm the identity.
def putGetCheck[A, Au >: A](db: ArtifactStore[Au], entity: A, factory: DocumentFactory[A], gc: Boolean = true)(
implicit transid: TransactionId, timeout: Duration = 10 seconds, ma: Manifest[A]): (DocInfo, A) = {
val doc = put(db, entity, gc)
assert(doc != null && != null && doc.rev.asString != null)
val future = factory.get(db,, doc.rev)
val dbEntity = Await.result(future, timeout)
assert(dbEntity != null)
assert(dbEntity == entity)
(doc, dbEntity)
* Deletes all documents added to gc queue.
def cleanup()(implicit timeout: Duration = 10 seconds) = { { e => Try(Await.result(e._1.del(e._2)(TransactionId.testing), timeout)) }