blob: a4a6145e5f5e5bec7541e1aa9d0728931bf33b4f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.loadBalancer.test
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpecLike
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.FSM
import akka.actor.FSM.CurrentState
import akka.actor.FSM.SubscribeTransitionCallBack
import akka.actor.FSM.Transition
import akka.pattern.ask
import akka.testkit.ImplicitSender
import akka.testkit.TestFSMRef
import akka.testkit.TestKit
import akka.testkit.TestProbe
import akka.util.Timeout
import common.{LoggedFunction, StreamLogging}
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import org.apache.openwhisk.common.{InvokerHealth, InvokerState, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.connector.PingMessage
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.loadBalancer.ActivationRequest
import org.apache.openwhisk.core.loadBalancer.GetStatus
import org.apache.openwhisk.core.loadBalancer.InvocationFinishedResult
import org.apache.openwhisk.core.loadBalancer.InvocationFinishedMessage
import org.apache.openwhisk.core.loadBalancer.InvokerActor
import org.apache.openwhisk.core.loadBalancer.InvokerPool
import org.apache.openwhisk.utils.retry
import org.apache.openwhisk.core.connector.test.TestConnector
import org.apache.openwhisk.core.entity.ControllerInstanceId
@RunWith(classOf[JUnitRunner])
class InvokerSupervisionTests
extends TestKit(ActorSystem("InvokerSupervision"))
with ImplicitSender
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with MockFactory
with StreamLogging {
val config = new WhiskConfig(ExecManifest.requiredProperties)
val defaultUserMemory: ByteSize = 1024.MB
ExecManifest.initialize(config)
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}
implicit val timeout = Timeout(5.seconds)
/** Imitates a StateTimeout in the FSM */
def timeout(actor: ActorRef) = actor ! FSM.StateTimeout
/** Queries all invokers for their state */
def allStates(pool: ActorRef) =
Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InvokerInstanceId, InvokerState)]], timeout.duration)
/** Helper to generate a list of (InstanceId, InvokerState) */
def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
case (state, index) => new InvokerHealth(InvokerInstanceId(index, userMemory = defaultUserMemory), state)
}
val pC = new TestConnector("pingFeedTtest", 4, false) {}
behavior of "InvokerPool"
it should "successfully create invokers in its pool on ping and keep track of statechanges" in {
val invoker5 = TestProbe()
val invoker2 = TestProbe()
val invoker5Instance = InvokerInstanceId(5, userMemory = defaultUserMemory)
val invoker2Instance = InvokerInstanceId(2, userMemory = defaultUserMemory)
val children = mutable.Queue(invoker5.ref, invoker2.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
within(timeout.duration) {
// create first invoker
val ping0 = PingMessage(invoker5Instance)
supervisor ! ping0
invoker5.expectMsgType[SubscribeTransitionCallBack] // subscribe to the actor
invoker5.expectMsg(ping0)
invoker5.send(supervisor, CurrentState(invoker5.ref, Healthy))
allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Offline, Offline, Offline, Healthy))
// create second invoker
val ping1 = PingMessage(invoker2Instance)
supervisor ! ping1
invoker2.expectMsgType[SubscribeTransitionCallBack]
invoker2.expectMsg(ping1)
invoker2.send(supervisor, CurrentState(invoker2.ref, Healthy))
allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Healthy, Offline, Offline, Healthy))
// ping the first invoker again
supervisor ! ping0
invoker5.expectMsg(ping0)
allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Healthy, Offline, Offline, Healthy))
// one invoker goes offline
invoker2.send(supervisor, Transition(invoker2.ref, Healthy, Offline))
allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Offline, Offline, Offline, Healthy))
}
}
it should "forward the ActivationResult to the appropriate invoker" in {
val invoker = TestProbe()
val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val invokerName = s"invoker${invokerInstance.toInt}"
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
within(timeout.duration) {
// Create one invoker
val ping0 = PingMessage(invokerInstance)
supervisor ! ping0
invoker.expectMsgType[SubscribeTransitionCallBack] // subscribe to the actor
invoker.expectMsg(ping0)
invoker.send(supervisor, CurrentState(invoker.ref, Healthy))
allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Healthy))
// Send message and expect receive in invoker
val msg = InvocationFinishedMessage(invokerInstance, InvocationFinishedResult.Success)
supervisor ! msg
invoker.expectMsg(msg)
}
}
it should "forward an ActivationMessage to the sendActivation-Method" in {
val invoker = TestProbe()
val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val invokerName = s"invoker${invokerInstance.toInt}"
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
val sendActivationToInvoker = LoggedFunction { (a: ActivationMessage, b: InvokerInstanceId) =>
Future.successful(new RecordMetadata(new TopicPartition(invokerName, 0), 0L, 0L, 0L, Long.box(0L), 0, 0))
}
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
// Send ActivationMessage to InvokerPool
val uuid = UUID()
val activationMessage = ActivationMessage(
transid = TransactionId.invokerHealth,
action = FullyQualifiedEntityName(EntityPath("whisk.system/utils"), EntityName("date")),
revision = DocRevision.empty,
user = Identity(
Subject("unhealthyInvokerCheck"),
Namespace(EntityName("unhealthyInvokerCheck"), uuid),
BasicAuthenticationAuthKey(uuid, Secret())),
activationId = new ActivationIdGenerator {}.make(),
rootControllerIndex = ControllerInstanceId("0"),
blocking = false,
content = None,
initArgs = Set.empty,
lockedArgs = Map.empty)
val msg = ActivationRequest(activationMessage, invokerInstance)
supervisor ! msg
// Verify, that MessageProducer will receive a call to send the message
retry(sendActivationToInvoker.calls should have size 1, N = 3, waitBeforeRetry = Some(500.milliseconds))
}
behavior of "InvokerActor"
it should "start and stay unhealthy while min threshold is not met" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
(1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy
}
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance - 1).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Unhealthy
}
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Healthy
}
it should "revert to unhealthy during initial startup if there is a failed test activation" in {
assume(InvokerActor.bufferErrorTolerance >= 3)
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Healthy
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy
}
// unHealthy -> offline
// offline -> unhealthy
it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
val pool = TestProbe()
val invoker =
pool.system.actorOf(
InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
pool.expectMsg(CurrentState(invoker, Unhealthy))
timeout(invoker)
pool.expectMsg(Transition(invoker, Unhealthy, Offline))
invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory))
pool.expectMsg(Transition(invoker, Offline, Unhealthy))
}
}
// unhealthy -> healthy -> unhealthy -> healthy
it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
val pool = TestProbe()
val invoker =
pool.system.actorOf(
InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
pool.expectMsg(CurrentState(invoker, Unhealthy))
(1 to InvokerActor.bufferSize).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
// Fill buffer with errors
(1 to InvokerActor.bufferSize).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
}
pool.expectMsg(Transition(invoker, Healthy, Unhealthy))
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
}
}
// unhealthy -> healthy -> overloaded -> healthy
it should "goto healthy again, if overloaded and error buffer has enough successful invocations" in {
val pool = TestProbe()
val invoker =
pool.system.actorOf(
InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
pool.expectMsg(CurrentState(invoker, Unhealthy))
(1 to InvokerActor.bufferSize).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
// Fill buffer with timeouts
(1 to InvokerActor.bufferSize).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Timeout)
}
pool.expectMsg(Transition(invoker, Healthy, Unresponsive))
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unresponsive, Healthy))
}
}
// unhealthy -> offline
// offline -> unhealthy
it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in {
val pool = TestProbe()
val invoker =
pool.system.actorOf(
InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
pool.expectMsg(CurrentState(invoker, Unhealthy))
timeout(invoker)
pool.expectMsg(Transition(invoker, Unhealthy, Offline))
invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory))
pool.expectMsg(Transition(invoker, Offline, Unhealthy))
}
}
it should "start timer to send test actions when unhealthy" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
invoker.isTimerActive(InvokerActor.timerName) shouldBe true
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
}
invoker.stateName shouldBe Healthy
invoker.isTimerActive(InvokerActor.timerName) shouldBe false
}
it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
within(timeout.duration) {
val ping = PingMessage(invokerInstance)
supervisor ! ping
invoker0.expectMsgType[SubscribeTransitionCallBack]
invoker0.expectMsg(ping)
allStates(supervisor) shouldBe IndexedSeq(new InvokerHealth(invokerInstance, Offline))
}
}
it should "update the invoker instance id after it was restarted" in {
val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
val invokerAfterRestart =
InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-zyx"), userMemory = defaultUserMemory)
within(timeout.duration) {
val ping = PingMessage(invokerInstance)
supervisor ! ping
invoker0.expectMsgType[SubscribeTransitionCallBack]
invoker0.expectMsg(ping)
invoker0.send(supervisor, CurrentState(invoker0.ref, Unhealthy))
val newPing = PingMessage(invokerAfterRestart)
supervisor ! newPing
allStates(supervisor) shouldBe IndexedSeq(new InvokerHealth(invokerAfterRestart, Unhealthy))
}
}
}