blob: 54bd174189f141194e94f443f92951eca200d24f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.v2
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.ContainerRemoved
import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
import org.apache.openwhisk.core.entitlement.Privilege
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.types.EntityStore
import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
import org.apache.openwhisk.core.service.UpdateDataOnChange
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/**
 * FSM actor that tracks the health state of one invoker and publishes it via the data management service.
 *
 * The actor starts in `Offline`. On `Enable` it prepares the health test action and moves to `Unhealthy`.
 * Every `HealthMessage` is recorded in the ring buffer held by `InvokerInfo`; the invoker is `Healthy` while the
 * number of buffered failures is below `InvokerHealthManager.bufferErrorTolerance`, and `Unhealthy` otherwise.
 * Each state change, and each `MemoryInfo` update, is published under `InvokerKeys.health(instanceId)`.
 *
 * @param instanceId                  identity of the invoker whose health is tracked
 * @param healthContainerProxyFactory creates the container proxy that runs the health test action; it is called
 *                                    with this actor's context and the manager reference
 * @param dataManagementService       actor that receives the `UpdateDataOnChange` health updates
 * @param entityStore                 store the health test action is read from
 */
class InvokerHealthManager(instanceId: InvokerInstanceId,
                           healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
                           dataManagementService: ActorRef,
                           entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
    extends FSM[InvokerState, InvokerHealthData]
    with Stash {

  implicit val requestTimeout: Timeout = Timeout(5.seconds)
  implicit val ec: ExecutionContext = actorSystem.dispatcher
  implicit val transid: TransactionId = TransactionId.invokerHealth

  // The container proxy currently running the health test action, if any.
  private[containerpool] var healthActionProxy: Option[ActorRef] = None

  startWith(
    Offline,
    InvokerInfo(
      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))

  when(Offline) {
    case Event(GracefulShutdown, _: InvokerInfo) =>
      logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
      stay()

    case Event(Enable, _) =>
      // The test action is started only once the health action has been prepared in the store.
      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
        startTestAction(self)
      }
      goto(Unhealthy)
  }

  when(Unhealthy) {
    case Event(ContainerRemoved(_), _) =>
      // The health container is gone while we are still unhealthy: forget it and start a new test action.
      healthActionProxy = None
      startTestAction(self)
      stay()

    case Event(msg: FailureMessage, _) =>
      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
      stay()

    case Event(ContainerCreationFailed(_), _) =>
      stay()
  }

  when(Healthy) {
    case Event(msg: FailureMessage, _) =>
      logging.error(this, s"invoker${instanceId}, status:${stateName} got a failure message: ${msg}")
      goto(Unhealthy)
  }

  whenUnhandled {
    case Event(_: Initialized, _) =>
      // Ignore Initialized messages sent by the ContainerProxy to the health manager.
      stay()

    case Event(ContainerRemoved(_), _) =>
      // Drop messages sent by the ContainerProxy to the health manager, but forget the removed proxy.
      healthActionProxy = None
      stay()

    case Event(GracefulShutdown, _) =>
      // Re-send the message so that it is handled (and logged) again once the actor is Offline.
      self ! GracefulShutdown
      goto(Offline)

    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
      if (stateName != Offline) {
        handleHealthMessage(healthMsg.state, data.buffer)
      } else {
        stay()
      }

    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))

    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested lease not found, we need to get the lease again.
    case Event(t: FailureMessage, _) =>
      logging.error(this, s"Failure happens, restart InvokerHealthManager: ${t}")
      goto(Offline)
  }

  // It is important to note that stateName and stateData in the onTransition callback refer to the previous state.
  // The data of the state being entered must be accessed through nextStateData.
  onTransition {
    case Offline -> Unhealthy =>
      publishHealthStatusAndStay(Unhealthy, nextStateData)

    case Healthy -> Unhealthy =>
      unstashAll()
      transid.mark(
        this,
        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
        s"invoker${instanceId.toInt} is unhealthy",
        akka.event.Logging.WarningLevel)
      startTestAction(self)
      publishHealthStatusAndStay(Unhealthy, nextStateData)

    case _ -> Healthy =>
      logging.info(this, s"invoker became healthy, stop health action proxy.")
      unstashAll()
      stopTestAction()
      publishHealthStatusAndStay(Healthy, nextStateData)

    case oldState -> newState if oldState != newState =>
      publishHealthStatusAndStay(newState, nextStateData)
      unstashAll()
  }

  /**
   * Publishes the given state together with the memory figures of `stateData` to the data management service.
   *
   * @param state     the invoker state to publish
   * @param stateData the FSM data to read the memory figures from
   * @return a `stay()` state carrying the data updated with the published message; an unchanged `stay()` when
   *         `stateData` is not an `InvokerInfo`
   */
  private def publishHealthStatusAndStay(state: InvokerState, stateData: InvokerHealthData): State = {
    stateData match {
      case data: InvokerInfo =>
        val invokerResourceMessage = InvokerResourceMessage(
          state.asString,
          data.memory.freeMemory,
          data.memory.busyMemory,
          data.memory.inProgressMemory,
          instanceId.tags,
          instanceId.dedicatedNamespaces)
        dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
        stay().using(data.copy(currentInvokerResource = Some(invokerResourceMessage)))

      case data =>
        logging.error(this, s"unexpected data is found: $data")
        stay()
    }
  }

  initialize()

  /**
   * Loads the health test action from the entity store and hands it to the health container proxy.
   *
   * Nothing is started, and an error is logged, when the health action cannot be built from the runtime manifest,
   * cannot be read from the store, or is not executable.
   *
   * @param manager the actor passed to the proxy factory as the manager of the health container proxy
   */
  private def startTestAction(manager: ActorRef): Unit = {
    val namespace = InvokerHealthManager.healthActionIdentity.namespace.name.asString
    InvokerHealthManager.healthAction(instanceId) match {
      case Some(healthAction) =>
        // NOTE(review): this callback runs on `ec`, while startHealthAction reads and writes healthActionProxy
        // and uses the actor context - confirm that this cannot race with the actor's own message handling.
        WhiskAction.get(entityStore, healthAction.docid).onComplete {
          case Success(action) =>
            action.toExecutableWhiskAction match {
              case Some(executable) =>
                startHealthAction(Initialize(namespace, executable, "", 0, transid), manager)
              case None =>
                logging.error(this, s"health action ${healthAction.docid} is not executable")
            }
          case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
        }

      case None =>
        logging.error(this, "cannot start test action for invoker health because runtime manifest is not valid")
    }
  }

  /**
   * Sends `initialize` to the health container proxy, creating the proxy first when none exists.
   *
   * @param initialize the message describing the health action to run
   * @param manager    the actor passed to the proxy factory when a new proxy has to be created
   */
  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
    healthActionProxy match {
      case Some(proxy) =>
        // Puts the existing proxy into its running state so that it can fetch activations through its
        // activation service client.
        proxy ! initialize

      case None =>
        val proxy = healthContainerProxyFactory(context, manager)
        proxy ! initialize
        healthActionProxy = Some(proxy)
    }
  }

  /** Asks the current health container proxy, if any, to shut down gracefully and forgets it. */
  def stopTestAction(): Unit = {
    healthActionProxy.foreach(_ ! GracefulShutdown)
    healthActionProxy = None
  }

  /**
   * Handles a health message from the ContainerProxy.
   * It can induce a state change.
   *
   * @param state  activation result state; `false` is counted as a failure
   * @param buffer RingBuffer that tracks the most recent results
   * @return `Healthy` while the number of buffered failures is below the error tolerance, `Unhealthy` otherwise
   */
  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State = {
    buffer.add(state)
    val falseStateCount = buffer.toList.count(!_)
    if (falseStateCount < InvokerHealthManager.bufferErrorTolerance) {
      gotoIfNotThere(Healthy)
    } else {
      logging.warn(
        this,
        s"become unhealthy because system error exceeded the error tolerance, falseStateCount $falseStateCount, errorTolerance ${InvokerHealthManager.bufferErrorTolerance}")
      gotoIfNotThere(Unhealthy)
    }
  }

  /**
   * Decides whether a transition to `newState` is needed.
   * If the current state already is `newState` the FSM stays, otherwise it changes its state.
   *
   * @param newState the desired state
   * @return the resulting FSM state
   */
  private def gotoIfNotThere(newState: InvokerState): State = {
    if (stateName == newState) {
      stay()
    } else {
      goto(newState)
    }
  }

  /** Stashes the current message until unstashAll() is called, and stays in the current state. */
  def delay: State = {
    stash()
    stay()
  }
}
/**
 * Activation client used by the health container proxy.
 *
 * Instead of fetching activations from a remote service it answers every `RequestActivation` with a copy of
 * `InvokerHealthManager.healthActivation` carrying a freshly generated activation id. After `CloseClientProxy`
 * has been received, the next `RequestActivation` makes it report `ClientClosed` to its parent and stop.
 */
