Adjust throttle tests to settle correctly should test fail.
diff --git a/tests/src/limits/ThrottleTests.scala b/tests/src/limits/ThrottleTests.scala
index 6babd59..fe6aabf 100644
--- a/tests/src/limits/ThrottleTests.scala
+++ b/tests/src/limits/ThrottleTests.scala
@@ -29,18 +29,18 @@
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-
import common.TestHelpers
import common.TestUtils
import common.TestUtils._
+import common.WhiskProperties
import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
-import common.WhiskProperties
+import spray.json._
+import spray.json.DefaultJsonProtocol._
import whisk.http.Messages._
+import whisk.utils.ExecutionContextFactory
@RunWith(classOf[JUnitRunner])
class ThrottleTests
@@ -51,8 +51,10 @@
with ScalaFutures
with Matchers {
- implicit val testConfig = PatienceConfig(5.minutes)
+ // use an infinite thread pool so that activations do not wait to send the activation requests
+ override implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+ implicit val testConfig = PatienceConfig(5.minutes)
implicit val wskprops = WskProps()
val wsk = new Wsk
val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
@@ -82,8 +84,12 @@
* @param results the sequence of results
* @param message the message to determine the type of throttling
*/
- def throttledActivations(results: List[RunResult], message: String) = results.count { result =>
- result.exitCode == TestUtils.THROTTLED && result.stderr.contains(message)
+ def throttledActivations(results: List[RunResult], message: String) = {
+ val count = results.count { result =>
+ result.exitCode == TestUtils.THROTTLED && result.stderr.contains(message)
+ }
+ println(s"number of throttled activations: $count out of ${results.length}")
+ count
}
/**
@@ -93,7 +99,10 @@
* @param results the sequence of results from invocations or firings
*/
def waitForActivations(results: ParSeq[RunResult]) = results.foreach { result =>
- if (result.exitCode == SUCCESS_EXIT) withActivation(wsk.activation, result, totalWait = 5.minutes)(identity)
+ println("waiting for activations to complete")
+ if (result.exitCode == SUCCESS_EXIT) {
+ withActivation(wsk.activation, result, totalWait = 5.minutes)(identity)
+ }
}
/**
@@ -116,34 +125,45 @@
def durationBetween(start: Instant, end: Instant) = Duration.fromNanos(java.time.Duration.between(start, end).toNanos)
/**
- * Invokes the the given action with the given payload as fast as it can
- * until one of the invokes is throttled. Invokes the given amount of times
- * at maximum.
+ * Invokes the given action up to 'count' times until one of the invokes is throttled.
*
- * @param name name of the action
- * @param payload payload parameter to pass
* @param count maximum invocations to make
*/
- def untilThrottled(count: Int)(run: () => RunResult) = {
+ def untilThrottled(count: Int, retries: Int = 3)(run: () => RunResult): List[RunResult] = {
val p = Promise[Unit]
+
val results = List.fill(count)(Future {
if (!p.isCompleted) {
val rr = run()
- println(s"exitCode = ${rr.exitCode} stderr = ${rr.stderr}")
- if (rr.exitCode == THROTTLED) p.trySuccess(())
+ if (rr.exitCode == THROTTLED) {
+ p.trySuccess(())
+ }
Some(rr)
} else {
+ println("already throttled, skipping additional runs")
None
}
})
+
val finished = Future.sequence(results).futureValue.flatten
- println(s"Executed ${finished.length} requests, maximum was $count")
- finished
+ // some activations may need to be retried
+ val failed = finished filter {
+ rr => rr.exitCode != SUCCESS_EXIT && rr.exitCode != THROTTLED
+ }
+
+ println(s"Executed ${finished.length} requests, maximum was $count, need to retry ${failed.length} (retries left: $retries)")
+ if (failed.isEmpty || retries <= 0) {
+ finished
+ } else {
+ finished ++ untilThrottled(failed.length, retries - 1)(run)
+ }
}
- "Throttles" should "throttle multiple invokes of one action" in withAssetCleaner(wskprops) {
+ behavior of "Throttles"
+
+ it should "throttle multiple activations of one action" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
- val name = "checkThrottleAction"
+ val name = "checkPerMinuteActionThrottle"
assetHelper.withCleaner(wsk.action, name) {
(action, _) => action.create(name, defaultAction)
}
@@ -163,39 +183,47 @@
}.flatten.toList
val afterInvokes = Instant.now
- waitForActivations(results.par)
- throttledActivations(results, tooManyRequests) should be > 0
-
- val alreadyWaited = durationBetween(afterInvokes, Instant.now)
- settleThrottles(alreadyWaited)
+ try {
+ val throttledCount = throttledActivations(results, tooManyRequests)
+ throttledCount should be > 0
+ throttledCount should be <= (results.length - maximumInvokesPerMinute)
+ } finally {
+ waitForActivations(results.par)
+ val alreadyWaited = durationBetween(afterInvokes, Instant.now)
+ settleThrottles(alreadyWaited)
+ }
}
- it should "throttle multiple invokes of one trigger" in withAssetCleaner(wskprops) {
+ it should "throttle multiple activations of one trigger" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
- val name = "checkThrottleTrigger"
+ val name = "checkPerMinuteTriggerThrottle"
assetHelper.withCleaner(wsk.trigger, name) {
(trigger, _) => trigger.create(name)
}
// invokes per minute * 2 because the current minute could advance which resets the throttle
- val results = untilThrottled(maximumInvokesPerMinute * 2 + 1) { () =>
+ val results = untilThrottled(maximumFiringsPerMinute * 2 + 1) { () =>
wsk.trigger.fire(name, Map("payload" -> "testWord".toJson), expectedExitCode = DONTCARE_EXIT)
}
val afterFirings = Instant.now
- waitForActivations(results.par)
- throttledActivations(results, tooManyRequests) should be > 0
-
- val alreadyWaited = durationBetween(afterFirings, Instant.now)
- settleThrottles(alreadyWaited)
+ try {
+ val throttledCount = throttledActivations(results, tooManyRequests)
+ throttledCount should be > 0
+ throttledCount should be <= (results.length - maximumFiringsPerMinute)
+ } finally {
+ // no need to wait for activations of triggers since they consume no resources
+ // (because there is no rule attached in this test)
+ val alreadyWaited = durationBetween(afterFirings, Instant.now)
+ settleThrottles(alreadyWaited)
+ }
}
- it should "throttle 'concurrent' invokes of one action" in withAssetCleaner(wskprops) {
+ it should "throttle 'concurrent' activations of one action" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
- val name = "checkThrottleAction"
- val timeoutAction = Some(TestUtils.getTestActionFilename("timeout.js"))
-
+ val name = "checkConcurrentActionThrottle"
assetHelper.withCleaner(wsk.action, name) {
+ val timeoutAction = Some(TestUtils.getTestActionFilename("timeout.js"))
(action, _) => action.create(name, timeoutAction)
}
@@ -229,10 +257,13 @@
println(s"$fastInvokes fast invokes (dur = ${fastInvokeDuration.toSeconds} sec) took ${fastIssueDuration.toSeconds} seconds to issue")
val combinedResults = slowResults ++ fastResults
- waitForActivations(combinedResults.par)
- throttledActivations(combinedResults, tooManyConcurrentRequests) should be > 0
-
- val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now)
- settleThrottles(alreadyWaited)
+ try {
+ val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests)
+ throttledCount should be > 0
+ } finally {
+ waitForActivations(combinedResults.par)
+ val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now)
+ settleThrottles(alreadyWaited)
+ }
}
}