blob: 771a8afe4c523a8abbdec589bab363d9c7651740 [file] [log] [blame]
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.core.loadBalancer.test
import akka.testkit.TestFSMRef
import scala.concurrent.duration._
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import whisk.core.loadBalancer.InvokerActor
import akka.pattern.ask
import whisk.core.loadBalancer.Offline
import whisk.core.loadBalancer.Healthy
import whisk.core.loadBalancer.GetStatus
import akka.util.Timeout
import scala.concurrent.Await
import whisk.core.loadBalancer.InvokerInfo
import akka.testkit.TestActorRef
import whisk.core.loadBalancer.InvokerPool
import akka.actor.FSM
import akka.actor.ActorRef
import common.WskActorSystem
import whisk.core.connector.PingMessage
/**
 * Tests for the load balancer's invoker supervision: the `InvokerPool`
 * supervisor and the per-invoker `InvokerActor` state machine.
 *
 * The pool tests check that pings create exactly one child per distinct
 * invoker name and that the reported states follow timeouts and pings.
 * The actor test drives a single invoker through Healthy -> Offline -> Healthy.
 */
@RunWith(classOf[JUnitRunner])
class InvokerSupervisionTests extends FlatSpec with Matchers with WskActorSystem {

  /**
   * Timeout used implicitly by `ask` in [[allStates]].
   *
   * The type is spelled out on purpose: implicit definitions without an
   * explicit type are fragile in Scala 2 and rejected by Scala 3.
   */
  implicit val timeout: Timeout = Timeout(5.seconds)

  /**
   * Imitates a StateTimeout in the FSM.
   *
   * @param actor the actor that receives the `FSM.StateTimeout` message
   */
  def timeout(actor: ActorRef): Unit = actor ! FSM.StateTimeout

  /**
   * Queries all invokers for their state.
   *
   * Sends `GetStatus` to the pool and blocks (for at most 5 seconds) until the
   * answer arrives; blocking is acceptable here because this is test code.
   *
   * @param pool the actor that answers `GetStatus`
   * @return the reply, cast to a map from invoker name to its reported state
   */
  def allStates(pool: ActorRef): Map[String, InvokerInfo] =
    Await.result(pool.ask(GetStatus).mapTo[Map[String, InvokerInfo]], 5.seconds)

  behavior of "InvokerPool"

  it should "successfully create invokers in its pool on ping" in {
    // The callback is irrelevant for this scenario, so it is a no-op.
    val supervisor = TestActorRef(new InvokerPool(_ => ()))

    // Create one invoker
    supervisor ! PingMessage("invoker0")
    supervisor.children should have size 1

    // Get the status of that invoker
    allStates(supervisor) shouldBe Map("invoker0" -> Healthy)

    // Create another invoker
    supervisor ! PingMessage("invoker1")
    supervisor.children should have size 2

    // Shouldn't create another invoker if ping with the same name is sent
    supervisor ! PingMessage("invoker0")
    supervisor.children should have size 2
  }

  it should "forward a ping to the appropriate invoker, calling the provided callback accordingly" in {
    // Counts how often the pool invokes the callback it was constructed with.
    var callbackCalled = 0
    val supervisor = TestActorRef(new InvokerPool(_ => callbackCalled = callbackCalled + 1))

    // Create two invokers
    supervisor ! PingMessage("invoker0")
    supervisor ! PingMessage("invoker1")
    supervisor.children should have size 2

    // Check that both invokers are healthy
    allStates(supervisor) shouldBe Map("invoker0" -> Healthy, "invoker1" -> Healthy)

    // Switch off one of the invokers
    timeout(supervisor.getSingleChild("invoker0"))
    allStates(supervisor) shouldBe Map("invoker0" -> Offline, "invoker1" -> Healthy)

    // Ping that invoker to bring it back up
    supervisor ! PingMessage("invoker0")
    allStates(supervisor) shouldBe Map("invoker0" -> Healthy, "invoker1" -> Healthy)

    // Exactly one callback invocation is expected over the whole
    // offline/online cycle above.
    // NOTE(review): presumably fired on the Offline transition - confirm against InvokerPool.
    callbackCalled shouldBe 1
  }

  behavior of "InvokerActor"

  it should "start healthy, go offline if the state times out and goes healthy on a successful ping again" in {
    // TestFSMRef exposes the FSM's current state via `stateName`.
    val invoker = TestFSMRef(new InvokerActor)
    invoker.stateName shouldBe Healthy

    // Turn the invoker offline
    timeout(invoker)
    invoker.stateName shouldBe Offline

    // Ping it to bring it back up
    invoker ! PingMessage("testinvoker")
    invoker.stateName shouldBe Healthy
  }
}