case class HealthActivationServiceClient() extends Actor {

  // Set once CloseClientProxy has been received.
  private var isClosed: Boolean = false

  override def receive: Receive = {
    case StartClient =>
      sender() ! ClientCreationCompleted()

    case _: RequestActivation =>
      if (isClosed) {
        context.parent ! ClientClosed
        context.stop(self)
      } else {
        // Nothing is sent back while no health activation has been prepared yet.
        InvokerHealthManager.healthActivation.foreach { template =>
          sender() ! template.copy(
            transid = TransactionId.invokerHealthActivation,
            activationId = ActivationId.generate())
        }
      }

    case CloseClientProxy =>
      isClosed = true
  }
}
/** Constants and helpers used to create the health test action and its activation message. */
object InvokerHealthManager {

  /** Prefix of the health action name; the invoker's numeric id is appended to it. */
  val healthActionNamePrefix: String = "invokerHealthTestAction"

  /** Size of the ring buffer that records the most recent health results. */
  val bufferSize: Int = 10

  /** The invoker is considered unhealthy once this many buffered results are failures. */
  val bufferErrorTolerance: Int = 3

  /** Identity in the `whisk.system` namespace under which the health action is invoked. */
  val healthActionIdentity: Identity = {
    val whiskSystem = "whisk.system"
    val uuid = UUID()
    Identity(
      Subject(whiskSystem),
      Namespace(EntityName(whiskSystem), uuid),
      BasicAuthenticationAuthKey(uuid, Secret()),
      Set[Privilege]())
  }

