* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.openwhisk.core.loadBalancer
import java.nio.charset.StandardCharsets
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import org.apache.kafka.clients.producer.RecordMetadata
import{Actor, ActorRef, ActorRefFactory, FSM, Props}
import akka.pattern.pipe
import akka.util.Timeout
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.database.NoDocumentException
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.types.EntityStore
// Received events
case object GetStatus
case object Tick
// States an Invoker can be in
sealed trait InvokerState {
val asString: String
val isUsable: Boolean
object InvokerState {
// Invokers in this state can be used to schedule workload to
sealed trait Usable extends InvokerState { val isUsable = true }
// No workload should be scheduled to invokers in this state
sealed trait Unusable extends InvokerState { val isUsable = false }
// A completely healthy invoker, pings arriving fine, no system errors
case object Healthy extends Usable { val asString = "up" }
// Pings are arriving fine, the invoker returns system errors though
case object Unhealthy extends Unusable { val asString = "unhealthy" }
// Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
case object Unresponsive extends Unusable { val asString = "unresponsive" }
// Pings are not arriving for this invoker
case object Offline extends Unusable { val asString = "down" }
// Possible answers of an activation
sealed trait InvocationFinishedResult
object InvocationFinishedResult {
// The activation could be successfully executed from the system's point of view. That includes user- and application
// errors
case object Success extends InvocationFinishedResult
// The activation could not be executed because of a system error
case object SystemError extends InvocationFinishedResult
// The active-ack did not arrive before it timed out
case object Timeout extends InvocationFinishedResult
case class ActivationRequest(msg: ActivationMessage, invoker: InvokerInstanceId)
case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, result: InvocationFinishedResult)
// Sent to a monitor if the state changed
case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth])
// Data stored in the Invoker
final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult])
* Actor representing a pool of invokers
* The InvokerPool manages a Invokers through subactors. An new Invoker
* is registered lazily by sending it a Ping event with the name of the
* Invoker. Ping events are furthermore forwarded to the respective
* Invoker for their respective State handling.
* Note: An Invoker that never sends an initial Ping will not be considered
* by the InvokerPool and thus might not be caught by monitoring.
class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
pingConsumer: MessageConsumer,
monitor: Option[ActorRef])
extends Actor {
import InvokerState._
implicit val transid: TransactionId = TransactionId.invokerHealth
implicit val logging: Logging = new AkkaLogging(context.system.log)
implicit val timeout: Timeout = Timeout(5.seconds)
implicit val ec: ExecutionContext = context.dispatcher
// State of the actor. Mutable vars with immutable collections prevents closures or messages
// from leaking the state for external mutation
var instanceToRef = immutable.Map.empty[Int, ActorRef]
var refToInstance = immutable.Map.empty[ActorRef, InvokerInstanceId]
var status = IndexedSeq[InvokerHealth]()
def receive: Receive = {
case p: PingMessage =>
val invoker = instanceToRef.getOrElse(p.instance.toInt, registerInvoker(p.instance))
instanceToRef = instanceToRef.updated(p.instance.toInt, invoker)
// For the case when the invoker was restarted and got a new displayed name
val oldHealth = status(p.instance.toInt)
if ( != p.instance) {
status = status.updated(p.instance.toInt, new InvokerHealth(p.instance, oldHealth.status))
refToInstance = refToInstance.updated(invoker, p.instance)
case GetStatus => sender() ! status
case msg: InvocationFinishedMessage =>
// Forward message to invoker, if InvokerActor exists
case CurrentState(invoker, currentState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
status = status.updated(instance.toInt, new InvokerHealth(instance, currentState))
case Transition(invoker, oldState: InvokerState, newState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
status = status.updated(instance.toInt, new InvokerHealth(instance, newState))
// this is only used for the internal test action which enabled an invoker to become healthy again
case msg: ActivationRequest => sendActivationToInvoker(msg.msg, msg.invoker).pipeTo(sender)
def logStatus(): Unit = {
monitor.foreach(_ ! CurrentInvokerPoolState(status))
val pretty = => s"${} -> ${i.status}"), s"invoker status changed to ${pretty.mkString(", ")}")
/** Receive Ping messages from invokers. */
val pingPollDuration: FiniteDuration = 1.second
val invokerPingFeed: ActorRef = context.system.actorOf(Props {
new MessageFeed(
logHandoff = false)
def processInvokerPing(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
PingMessage.parse(raw) match {
case Success(p: PingMessage) =>
self ! p
invokerPingFeed ! MessageFeed.Processed
case Failure(t) =>
invokerPingFeed ! MessageFeed.Processed
logging.error(this, s"failed processing message: $raw with $t")
/** Pads a list to a given length using the given function to compute entries */
def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A): IndexedSeq[A] = list ++ (list.size until n).map(f)
// Register a new invoker
def registerInvoker(instanceId: InvokerInstanceId): ActorRef = {, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth)
// Grow the underlying status sequence to the size needed to contain the incoming ping. Dummy values are created
// to represent invokers, where ping messages haven't arrived yet
status = padToIndexed(
instanceId.toInt + 1,
i => new InvokerHealth(InvokerInstanceId(i, userMemory = instanceId.userMemory), Offline))
status = status.updated(instanceId.toInt, new InvokerHealth(instanceId, Offline))
val ref = childFactory(context, instanceId)
ref ! SubscribeTransitionCallBack(self) // register for state change events
refToInstance = refToInstance.updated(ref, instanceId)
object InvokerPool {
private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
implicit val tid: TransactionId = TransactionId.loadbalancer
implicit val ec: ExecutionContext = db.executionContext
implicit val logging: Logging = db.logging
.get(db, action.docid)
.flatMap { oldAction =>
WhiskAction.put(db, action.revision(oldAction.rev), Some(oldAction))(tid, notifier = None)
.recover {
case _: NoDocumentException => WhiskAction.put(db, action, old = None)(tid, notifier = None)
.map(_ => {})
.andThen {
case Success(_) =>, "test action for invoker health now exists")
case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
* Prepares everything for the health protocol to work (i.e. creates a testaction)
* @param controllerInstance instance of the controller we run in
* @param entityStore store to write the action to
* @return throws an exception on failure to prepare
def prepare(controllerInstance: ControllerInstanceId, entityStore: EntityStore): Unit = {
.map {
// Await the creation of the test action; on failure, this will abort the constructor which should
// in turn abort the startup of the controller.
a =>
Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute)
.orElse {
throw new IllegalStateException(
"cannot create test action for invoker health because runtime manifest is not valid")
def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
pc: MessageConsumer,
m: Option[ActorRef] = None): Props = {
Props(new InvokerPool(f, p, pc, m))
/** A stub identity for invoking the test action. This does not need to be a valid identity. */
val healthActionIdentity: Identity = {
val whiskSystem = "whisk.system"
val uuid = UUID()
Identity(Subject(whiskSystem), Namespace(EntityName(whiskSystem), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
/** An action to use for monitoring invoker health. */
def healthAction(i: ControllerInstanceId): Option[WhiskAction] =
ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:default").map { manifest =>
new WhiskAction(
namespace =,
name = EntityName(s"invokerHealthTestAction${i.asString}"),
exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None),
limits = ActionLimits(memory = MemoryLimit(MemoryLimit.MIN_MEMORY)))
* Actor representing an Invoker
* This finite state-machine represents an Invoker in its possible
* states "Healthy" and "Offline".
class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId)
extends FSM[InvokerState, InvokerInfo] {
import InvokerState._
implicit val transid: TransactionId = TransactionId.invokerHealth
implicit val logging: Logging = new AkkaLogging(context.system.log)
val name = s"invoker${invokerInstance.toInt}"
val healthyTimeout: FiniteDuration = 10.seconds
// This is done at this point to not intermingle with the state-machine especially their timeouts.
def customReceive: Receive = {
case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
override def receive: Receive = customReceive.orElse(super.receive)
// To be used for all states that should send test actions to reverify the invoker
val healthPingingState: StateFunction = {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
case Event(Tick, _) =>
// To be used for all states that should send test actions to reverify the invoker
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
case _ -> `state` =>
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
case `state` -> _ => cancelTimer(InvokerActor.timerName)
/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
case Event(_: PingMessage, _) => goto(Unhealthy)
/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)
/** An Unresponsive invoker represents an invoker that is not responding with active acks in a timely manner */
when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)
* A Healthy invoker is characterized by continuously getting pings.
* It will go offline if that state is not confirmed for 20 seconds.
when(Healthy, stateTimeout = healthyTimeout) {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
/** Handles the completion of an Activation in every state. */
whenUnhandled {
case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
/** Logs transition changes. */
onTransition {
case _ -> newState if !newState.isUsable =>
s"$name is ${newState.asString}",
case _ -> newState if newState.isUsable =>, s"$name is ${newState.asString}")
* Handling for active acks. This method saves the result (successful or unsuccessful)
* into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy.
* @param result: result of Activation
* @param buffer to be used
private def handleCompletionMessage(result: InvocationFinishedResult,
buffer: RingBuffer[InvocationFinishedResult]) = {
// If the action is successful, the Invoker is Healthy. We execute additional test actions
// immediately to clear the RingBuffer as fast as possible.
// The actions that arrive while the invoker is unhealthy are most likely health actions.
// It is possible they are normal user actions as well. This can happen if such actions were in the
// invoker queue or in progress while the invoker's status flipped to Unhealthy.
if (result == InvocationFinishedResult.Success && stateName == Unhealthy) {
// Stay online if the activations was successful.
// Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging.
if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
} else {
val entries = buffer.toList
// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state.
// Otherwise transition to Healthy on successful activations only.
if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
} else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
} else {
result match {
case InvocationFinishedResult.Success =>
// Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or
// during priming of the ring buffer. In case of the latter, there is at least one additional test
// action in flight which can reverse the transition later.
case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unhealthy.
case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unresponsive.
case _ =>
// At steady state, the state of the buffer superceded and we hold the current state
// until enough events have occurred to transition to a new state.
* Creates an activation request with the given action and sends it to the InvokerPool.
* The InvokerPool redirects it to the invoker which is represented by this InvokerActor.
private def invokeTestAction() = {
InvokerPool.healthAction(controllerInstance).map { action =>
val activationMessage = ActivationMessage(
// Use the sid of the InvokerSupervisor as tid
transid = transid,
action = action.fullyQualifiedName(true),
// Use empty DocRevision to force the invoker to pull the action from db all the time
revision = DocRevision.empty,
user = InvokerPool.healthActionIdentity,
// Create a new Activation ID for this activation
activationId = new ActivationIdGenerator {}.make(),
rootControllerIndex = controllerInstance,
blocking = false,
content = None,
initArgs = Set.empty,
lockedArgs = Map.empty)
context.parent ! ActivationRequest(activationMessage, invokerInstance)
* Only change the state if the currentState is not the newState.
* @param newState of the InvokerActor
private def gotoIfNotThere(newState: InvokerState) = {
if (stateName == newState) stay() else goto(newState)
object InvokerActor {
def props(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId) =
Props(new InvokerActor(invokerInstance, controllerInstance))
val bufferSize = 10
val bufferErrorTolerance = 3
val timerName = "testActionTimer"