blob: 35eaf68d70a81f35b7e3567e306f3ed857b289c8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package whisk.common
import java.time.Instant
import scala.collection.mutable.Buffer
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import common.StreamLogging
import common.WskActorSystem
class SchedulerTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging {
val timeBetweenCalls = 50 milliseconds
val callsToProduce = 5
val schedulerSlack = 100 milliseconds
* Calculates the duration between two consecutive elements
* @param times the points in time to calculate the difference between
* @return duration between each element of the given sequence
def calculateDifferences(times: Seq[Instant]) = {
times sliding (2) map {
case Seq(a, b) => Duration.fromNanos(java.time.Duration.between(a, b).toNanos)
} toList
* Waits for the calls to be scheduled and executed. Adds one call-interval as additional slack
def waitForCalls(calls: Int = callsToProduce, interval: FiniteDuration = timeBetweenCalls) =
Thread.sleep((calls + 1) * interval toMillis)
behavior of "A WaitAtLeast Scheduler"
ignore should "be killable by sending it a poison pill" in {
var callCount = 0
val scheduled = Scheduler.scheduleWaitAtLeast(timeBetweenCalls) { () =>
callCount += 1
Future successful true
// This is equal to a scheduled ! PoisonPill
val shutdownTimeout = 10.seconds
Await.result(akka.pattern.gracefulStop(scheduled, shutdownTimeout, PoisonPill), shutdownTimeout)
val countAfterKill = callCount
callCount should be >= callsToProduce
callCount shouldBe countAfterKill
it should "throw an expection when passed a negative duration" in {
an[IllegalArgumentException] should be thrownBy Scheduler.scheduleWaitAtLeast(-100 milliseconds) { () =>
it should "wait at least the given interval between scheduled calls" in {
val calls = Buffer[Instant]()
val scheduled = Scheduler.scheduleWaitAtLeast(timeBetweenCalls) { () =>
calls +=
Future successful true
scheduled ! PoisonPill
val differences = calculateDifferences(calls)
withClue(s"expecting all $differences to be >= $timeBetweenCalls") {
differences.forall(_ >= timeBetweenCalls)
it should "stop the scheduler if an uncaught exception is thrown by the passed closure" in {
var callCount = 0
val scheduled = Scheduler.scheduleWaitAtLeast(timeBetweenCalls) { () =>
callCount += 1
throw new Exception
callCount shouldBe 1
it should "log scheduler halt message with tid" in {
implicit val transid = TransactionId.testing
val msg = "test threw an exception"
val scheduled = Scheduler.scheduleWaitAtLeast(timeBetweenCalls) { () =>
throw new Exception(msg)
stream.toString.split(" ").drop(1).mkString(" ") shouldBe {
s"[ERROR] [$transid] [Scheduler] halted because $msg\n"
it should "not stop the scheduler if the future from the closure is failed" in {
var callCount = 0
val scheduled = Scheduler.scheduleWaitAtLeast(timeBetweenCalls) { () =>
callCount += 1
Future failed new Exception
scheduled ! PoisonPill
callCount shouldBe callsToProduce
"A WaitAtMost Scheduler" should "wait at most the given interval between scheduled calls" in {
val calls = Buffer[Instant]()
val timeBetweenCalls = 200 milliseconds
val computationTime = 100 milliseconds
val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls) { () =>
calls +=
akka.pattern.after(computationTime, actorSystem.scheduler)(Future.successful(true))
waitForCalls(interval = timeBetweenCalls)
scheduled ! PoisonPill
val differences = calculateDifferences(calls)
withClue(s"expecting all $differences to be <= $timeBetweenCalls") {
differences should not be 'empty
differences.forall(_ <= timeBetweenCalls + schedulerSlack)
it should "delay initial schedule by given duration" in {
val timeBetweenCalls = 200 milliseconds
val initialDelay = 1.second
var callCount = 0
val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls, initialDelay) { () =>
callCount += 1
Future successful true
try {
callCount should be <= 1
Thread.sleep(2 * timeBetweenCalls.toMillis)
callCount should be > 1
} finally {
scheduled ! PoisonPill
it should "perform work immediately when requested" in {
val timeBetweenCalls = 200 milliseconds
val initialDelay = 1.second
var callCount = 0
val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls, initialDelay) { () =>
callCount += 1
Future successful true
try {
Thread.sleep(2 * timeBetweenCalls.toMillis)
callCount should be(0)
scheduled ! Scheduler.WorkOnceNow
callCount should be(1)
} finally {
scheduled ! PoisonPill
it should "not wait when the closure takes longer than the interval" in {
val calls = Buffer[Instant]()
val timeBetweenCalls = 200 milliseconds
val computationTime = 300 milliseconds
val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls) { () =>
calls +=
akka.pattern.after(computationTime, actorSystem.scheduler)(Future.successful(true))
waitForCalls(interval = timeBetweenCalls)
scheduled ! PoisonPill
val differences = calculateDifferences(calls)
withClue(s"expecting all $differences to be <= $computationTime") {
differences should not be 'empty
differences.forall(_ <= computationTime + schedulerSlack)