blob: 4f4c8ad161c9dc708eef6ba7bd3577cdc24e364f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.loadBalancer
import java.nio.charset.StandardCharsets
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import org.apache.kafka.clients.producer.RecordMetadata
import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
import akka.actor.FSM.CurrentState
import akka.actor.FSM.SubscribeTransitionCallBack
import akka.actor.FSM.Transition
import akka.pattern.pipe
import akka.util.Timeout
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.database.NoDocumentException
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.types.EntityStore
// Received events
case object GetStatus
case object Tick
// Possible answers of an activation
sealed trait InvocationFinishedResult
object InvocationFinishedResult {
// The activation could be successfully executed from the system's point of view. That includes user- and application
// errors
case object Success extends InvocationFinishedResult
// The activation could not be executed because of a system error
case object SystemError extends InvocationFinishedResult
// The active-ack did not arrive before it timed out
case object Timeout extends InvocationFinishedResult
}
case class ActivationRequest(msg: ActivationMessage, invoker: InvokerInstanceId)
case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, result: InvocationFinishedResult)
// Sent to a monitor if the state changed
case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth])
// Data stored in the Invoker
final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult])
/**
* Actor representing a pool of invokers
*
* The InvokerPool manages a Invokers through subactors. An new Invoker
* is registered lazily by sending it a Ping event with the name of the
* Invoker. Ping events are furthermore forwarded to the respective
* Invoker for their respective State handling.
*
* Note: An Invoker that never sends an initial Ping will not be considered
* by the InvokerPool and thus might not be caught by monitoring.
*/
class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
pingConsumer: MessageConsumer,
monitor: Option[ActorRef])
extends Actor {
import InvokerState._
implicit val transid: TransactionId = TransactionId.invokerHealth
implicit val logging: Logging = new AkkaLogging(context.system.log)
implicit val timeout: Timeout = Timeout(5.seconds)
implicit val ec: ExecutionContext = context.dispatcher
// State of the actor. Mutable vars with immutable collections prevents closures or messages
// from leaking the state for external mutation
var instanceToRef = immutable.Map.empty[Int, ActorRef]
var refToInstance = immutable.Map.empty[ActorRef, InvokerInstanceId]
var status = IndexedSeq[InvokerHealth]()
def receive: Receive = {
case p: PingMessage =>
val invoker = instanceToRef.getOrElse(p.instance.toInt, registerInvoker(p.instance))
instanceToRef = instanceToRef.updated(p.instance.toInt, invoker)
// For the case when the invoker was restarted and got a new displayed name
val oldHealth = status(p.instance.toInt)
if (oldHealth.id != p.instance) {
status = status.updated(p.instance.toInt, new InvokerHealth(p.instance, oldHealth.status))
refToInstance = refToInstance.updated(invoker, p.instance)
}
invoker.forward(p)
case GetStatus => sender() ! status
case msg: InvocationFinishedMessage =>
// Forward message to invoker, if InvokerActor exists
instanceToRef.get(msg.invokerInstance.toInt).foreach(_.forward(msg))
case CurrentState(invoker, currentState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
status = status.updated(instance.toInt, new InvokerHealth(instance, currentState))
}
logStatus()
case Transition(invoker, oldState: InvokerState, newState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
status = status.updated(instance.toInt, new InvokerHealth(instance, newState))
}
logStatus()
// this is only used for the internal test action which enabled an invoker to become healthy again
case msg: ActivationRequest => sendActivationToInvoker(msg.msg, msg.invoker).pipeTo(sender)
}
def logStatus(): Unit = {
monitor.foreach(_ ! CurrentInvokerPoolState(status))
val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}")
logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
}
/** Receive Ping messages from invokers. */
val pingPollDuration: FiniteDuration = 1.second
val invokerPingFeed: ActorRef = context.system.actorOf(Props {
new MessageFeed(
"ping",
logging,
pingConsumer,
pingConsumer.maxPeek,
pingPollDuration,
processInvokerPing,
logHandoff = false)
})
def processInvokerPing(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
PingMessage.parse(raw) match {
case Success(p: PingMessage) =>
self ! p
invokerPingFeed ! MessageFeed.Processed
case Failure(t) =>
invokerPingFeed ! MessageFeed.Processed
logging.error(this, s"failed processing message: $raw with $t")
}
}
/** Pads a list to a given length using the given function to compute entries */
def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A): IndexedSeq[A] = list ++ (list.size until n).map(f)
// Register a new invoker
def registerInvoker(instanceId: InvokerInstanceId): ActorRef = {
logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth)
// Grow the underlying status sequence to the size needed to contain the incoming ping. Dummy values are created
// to represent invokers, where ping messages haven't arrived yet
status = padToIndexed(
status,
instanceId.toInt + 1,
i => new InvokerHealth(InvokerInstanceId(i, userMemory = instanceId.userMemory), Offline))
status = status.updated(instanceId.toInt, new InvokerHealth(instanceId, Offline))
val ref = childFactory(context, instanceId)
ref ! SubscribeTransitionCallBack(self) // register for state change events
refToInstance = refToInstance.updated(ref, instanceId)
ref
}
}
object InvokerPool {
private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
implicit val tid: TransactionId = TransactionId.loadbalancer
implicit val ec: ExecutionContext = db.executionContext
implicit val logging: Logging = db.logging
WhiskAction
.get(db, action.docid)
.flatMap { oldAction =>
WhiskAction.put(db, action.revision(oldAction.rev), Some(oldAction))(tid, notifier = None)
}
.recover {
case _: NoDocumentException => WhiskAction.put(db, action, old = None)(tid, notifier = None)
}
.map(_ => {})
.andThen {
case Success(_) => logging.info(this, "test action for invoker health now exists")
case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
}
}
/**
* Prepares everything for the health protocol to work (i.e. creates a testaction)
*
* @param controllerInstance instance of the controller we run in
* @param entityStore store to write the action to
* @return throws an exception on failure to prepare
*/
def prepare(controllerInstance: ControllerInstanceId, entityStore: EntityStore): Unit = {
InvokerPool
.healthAction(controllerInstance)
.map {
// Await the creation of the test action; on failure, this will abort the constructor which should
// in turn abort the startup of the controller.
a =>
Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute)
}
.orElse {
throw new IllegalStateException(
"cannot create test action for invoker health because runtime manifest is not valid")
}
}
def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
pc: MessageConsumer,
m: Option[ActorRef] = None): Props = {
Props(new InvokerPool(f, p, pc, m))
}
/** A stub identity for invoking the test action. This does not need to be a valid identity. */
val healthActionIdentity: Identity = {
val whiskSystem = "whisk.system"
val uuid = UUID()
Identity(Subject(whiskSystem), Namespace(EntityName(whiskSystem), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
}
/** An action to use for monitoring invoker health. */
def healthAction(i: ControllerInstanceId): Option[WhiskAction] =
ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:default").map { manifest =>
new WhiskAction(
namespace = healthActionIdentity.namespace.name.toPath,
name = EntityName(s"invokerHealthTestAction${i.asString}"),
exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None),
limits = ActionLimits(memory = MemoryLimit(MemoryLimit.MIN_MEMORY)))
}
}
/**
* Actor representing an Invoker
*
* This finite state-machine represents an Invoker in its possible
* states "Healthy" and "Offline".
*/
class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId)
extends FSM[InvokerState, InvokerInfo] {
import InvokerState._
implicit val transid: TransactionId = TransactionId.invokerHealth
implicit val logging: Logging = new AkkaLogging(context.system.log)
val name = s"invoker${invokerInstance.toInt}"
val healthyTimeout: FiniteDuration = 10.seconds
// This is done at this point to not intermingle with the state-machine especially their timeouts.
def customReceive: Receive = {
case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
}
override def receive: Receive = customReceive.orElse(super.receive)
// To be used for all states that should send test actions to reverify the invoker
val healthPingingState: StateFunction = {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
case Event(Tick, _) =>
invokeTestAction()
stay
}
// To be used for all states that should send test actions to reverify the invoker
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
case _ -> `state` =>
invokeTestAction()
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
case `state` -> _ => cancelTimer(InvokerActor.timerName)
}
/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
case Event(_: PingMessage, _) => goto(Unhealthy)
}
/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)
/** An Unresponsive invoker represents an invoker that is not responding with active acks in a timely manner */
when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)
/**
* A Healthy invoker is characterized by continuously getting pings.
* It will go offline if that state is not confirmed for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
}
/** Handles the completion of an Activation in every state. */
whenUnhandled {
case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
}
/** Logs transition changes. */
onTransition {
case _ -> newState if !newState.isUsable =>
transid.mark(
this,
LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(newState.asString),
s"$name is ${newState.asString}",
akka.event.Logging.WarningLevel)
case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}")
}
onTransition(healthPingingTransitionHandler(Unhealthy))
onTransition(healthPingingTransitionHandler(Unresponsive))
initialize()
/**
* Handling for active acks. This method saves the result (successful or unsuccessful)
* into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy.
*
* @param result: result of Activation
* @param buffer to be used
*/
private def handleCompletionMessage(result: InvocationFinishedResult,
buffer: RingBuffer[InvocationFinishedResult]) = {
buffer.add(result)
// If the action is successful, the Invoker is Healthy. We execute additional test actions
// immediately to clear the RingBuffer as fast as possible.
// The actions that arrive while the invoker is unhealthy are most likely health actions.
// It is possible they are normal user actions as well. This can happen if such actions were in the
// invoker queue or in progress while the invoker's status flipped to Unhealthy.
if (result == InvocationFinishedResult.Success && stateName == Unhealthy) {
invokeTestAction()
}
// Stay online if the activations was successful.
// Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging.
if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
stay
} else {
val entries = buffer.toList
// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state.
// Otherwise transition to Healthy on successful activations only.
if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unhealthy)
} else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unresponsive)
} else {
result match {
case InvocationFinishedResult.Success =>
// Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or
// during priming of the ring buffer. In case of the latter, there is at least one additional test
// action in flight which can reverse the transition later.
gotoIfNotThere(Healthy)
case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unhealthy.
gotoIfNotThere(Unhealthy)
case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unresponsive.
gotoIfNotThere(Unresponsive)
case _ =>
// At steady state, the state of the buffer superceded and we hold the current state
// until enough events have occurred to transition to a new state.
stay
}
}
}
}
/**
* Creates an activation request with the given action and sends it to the InvokerPool.
* The InvokerPool redirects it to the invoker which is represented by this InvokerActor.
*/
private def invokeTestAction() = {
InvokerPool.healthAction(controllerInstance).map { action =>
val activationMessage = ActivationMessage(
// Use the sid of the InvokerSupervisor as tid
transid = transid,
action = action.fullyQualifiedName(true),
// Use empty DocRevision to force the invoker to pull the action from db all the time
revision = DocRevision.empty,
user = InvokerPool.healthActionIdentity,
// Create a new Activation ID for this activation
activationId = new ActivationIdGenerator {}.make(),
rootControllerIndex = controllerInstance,
blocking = false,
content = None,
initArgs = Set.empty,
lockedArgs = Map.empty)
context.parent ! ActivationRequest(activationMessage, invokerInstance)
}
}
/**
* Only change the state if the currentState is not the newState.
*
* @param newState of the InvokerActor
*/
private def gotoIfNotThere(newState: InvokerState) = {
if (stateName == newState) stay() else goto(newState)
}
}
object InvokerActor {
def props(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId) =
Props(new InvokerActor(invokerInstance, controllerInstance))
val bufferSize = 10
val bufferErrorTolerance = 3
val timerName = "testActionTimer"
}