Replace InMemoryCache with new impl that avoids read under write races
Addresses some points in #1260.
This was PR #1302.
diff --git a/tests/src/whisk/core/database/test/CacheConcurrencyTests.scala b/tests/src/whisk/core/database/test/CacheConcurrencyTests.scala
new file mode 100644
index 0000000..5fa26ce
--- /dev/null
+++ b/tests/src/whisk/core/database/test/CacheConcurrencyTests.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test
+
+import scala.util.Try
+
+
+
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.TestUtils
+import common.Wsk
+import common.WskProps
+import whisk.common.SimpleExec
+import whisk.common.TransactionId
+
+@RunWith(classOf[JUnitRunner])
+class CacheConcurrencyTests extends FlatSpec
+ with BeforeAndAfterAll
+ with Matchers {
+
+ private val wsk = new Wsk
+
+ implicit private val logger = this
+ implicit private val transId = TransactionId.testing
+ implicit private val wp = WskProps()
+
+ val nExternalIters = 10
+ val nInternalIters = 20
+
+ for (i <- 1 to nExternalIters)
+ "the cache" should s"support concurrent CRUD without bogus residual cache entries, iter ${i}" in {
+ try {
+ val scriptPath = TestUtils.getTestActionFilename("CacheConcurrencyTests.sh")
+ val actionFile = TestUtils.getTestActionFilename("empty.js")
+ val fullCmd = Seq(scriptPath, Wsk.baseCommand.mkString, actionFile, nInternalIters.toString(), "--auth", wp.authKey) ++ wp.overrides
+
+ val (stdout, stderr, exitCode) = SimpleExec.syncRunCmd(fullCmd)
+
+ if (!stdout.isEmpty) {
+ println(stdout)
+ }
+ if (!stderr.isEmpty) {
+ println(this, stderr)
+ }
+
+ exitCode should be(0)
+
+ } finally {
+ // clean up
+ {
+ for (i <- 1 to nInternalIters)
+ yield Try { wsk.action.delete(s"testy${i}") }
+ }.forall(_.isSuccess)
+ }
+ }
+}
diff --git a/tests/src/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala b/tests/src/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
new file mode 100644
index 0000000..402b15a
--- /dev/null
+++ b/tests/src/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.FiniteDuration
+import scala.language.postfixOps
+import scala.util.Failure
+import scala.util.Success
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+import common.WskActorSystem
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.database.MultipleReadersSingleWriterCache
+
+class MultipleReadersSingleWriterCacheTests(nIters: Int = 3) extends FlatSpec
+ with Matchers
+ with MultipleReadersSingleWriterCache[String, String]
+ with WskActorSystem
+ with Logging {
+
+ "the cache" should "support simple CRUD" in {
+ val inhibits = doReadWriteRead("foo").go(0 seconds)
+ inhibits.debug(this)
+
+ inhibits.nReadInhibits.get should be(0)
+ cacheSize should be(1)
+ }
+
+ "the cache" should "support concurrent CRUD to different keys" in {
+ //
+ // for the first iter, all reads are not-cached and each thread
+ // requests a different key, so we expect no read inhibits, and a
+ // bunch of write inhibits
+ //
+ val inhibits = doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i })
+ inhibits.nReadInhibits.get should be(0)
+ inhibits.nWriteInhibits.get should not be (0)
+
+ //
+ // after the first iter, the keys already exist, so the first read
+ // should be cached, resulting in the writes proceeding more
+ // smoothly this time, thus inhibiting some of the second reads
+ //
+ for (i <- 1 to nIters - 1) {
+ doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i })
+ .nReadInhibits.get should not be (0)
+ }
+ }
+
+ "the cache" should "support concurrent CRUD to shared keys" in {
+ for (i <- 1 to nIters) {
+ doCRUD("CONCURRENT CRUD to shared keys", sharedKeys)
+ .nWriteInhibits.get should not be (0)
+ }
+ }
+
+ "the cache" should "support concurrent CRUD to shared keys (zero latency)" in {
+ var hasInhibits = false
+ for (i <- 1 to nIters) {
+ hasInhibits = doCRUD("concurrent CRUD to shared keys (zero latency)", sharedKeys, 0 seconds)
+ .hasInhibits
+ }
+ hasInhibits should not be (false)
+ }
+
+ "the cache" should "support concurrent CRUD to shared keys (short latency)" in {
+ for (i <- 1 to nIters) {
+ doCRUD("concurrent CRUD to shared keys (short latency)", sharedKeys, 10 milliseconds)
+ .hasInhibits should be(true)
+ }
+ }
+
+ "the cache" should "support concurrent CRUD to shared keys (medium latency)" in {
+ for (i <- 1 to nIters) {
+ doCRUD("concurrent CRUD to shared keys (medium latency)", sharedKeys, 100 milliseconds)
+ .hasInhibits should be(true)
+ }
+ }
+
+ "the cache" should "support concurrent CRUD to shared keys (long latency)" in {
+ for (i <- 1 to nIters) {
+ doCRUD("CONCURRENT CRUD to shared keys (long latency)", sharedKeys, 5 seconds)
+ .nWriteInhibits.get should not be (0)
+ }
+ }
+
+ "the cache" should "support concurrent CRUD to shared keys, with update first" in {
+ for (i <- 1 to nIters) {
+ doCRUD("CONCURRENT CRUD to shared keys, with update first", sharedKeys, 1 second, false)
+ .nWriteInhibits.get should be(0)
+ }
+ }
+
+ def sharedKeys = { i: Int => "foop_" + (i % 2) }
+
+ def doCRUD(
+ testName: String,
+ key: Int => String,
+ delay: FiniteDuration = 1 second,
+ readsFirst: Boolean = true,
+ nThreads: Int = 10): Inhibits = {
+
+ System.out.println(testName);
+
+ val exec = Executors.newFixedThreadPool(nThreads)
+ val inhibits = Inhibits()
+
+ for (i <- 1 to nThreads) {
+ exec.submit(new Runnable { def run() = { doReadWriteRead(key(i), inhibits, readsFirst).go(delay) } })
+ }
+
+ exec.shutdown
+ exec.awaitTermination(2, TimeUnit.MINUTES)
+
+ inhibits.debug(this)
+ inhibits
+ }
+
+ case class Inhibits(
+ nReadInhibits: AtomicInteger = new AtomicInteger(0),
+ nWriteInhibits: AtomicInteger = new AtomicInteger(0)) {
+
+ def debug(from: AnyRef) = {
+ logger.debug(from, "InhibitedReads: " + nReadInhibits);
+ logger.debug(from, "InhibitedWrites: " + nWriteInhibits);
+ }
+
+ def hasInhibits: Boolean = { nReadInhibits.get > 0 || nWriteInhibits.get > 0 }
+ }
+
+ private case class doReadWriteRead(key: String, inhibits: Inhibits = Inhibits(), readFirst: Boolean = true) {
+ def go(implicit delay: FiniteDuration): Inhibits = {
+ val latch = new CountDownLatch(2)
+
+ implicit val transId = TransactionId.testing
+
+ if (!readFirst) {
+ // we want to do the update before the first read
+ cacheUpdate(key, key, delayed("bar_b")) onFailure {
+ case t =>
+ inhibits.nWriteInhibits.incrementAndGet();
+ }
+ }
+
+ cacheLookup(key, delayed("bar"), true) onComplete {
+ case Success(s) => {
+ latch.countDown()
+ }
+ case Failure(t) => {
+ latch.countDown()
+ inhibits.nReadInhibits.incrementAndGet();
+ }
+ }
+
+ if (readFirst) {
+ // we did the read before the update, so do the write next
+ cacheUpdate(key, key, delayed("bar_b")) onFailure {
+ case t =>
+ inhibits.nWriteInhibits.incrementAndGet();
+ }
+ }
+
+ cacheLookup(key, delayed("bar_c"), true) onComplete {
+ case Success(s) => {
+ latch.countDown();
+ }
+ case Failure(t) => {
+ inhibits.nReadInhibits.incrementAndGet();
+ latch.countDown();
+ }
+ }
+
+ latch.await(2, TimeUnit.MINUTES)
+
+ inhibits
+ }
+ }
+
+ private def delayed[W](v: W)(implicit delay: FiniteDuration): Future[W] = {
+ akka.pattern.after(duration = delay, using = actorSystem.scheduler)(
+ Future.successful { v })
+ }
+
+ /** we are using cache keys, so the update key is just the string itself */
+ override protected def cacheKeyForUpdate(w: String): String = (w)
+
+ implicit private val logger = this
+}
diff --git a/tests/src/whisk/core/entity/test/MigrationEntities.scala b/tests/src/whisk/core/entity/test/MigrationEntities.scala
index 7bf020d..934ece0 100644
--- a/tests/src/whisk/core/entity/test/MigrationEntities.scala
+++ b/tests/src/whisk/core/entity/test/MigrationEntities.scala
@@ -52,6 +52,7 @@
override val collectionName = "rules"
override implicit val serdes = jsonFormat8(OldWhiskRule.apply)
+ override def cacheKeyForUpdate(t: OldWhiskRule) = t.docid.asDocInfo
}
/**
@@ -79,4 +80,5 @@
override val collectionName = "triggers"
override implicit val serdes = jsonFormat7(OldWhiskTrigger.apply)
+ override def cacheKeyForUpdate(t: OldWhiskTrigger) = t.docid.asDocInfo
}