  /**
   * Builds the health test action for the given invoker.
   *
   * @param i the invoker the action belongs to
   * @return the action, or `None` when the runtime manifest cannot resolve `nodejs:default`
   */
  def healthAction(i: InvokerInstanceId): Option[WhiskAction] =
    ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:default").map { manifest =>
      new WhiskAction(
        namespace = InvokerHealthManager.healthActionIdentity.namespace.name.toPath,
        name = EntityName(s"$healthActionNamePrefix${i.toInt}"),
        exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None),
        limits = ActionLimits(memory = MemoryLimit(MemoryLimit.MIN_MEMORY), logs = LogLimit(0.B)))
    }

  // Written from a Future callback and read by HealthActivationServiceClient, hence volatile.
  @volatile var healthActivation: Option[ActivationMessage] = None

  /**
   * Creates the health test action in the store, or overwrites the existing document with a new revision.
   *
   * @param db     the entity store to write to
   * @param action the action to store
   * @return the document info of the stored action
   */
  private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[DocInfo] = {
    implicit val tid: TransactionId = TransactionId.invokerHealthManager
    implicit val ec: ExecutionContext = db.executionContext
    implicit val logging: Logging = db.logging
    WhiskAction
      .get(db, action.docid)
      .flatMap { oldAction =>
        WhiskAction.put(db, action.revision(oldAction.rev), Some(oldAction))(tid, notifier = None)
      }
      .recoverWith {
        case _: NoDocumentException => WhiskAction.put(db, action, old = None)(tid, notifier = None)
      }
      .andThen {
        case Success(_) => logging.info(this, "test action for invoker health now exists")
        case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
      }
  }

  /**
   * Reads the stored health action and derives `healthActivation` from it.
   *
   * Failures are logged and not propagated: the returned future completes successfully either way, but only
   * after `healthActivation` has been set (or the attempt has failed).
   *
   * @param entityStore the store to read the action from
   * @param docInfo     document info of the stored health action
   */
  private def createHealthActivation(entityStore: ArtifactStore[WhiskEntity], docInfo: DocInfo)(
    implicit ec: ExecutionContext,
    logging: Logging): Future[Unit] = {
    implicit val transId: TransactionId = TransactionId.invokerHealth
    WhiskAction
      .get(entityStore, docInfo.id)
      .map { action =>
        action.toExecutableWhiskAction match {
          case Some(executable) =>
            healthActivation = Some(
              ActivationMessage(
                TransactionId.invokerHealth,
                executable.fullyQualifiedName(true),
                action.rev,
                healthActionIdentity,
                ActivationId.generate(),
                ControllerInstanceId("health"),
                blocking = false,
                content = None))
          case None =>
            logging.error(this, s"health action ${docInfo.id} is not executable")
        }
      }
      .recover {
        case scala.util.control.NonFatal(t) => logging.error(this, s"get health action error: ${t.getMessage}")
      }
  }

  /**
   * Stores the health test action for the invoker and prepares the activation message used to invoke it.
   *
   * @param entityStore       the store the health action is written to and read from
   * @param invokerInstanceId the invoker to prepare the health action for
   * @return a future that completes once the action is stored and the activation message has been prepared;
   *         it fails when the action cannot be stored
   * @throws IllegalStateException when the health action cannot be built from the runtime manifest
   */
  def prepare(entityStore: ArtifactStore[WhiskEntity],
              invokerInstanceId: InvokerInstanceId)(implicit ec: ExecutionContext, logging: Logging): Future[Unit] = {
    InvokerHealthManager.healthAction(invokerInstanceId) match {
      case Some(action) =>
        // flatMap so that the returned future covers the creation of the activation message as well.
        createTestActionForInvokerHealth(entityStore, action)
          .flatMap(docInfo => createHealthActivation(entityStore, docInfo))
      case None =>
        throw new IllegalStateException(
          "cannot create test action for invoker health because runtime manifest is not valid")
    }
  }

  /** Creates the `Props` for an [[InvokerHealthManager]] actor; the arguments are passed to its constructor. */
  def props(instanceId: InvokerInstanceId,
            childFactory: (ActorRefFactory, ActorRef) => ActorRef,
            dataManagementService: ActorRef,
            entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging): Props = {
    Props(new InvokerHealthManager(instanceId, childFactory, dataManagementService, entityStore))
  }
}
// Received from the ContainerProxy actor.
// `state` is the result of one health activation; InvokerHealthManager counts `false` as a failure.
case class HealthMessage(state: Boolean)
// Received from the ContainerPool actor.
// Carries the free, busy and in-progress memory figures that InvokerHealthManager publishes with the invoker state.
// NOTE(review): the initial value is built from `userMemory.toMB`, so the unit is presumably MB - confirm.
case class MemoryInfo(freeMemory: Long, busyMemory: Long, inProgressMemory: Long)
// Data stored in the Invoker.
// Base type of the state data carried by the InvokerHealthManager FSM.
sealed class InvokerHealthData
/**
 * State data of the InvokerHealthManager FSM.
 *
 * @param buffer                 ring buffer holding the most recent health results
 * @param memory                 the latest memory figures received for the invoker
 * @param currentInvokerResource the resource message most recently published for the invoker, if any
 */
case class InvokerInfo(buffer: RingBuffer[Boolean],
memory: MemoryInfo = MemoryInfo(0, 0, 0),
currentInvokerResource: Option[InvokerResourceMessage] = None)
extends InvokerHealthData