Adding tests for all throttles
Signed-off-by: Christian Bickel <cbickel@de.ibm.com>
diff --git a/tests/src/limits/ThrottleTests.scala b/tests/src/limits/ThrottleTests.scala
new file mode 100644
index 0000000..5a34489
--- /dev/null
+++ b/tests/src/limits/ThrottleTests.scala
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package limits
+
+import java.time.Instant
+
+import scala.collection.parallel.immutable.ParSeq
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import common.TestHelpers
+import common.TestUtils
+import common.TestUtils._
+import common.Wsk
+import common.WskActorSystem
+import common.WskProps
+import common.WskTestHelpers
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import common.WhiskProperties
+
+@RunWith(classOf[JUnitRunner])
+class ThrottleTests
+ extends FlatSpec
+ with TestHelpers
+ with WskTestHelpers
+ with WskActorSystem
+ with ScalaFutures
+ with Matchers {
+
+ implicit val testConfig = PatienceConfig(5.minutes)
+
+ implicit val wskprops = WskProps()
+ val wsk = new Wsk(usePythonCLI = false)
+ val defaultAction = Some(TestUtils.getCatalogFilename("samples/hello.js"))
+
+ val throttleWindow = 1.minute
+ val maximumInvokesPerMinute = WhiskProperties.getProperty("limits.actions.invokes.perMinute").toInt
+ val maximumFiringsPerMinute = WhiskProperties.getProperty("limits.triggers.fires.perMinute").toInt
+ val maximumConcurrentInvokes = WhiskProperties.getProperty("limits.actions.invokes.concurrent").toInt
+
+ val rateMessage = "Too many requests from user"
+ val concurrencyMessage = "The user has sent too many requests in a given amount of time."
+
+ /**
+ * Extracts the number of throttled results from a sequence of <code>RunResult</code>
+ *
+ * @param results the sequence of results
+ * @param message the message to determine the type of throttling
+ */
+ def throttledActivations(results: List[RunResult], message: String) = results.count { result =>
+ result.exitCode == TestUtils.THROTTLED && result.stderr.contains(message)
+ }
+
+ /**
+ * Waits until all successful activations are finished. Used to prevent the testcases from
+ * leaking activations.
+ *
+ * @param results the sequence of results from invocations or firings
+ */
+ def waitForActivations(results: ParSeq[RunResult]) = results.foreach { result =>
+ if (result.exitCode == SUCCESS_EXIT) withActivation(wsk.activation, result, totalWait = 5.minutes)(identity)
+ }
+
+ /**
+ * Settles throttles of 1 minute. Waits up to 1 minute depending on the time already waited.
+ *
+ * @param waitedAlready the time already gone after the last invoke or fire
+ */
+ def settleThrottles(waitedAlready: FiniteDuration) = {
+ val timeToWait = (throttleWindow - waitedAlready).max(Duration.Zero)
+ println(s"Waiting for ${timeToWait.toSeconds} seconds, already waited for ${waitedAlready.toSeconds} seconds")
+ Thread.sleep(timeToWait.toMillis)
+ }
+
+ /**
+ * Calculates the <code>Duration</code> between two <code>Instant</code>
+ *
+ * @param start the Instant something started
+ * @param end the Instant something ended
+ */
+ def durationBetween(start: Instant, end: Instant) = Duration.fromNanos(java.time.Duration.between(start, end).toNanos)
+
+ /**
+ * Invokes the the given action with the given payload as fast as it can
+ * until one of the invokes is throttled. Invokes the given amount of times
+ * at maximum.
+ *
+ * @param name name of the action
+ * @param payload payload parameter to pass
+ * @param count maximum invocations to make
+ */
+ def untilThrottled(count: Int)(run: () => RunResult) = {
+ val p = Promise[Unit]
+ val results = List.fill(count)(Future {
+ if (!p.isCompleted) {
+ val rr = run()
+ if (rr.exitCode == THROTTLED) p.trySuccess(())
+ Some(rr)
+ } else {
+ None
+ }
+ })
+ val finished = Future.sequence(results).futureValue.flatten
+ println(s"Executed ${finished.length} requests, maximum was $count")
+ finished
+ }
+
+ "Throttles" should "throttle multiple invokes of one action" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ val name = "checkThrottleAction"
+ assetHelper.withCleaner(wsk.action, name) {
+ (action, _) => action.create(name, defaultAction)
+ }
+
+ val results = untilThrottled(maximumInvokesPerMinute * 2 + 1) { () =>
+ wsk.action.invoke(name, Map("payload" -> "testWord".toJson), expectedExitCode = DONTCARE_EXIT)
+ }
+ val afterInvokes = Instant.now
+
+ waitForActivations(results.par)
+ throttledActivations(results, rateMessage) should be > 0
+
+ val alreadyWaited = durationBetween(afterInvokes, Instant.now)
+ settleThrottles(alreadyWaited)
+ }
+
+ it should "throttle multiple invokes of one trigger" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ val name = "checkThrottleTrigger"
+ assetHelper.withCleaner(wsk.trigger, name) {
+ (trigger, _) => trigger.create(name)
+ }
+
+ // invokes per minute * 2 because the current minute could advance which resets the throttle
+ val results = untilThrottled(maximumInvokesPerMinute * 2 + 1) { () =>
+ wsk.trigger.fire(name, Map("payload" -> "testWord".toJson), expectedExitCode = DONTCARE_EXIT)
+ }
+ val afterFirings = Instant.now
+
+ waitForActivations(results.par)
+ throttledActivations(results, rateMessage) should be > 0
+
+ val alreadyWaited = durationBetween(afterFirings, Instant.now)
+ settleThrottles(alreadyWaited)
+ }
+
+ it should "throttle 'concurrent' invokes of one action" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ val name = "checkThrottleAction"
+ val timeoutAction = Some(TestUtils.getTestActionFilename("timeout.js"))
+
+ assetHelper.withCleaner(wsk.action, name) {
+ (action, _) => action.create(name, timeoutAction)
+ }
+
+ val slowInvokes = maximumConcurrentInvokes * 0.4
+ val fastInvokes = maximumConcurrentInvokes * 0.6 + 10
+
+ // Keep queue from draining with these
+ val slowResults = untilThrottled(slowInvokes.toInt) { () =>
+ wsk.action.invoke(name, Map("payload" -> 10.seconds.toMillis.toJson), expectedExitCode = DONTCARE_EXIT)
+ }
+
+ // Create queue length quickly, drain fast
+ val fastResults = untilThrottled(fastInvokes.toInt) { () =>
+ wsk.action.invoke(name, Map("payload" -> 10.milliseconds.toMillis.toJson), expectedExitCode = DONTCARE_EXIT)
+ }
+
+ // Sleep 3 seconds to let the background thread get the newest values
+ Thread.sleep(3.seconds.toMillis)
+
+ val endResults = untilThrottled(10) { () =>
+ wsk.action.invoke(name, Map("payload" -> 10.milliseconds.toMillis.toJson), expectedExitCode = DONTCARE_EXIT)
+ }
+ val afterInvokes = Instant.now
+
+ val combinedResults = slowResults ++ fastResults ++ endResults
+ waitForActivations(combinedResults.par)
+ throttledActivations(combinedResults, concurrencyMessage) should be > 0
+
+ val alreadyWaited = durationBetween(afterInvokes, Instant.now)
+ settleThrottles(alreadyWaited)
+ }
+
+}