blob: a91b52088276c0e368e182cdaddba7129f50d4ce [file] [log] [blame]
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.core.database.test
import scala.collection.parallel._
import scala.concurrent.forkjoin.ForkJoinPool
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfter
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import common.TestUtils
import common.TestUtils._
import common.Wsk
import common.WskProps
import common.WskTestHelpers
import spray.json.JsString
import whisk.common.TransactionId
@RunWith(classOf[JUnitRunner])
class CacheConcurrencyTests extends FlatSpec
with WskTestHelpers
with BeforeAndAfter {
implicit private val transId = TransactionId.testing
implicit private val wp = WskProps()
private val wsk = new Wsk
val nExternalIters = 1
val nInternalIters = 5
val nThreads = nInternalIters * 30
val parallel = (1 to nInternalIters).par
parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
def run[W](phase: String)(block: String => W) = parallel.map { i =>
val name = s"testy${i}"
withClue(s"$phase: failed for $name") { (name, block(name)) }
}
before {
run("pre-test sanitize") {
name => wsk.action.sanitize(name)
}
}
after {
run("post-test sanitize") {
name => wsk.action.sanitize(name)
}
}
for (n <- 1 to nExternalIters)
"the cache" should s"support concurrent CRUD without bogus residual cache entries, iter ${n}" in {
val scriptPath = TestUtils.getTestActionFilename("CacheConcurrencyTests.sh")
val actionFile = TestUtils.getTestActionFilename("empty.js")
run("create") {
name => wsk.action.create(name, Some(actionFile))
}
run("update") {
name => wsk.action.create(name, None, update = true)
}
run("delete+get") {
name =>
// run 30 operations in parallel: 15 get, 1 delete, 14 more get
val para = (1 to 30).par
para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
para.map {
i =>
if (i != 16) {
val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT)
withClue(s"expecting get to either succeed or fail with not found: $rr") {
// some will succeed and some should fail with not found
rr.exitCode should (be(SUCCESS_EXIT) or be(NOT_FOUND))
}
} else {
wsk.action.delete(name)
}
}
}
run("get after delete") {
name => wsk.action.get(name, expectedExitCode = NOT_FOUND)
}
run("recreate") {
name => wsk.action.create(name, Some(actionFile))
}
run("reupdate") {
name => wsk.action.create(name, None, parameters = Map("color" -> JsString("red")), update = true)
}
run("update+get") {
name =>
// run 30 operations in parallel: 15 get, 1 update, 14 more get
val para = (1 to 30).par
para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
para.map {
i =>
if (i != 16) {
wsk.action.get(name)
} else {
wsk.action.create(name, None, parameters = Map("color" -> JsString("blue")), update = true)
}
}
}
run("get after update") {
name => wsk.action.get(name)
} map {
case (name, rr) =>
withClue(s"get after update: failed check for $name") {
rr.stdout should include("blue")
rr.stdout should not include ("red")
}
}
}
}