/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.v2
import java.net.InetSocketAddress
import java.time.Instant
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.io.{IO, Tcp}
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.common.{LoggingMarkers, TransactionId, _}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.{
ActivationMessage,
CombinedCompletionAndResultMessage,
CompletionMessage,
ResultMessage
}
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
import org.apache.openwhisk.core.containerpool.v2.FunctionPullingContainerProxy.{
constructWhiskActivation,
containerName
}
import org.apache.openwhisk.core.database._
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, ActivationResponse => ExecutionResponse, _}
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
import org.apache.openwhisk.core.invoker.Invoker.LogsCollector
import org.apache.openwhisk.core.invoker.NamespaceBlacklist
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
import org.apache.openwhisk.grpc.RescheduleResponse
import org.apache.openwhisk.http.Messages
import pureconfig.loadConfigOrThrow
import spray.json.DefaultJsonProtocol.{StringJsonFormat, _}
import spray.json._
import pureconfig.generic.auto._
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
// Events used internally
case class RunActivation(action: ExecutableWhiskAction, msg: ActivationMessage)
case class RunActivationCompleted(container: Container, action: ExecutableWhiskAction, duration: Option[Long])
case class InitCodeCompleted(data: WarmData)
// Events received by the actor
case class Initialize(invocationNamespace: String,
action: ExecutableWhiskAction,
schedulerHost: String,
rpcPort: Int,
transId: TransactionId)
case class Start(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration] = None)
// Events sent by the actor
case class ContainerCreationFailed(throwable: Throwable)
case class ContainerIsPaused(data: WarmData)
case class ClientCreationFailed(throwable: Throwable,
container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction)
case class ReadyToWork(data: PreWarmData)
case class Initialized(data: InitializedData)
case class Resumed(data: WarmData)
case class ResumeFailed(data: WarmData)
case class RecreateClient(action: ExecutableWhiskAction)
// States
sealed trait ProxyState
case object LeaseStart extends ProxyState
case object Uninitialized extends ProxyState
case object CreatingContainer extends ProxyState
case object ContainerCreated extends ProxyState
case object CreatingClient extends ProxyState
case object ClientCreated extends ProxyState
case object Running extends ProxyState
case object Pausing extends ProxyState
case object Paused extends ProxyState
case object Removing extends ProxyState
case object Rescheduling extends ProxyState
// Errors
case class ContainerHealthErrorWithResumedRun(tid: TransactionId, msg: String, resumeRun: RunActivation)
extends Exception(msg)
// Data
sealed abstract class Data(val memoryLimit: ByteSize) {
def getContainer: Option[Container]
}
case class NonexistentData() extends Data(0.B) {
override def getContainer = None
}
case class MemoryData(override val memoryLimit: ByteSize) extends Data(memoryLimit) {
override def getContainer = None
}
trait WithClient { val clientProxy: ActorRef }
case class PreWarmData(container: Container,
kind: String,
override val memoryLimit: ByteSize,
expires: Option[Deadline] = None)
extends Data(memoryLimit) {
override def getContainer = Some(container)
def isExpired(): Boolean = expires.exists(_.isOverdue())
}
case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
extends Data(action.limits.memory.megabytes.MB) {
override def getContainer = Some(container)
}
case class InitializedData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
override val clientProxy: ActorRef)
extends Data(action.limits.memory.megabytes.MB)
with WithClient {
override def getContainer = Some(container)
def toReschedulingData(resumeRun: RunActivation) =
ReschedulingData(container, invocationNamespace, action, clientProxy, resumeRun)
}
case class WarmData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
revision: DocRevision,
lastUsed: Instant,
override val clientProxy: ActorRef)
extends Data(action.limits.memory.megabytes.MB)
with WithClient {
override def getContainer = Some(container)
def toReschedulingData(resumeRun: RunActivation) =
ReschedulingData(container, invocationNamespace, action, clientProxy, resumeRun)
}
case class ReschedulingData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
override val clientProxy: ActorRef,
resumeRun: RunActivation)
extends Data(action.limits.memory.megabytes.MB)
with WithClient {
override def getContainer = Some(container)
}
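/**
* A proxy that manages the lifecycle of a single action container for the function-pulling (v2) invoker.
*
* Summary of the state machine implemented below (see the `when` blocks for details):
* - Uninitialized -> CreatingContainer -> ContainerCreated (prewarm) on a Start message
* - Uninitialized/ContainerCreated -> CreatingClient -> ClientCreated on an Initialize message
* - ClientCreated -> Running once the first activation has initialized the container
* - Running -> Pausing -> Paused when no activation arrives within the pause grace period,
*   and Paused -> Running again when the container is resumed
* - Rescheduling when a container health error forces a fetched activation to be handed back to the scheduler
* - Removing while waiting for the ActivationClientProxy to close before the actor stops
*/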
class FunctionPullingContainerProxy(
factory: (TransactionId,
String,
ImageName,
Boolean,
ByteSize,
Int,
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction],
dataManagementService: ActorRef,
clientProxyFactory: (ActorRefFactory,
String,
FullyQualifiedEntityName,
DocRevision,
String,
Int,
ContainerId) => ActorRef,
sendActiveAck: ActiveAck,
storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
collectLogs: LogsCollector,
getLiveContainerCount: (String, FullyQualifiedEntityName, DocRevision) => Future[Long],
getWarmedContainerLimit: (String) => Future[(Int, FiniteDuration)],
instance: InvokerInstanceId,
invokerHealthManager: ActorRef,
poolConfig: ContainerPoolConfig,
timeoutConfig: ContainerProxyTimeoutConfig,
healtCheckConfig: ContainerProxyHealthCheckConfig,
testTcp: Option[ActorRef])(implicit actorSystem: ActorSystem, mat: ActorMaterializer, logging: Logging)
extends FSM[ProxyState, Data]
with Stash {
startWith(Uninitialized, NonexistentData())
implicit val ec = actorSystem.dispatcher
private val UnusedTimeoutName = "UnusedTimeout"
private val unusedTimeout = timeoutConfig.pauseGrace
private val IdleTimeoutName = "PausingTimeout"
private val idleTimeout = timeoutConfig.idleContainer
private val KeepingTimeoutName = "KeepingTimeout"
private val RunningActivationTimeoutName = "RunningActivationTimeout"
private val runningActivationTimeout = 10.seconds
private var timedOut = false
var healthPingActor: Option[ActorRef] = None //setup after prewarm starts
val tcp: ActorRef = testTcp.getOrElse(IO(Tcp)) //allows testing the interaction with the Tcp extension
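// tracks in-flight activations by activation id; used to delay client shutdown while activations are still running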
val runningActivations = new java.util.concurrent.ConcurrentHashMap[String, Boolean]
when(Uninitialized) {
// pre warm a container (creates a stem cell container)
case Event(job: Start, _) =>
factory(
TransactionId.invokerWarmup,
containerName(instance, "prewarm", job.exec.kind),
job.exec.image,
job.exec.pull,
job.memoryLimit,
poolConfig.cpuShare(job.memoryLimit),
None)
.map(container => PreWarmData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow)))
.pipeTo(self)
goto(CreatingContainer)
// cold start
case Event(job: Initialize, _) =>
factory( // create a new container
TransactionId.invokerColdstart,
containerName(instance, job.action.namespace.namespace, job.action.name.asString),
job.action.exec.image,
job.action.exec.pull,
job.action.limits.memory.megabytes.MB,
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
None)
.andThen {
case Failure(t) =>
context.parent ! ContainerCreationFailed(t)
}
.map { container =>
logging.debug(this, s"a container ${container.containerId} is created for ${job.action}")
// create a client
Try(
clientProxyFactory(
context,
job.invocationNamespace,
job.action.fullyQualifiedName(true),
job.action.rev,
job.schedulerHost,
job.rpcPort,
container.containerId)) match {
case Success(clientProxy) =>
clientProxy ! StartClient
ContainerCreatedData(container, job.invocationNamespace, job.action)
case Failure(t) =>
logging.error(this, s"failed to create activation client caused by: $t")
ClientCreationFailed(t, container, job.invocationNamespace, job.action)
}
}
.pipeTo(self)
goto(CreatingClient)
case _ => delay
}
when(CreatingContainer) {
// container was successfully obtained
case Event(completed: PreWarmData, _: NonexistentData) =>
context.parent ! ReadyToWork(completed)
goto(ContainerCreated) using completed
// container creation failed
case Event(t: FailureMessage, _: NonexistentData) =>
context.parent ! ContainerRemoved(true)
stop()
case _ => delay
}
// prewarmed state, container created
when(ContainerCreated) {
case Event(job: Initialize, data: PreWarmData) =>
Try(
clientProxyFactory(
context,
job.invocationNamespace,
job.action.fullyQualifiedName(true),
job.action.rev,
job.schedulerHost,
job.rpcPort,
data.container.containerId)) match {
case Success(proxy) =>
proxy ! StartClient
case Failure(t) =>
logging.error(this, s"failed to create activation client for ${job.action} caused by: $t")
self ! ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
}
goto(CreatingClient) using ContainerCreatedData(data.container, job.invocationNamespace, job.action)
case Event(Remove, data: PreWarmData) =>
cleanUp(data.container, None, false)
// the prewarm container failed its health check
case Event(_: FailureMessage, data: PreWarmData) =>
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM)
cleanUp(data.container, None)
case _ => delay
}
when(CreatingClient) {
// wait for client creation during a cold start
case Event(job: ContainerCreatedData, _: NonexistentData) =>
stay() using job
// wait for container creation during a cold start
case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
stay()
// client was successfully obtained
case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) =>
val clientProxy = proxy.getOrElse(sender())
val fqn = data.action.fullyQualifiedName(true)
val revision = data.action.rev
dataManagementService ! RegisterData(
s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn, revision, Some(instance), Some(data.container.containerId))}",
"")
self ! InitializedData(data.container, data.invocationNamespace, data.action, clientProxy)
goto(ClientCreated)
// client creation failed
case Event(t: ClientCreationFailed, _) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(t.container, t.invocationNamespace, t.action.fullyQualifiedName(withVersion = true), t.action.rev, None)
// there can be a case where client creation fails and a ClientClosed is sent by the ActivationClientProxy
// wait for container creation during a cold start
case Event(ClientClosed, _: NonexistentData) =>
self ! ClientClosed
stay()
case Event(ClientClosed, data: ContainerCreatedData) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
None)
// container creation failed during a cold start
case Event(t: FailureMessage, _) =>
context.parent ! ContainerRemoved(true)
stop()
case _ => delay
}
// this handles the first invocation; once it is over, we are ready to trigger getActivation for action concurrency
when(ClientCreated) {
// 1. request activation message to client
case Event(initializedData: InitializedData, _) =>
context.parent ! Initialized(initializedData)
initializedData.clientProxy ! RequestActivation()
setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
stay() using initializedData
// 2. read executable action data from db
case Event(job: ActivationMessage, data: InitializedData) =>
timedOut = false
cancelTimer(UnusedTimeoutName)
handleActivationMessage(job, data.action)
.pipeTo(self)
stay() using data
// 3. request initialize and run command to container
case Event(job: RunActivation, data: InitializedData) =>
implicit val transid = job.msg.transid
logging.debug(this, s"received RunActivation ${job.msg.activationId} for ${job.action} in $stateName")
initializeAndRunActivation(data.container, data.clientProxy, job.action, job.msg, Some(job))
.map { activation =>
RunActivationCompleted(data.container, job.action, activation.duration)
}
.pipeTo(self)
// when it receives InitCodeCompleted, it will move to Running
stay using data
case Event(RetryRequestActivation, data: InitializedData) =>
// if this container is marked as timed out, do not retry
if (timedOut)
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
else {
data.clientProxy ! RequestActivation()
stay()
}
// code initialization was successful
case Event(completed: InitCodeCompleted, data: InitializedData) =>
// TODO support concurrency?
data.clientProxy ! ContainerWarmed // this container is warmed
1 until completed.data.action.limits.concurrency.maxConcurrent foreach { _ =>
data.clientProxy ! RequestActivation()
}
goto(Running) using completed.data // set warm data
// a container health error should cause the pending activation to be rescheduled
case Event(FailureMessage(e: ContainerHealthErrorWithResumedRun), data: InitializedData) =>
logging.error(
this,
s"container ${data.container.containerId.asString} health check failed on $stateName, ${e.resumeRun.msg.activationId} activation will be rescheduled")
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM)
// reschedule message
data.clientProxy ! RescheduleActivation(
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
e.resumeRun.msg)
goto(Rescheduling) using data.toReschedulingData(e.resumeRun)
// Failed to get activation or execute the action
case Event(t: FailureMessage, data: InitializedData) =>
logging.error(
this,
s"failed to initialize a container or run an activation for ${data.action} in state: $stateName caused by: $t")
// Stop containerProxy and ActivationClientProxy both immediately
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
case Event(StateTimeout, data: InitializedData) =>
logging.info(this, s"No more activation is coming in state: $stateName, action: ${data.action}")
// just mark the ContainerProxy as timed out
timedOut = true
stay() // stay here because the ActivationClientProxy may send a new Activation message
case Event(ClientClosed, data: InitializedData) =>
logging.error(this, s"The Client closed in state: $stateName, action: ${data.action}")
// Stop the ContainerProxy (the ActivationClientProxy also stops after ClientClosed is sent to the ContainerProxy).
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
None)
case _ => delay
}
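// waiting for the scheduler to answer a RescheduleActivation; on failure or timeout the activation is completed
// with a fallback error response and the proxy is cleaned up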
when(Rescheduling, stateTimeout = 10.seconds) {
case Event(res: RescheduleResponse, data: ReschedulingData) =>
implicit val transId = data.resumeRun.msg.transid
if (!res.isRescheduled) {
logging.warn(this, s"failed to reschedule the message ${data.resumeRun.msg.activationId}, clean up data")
fallbackActivationForReschedulingData(data)
} else {
logging.warn(this, s"unhandled message is rescheduled, clean up data")
}
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
case Event(StateTimeout, data: ReschedulingData) =>
logging.error(this, s"Timeout for rescheduling message ${data.resumeRun.msg.activationId}, clean up data")(
data.resumeRun.msg.transid)
fallbackActivationForReschedulingData(data)
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
}
when(Running) {
// Run was successful.
// 1. request activation message to client
case Event(activationResult: RunActivationCompleted, data: WarmData) =>
// create timeout
setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
data.clientProxy ! RequestActivation(activationResult.duration)
stay() using data
// 2. read executable action data from db
case Event(job: ActivationMessage, data: WarmData) =>
timedOut = false
cancelTimer(UnusedTimeoutName)
handleActivationMessage(job, data.action)
.pipeTo(self)
stay() using data
// 3. request run command to container
case Event(job: RunActivation, data: WarmData) =>
logging.debug(this, s"received RunActivation ${job.msg.activationId} for ${job.action} in $stateName")
implicit val transid = job.msg.transid
initializeAndRunActivation(data.container, data.clientProxy, job.action, job.msg, Some(job))
.map { activation =>
RunActivationCompleted(data.container, job.action, activation.duration)
}
.pipeTo(self)
stay using data.copy(lastUsed = Instant.now)
case Event(RetryRequestActivation, data: WarmData) =>
// if this container is marked as timed out, do not retry
if (timedOut) {
data.container.suspend()(TransactionId.invokerNanny).map(_ => ContainerPaused).pipeTo(self)
goto(Pausing)
} else {
data.clientProxy ! RequestActivation()
stay()
}
case Event(_: ResumeFailed, data: WarmData) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
// a container health error should cause the container to be removed
case Event(FailureMessage(e: ContainerHealthError), data: WarmData) =>
logging.error(this, s"health check failed on $stateName caused by: ContainerHealthError $e")
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM)
// Stop containerProxy and ActivationClientProxy both immediately,
invokerHealthManager ! HealthMessage(state = false)
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
// a container health error with a pending run should cause the activation to be rescheduled
case Event(FailureMessage(e: ContainerHealthErrorWithResumedRun), data: WarmData) =>
logging.error(
this,
s"container ${data.container.containerId.asString} health check failed on $stateName, ${e.resumeRun.msg.activationId} activation will be rescheduled")
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_WARM)
// reschedule message
data.clientProxy ! RescheduleActivation(
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
e.resumeRun.msg)
goto(Rescheduling) using data.toReschedulingData(e.resumeRun)
// Failed to get activation or execute the action
case Event(t: FailureMessage, data: WarmData) =>
logging.error(this, s"failed to init or run in state: $stateName caused by: $t")
// Stop containerProxy and ActivationClientProxy both immediately,
// and don't send unhealthy state message to the health manager, it's already sent.
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
case Event(StateTimeout, data: WarmData) =>
logging.info(
this,
s"No more run activation is coming in state: $stateName, action: ${data.action}, container: ${data.container.containerId}")
// just mark the ContainerProxy as timed out
timedOut = true
stay() // stay here because the ActivationClientProxy may send a new Activation message
case Event(ClientClosed, data: WarmData) =>
if (runningActivations.isEmpty) {
logging.info(this, s"The Client closed in state: $stateName, action: ${data.action}")
// Stop the ContainerProxy (the ActivationClientProxy also stops after ClientClosed is sent to the ContainerProxy).
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
None)
} else {
logging.info(
this,
s"Remain running activations ${runningActivations.keySet().toString()} when received ClientClosed")
setTimer(RunningActivationTimeoutName, ClientClosed, runningActivationTimeout)
stay
}
// shut down the client first and wait for any remaining activation to be executed
// ContainerProxy will be terminated by StateTimeout if there is no further activation
case Event(GracefulShutdown, data: WarmData) =>
logging.info(this, s"receive GracefulShutdown for action: ${data.action}")
// Just send CloseClientProxy to the ActivationClientProxy, making it throw a ClientClosedException the next time it fetches an activation.
data.clientProxy ! CloseClientProxy
stay
case _ => delay
}
when(Pausing) {
case Event(ContainerPaused, data: WarmData) =>
dataManagementService ! RegisterData(
ContainerKeys.warmedContainers(
data.invocationNamespace,
data.action.fullyQualifiedName(false),
data.revision,
instance,
data.container.containerId),
"")
// remove the existing key so the MemoryQueue can be terminated when it times out
dataManagementService ! UnregisterData(
s"${ContainerKeys.existingContainers(data.invocationNamespace, data.action.fullyQualifiedName(true), data.action.rev, Some(instance), Some(data.container.containerId))}")
context.parent ! ContainerIsPaused(data)
goto(Paused)
case Event(_: FailureMessage, data: WarmData) =>
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(false),
data.action.rev,
Some(data.clientProxy))
case _ => delay
}
when(Paused) {
case Event(job: Initialize, data: WarmData) =>
implicit val transId = job.transId
val parent = context.parent
cancelTimer(IdleTimeoutName)
cancelTimer(KeepingTimeoutName)
data.container
.resume()
.map { _ =>
logging.info(this, s"Resumed container ${data.container.containerId}")
// put existing key again
dataManagementService ! RegisterData(
s"${ContainerKeys.existingContainers(data.invocationNamespace, data.action.fullyQualifiedName(true), data.action.rev, Some(instance), Some(data.container.containerId))}",
"")
parent ! Resumed(data)
// the new queue may be located on a different scheduler, so recreate the activation client when necessary
// since the akka port will not be used, we can put any value except 0 here
data.clientProxy ! RequestActivation(
newScheduler = Some(SchedulerEndpoints(job.schedulerHost, job.rpcPort, 10)))
setTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
timedOut = false
}
.recover {
case t: Throwable =>
logging.error(this, s"Failed to resume container ${data.container.containerId}, error: $t")
parent ! ResumeFailed(data)
self ! ResumeFailed(data)
}
// always clean up data in etcd regardless of success or failure
dataManagementService ! UnregisterData(
ContainerKeys.warmedContainers(
data.invocationNamespace,
data.action.fullyQualifiedName(false),
data.revision,
instance,
data.container.containerId))
goto(Running)
case Event(StateTimeout, data: WarmData) =>
(for {
count <- getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
(warmedContainerKeepingCount, warmedContainerKeepingTimeout) <- getWarmedContainerLimit(
data.invocationNamespace)
} yield {
logging.info(
this,
s"Live container count: ${count}, warmed container keeping count configuration: ${warmedContainerKeepingCount} in namespace: ${data.invocationNamespace}")
if (count <= warmedContainerKeepingCount) {
Keep(warmedContainerKeepingTimeout)
} else {
Remove
}
}).pipeTo(self)
stay
case Event(Keep(warmedContainerKeepingTimeout), data: WarmData) =>
logging.info(
this,
s"This is the remaining container for ${data.action}. The container will stop after $warmedContainerKeepingTimeout.")
setTimer(KeepingTimeoutName, Remove, warmedContainerKeepingTimeout)
stay
case Event(Remove | GracefulShutdown, data: WarmData) =>
dataManagementService ! UnregisterData(
ContainerKeys.warmedContainers(
data.invocationNamespace,
data.action.fullyQualifiedName(false),
data.revision,
instance,
data.container.containerId))
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(false),
data.action.rev,
Some(data.clientProxy))
case _ => delay
}
when(Removing, unusedTimeout) {
// the ContainerProxy only stops once the ClientProxy is closed, so it is important for the ClientProxy to send ClientClosed.
case Event(ClientClosed, _) =>
stop()
// even if an error occurs, it still waits for the ClientClosed event so that it stops only after the client is closed.
case Event(t: FailureMessage, _) =>
logging.error(this, s"unable to delete a container due to ${t}")
stay
case Event(StateTimeout, _) =>
logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.")
stop
case Event(Remove | GracefulShutdown, _) =>
stay()
}
onTransition {
case _ -> Uninitialized => unstashAll()
case _ -> CreatingContainer => unstashAll()
case _ -> ContainerCreated =>
if (healtCheckConfig.enabled) {
nextStateData.getContainer.foreach { c =>
logging.info(this, s"enabling health ping for ${c.containerId.asString} on ContainerCreated")
enableHealthPing(c)
}
}
unstashAll()
case _ -> CreatingClient => unstashAll()
case _ -> ClientCreated => unstashAll()
case _ -> Running =>
if (healtCheckConfig.enabled && healthPingActor.isDefined) {
nextStateData.getContainer.foreach { c =>
logging.info(this, s"disabling health ping for ${c.containerId.asString} on Running")
disableHealthPing()
}
}
unstashAll()
case _ -> Paused => setTimer(IdleTimeoutName, StateTimeout, idleTimeout)
case _ -> Removing => unstashAll()
}
initialize()
/** Delays all incoming messages until unstashAll() is called */
def delay = {
stash()
stay
}
/**
* Only change the state if the currentState is not the newState.
*
* @param newState of the InvokerActor
*/
private def gotoIfNotThere(newState: ProxyState) = {
if (stateName == newState) stay() else goto(newState)
}
/**
* Cleans up all metadata of the invoked action.
*
* @param container the container to destroy
* @param invocationNamespace the namespace that invoked the action
* @param fqn the fully qualified name of the action to stop
* @param revision the revision of the action
* @param clientProxy the client proxy to destroy
* @return the next state (Removing)
*/
private def cleanUp(container: Container,
invocationNamespace: String,
fqn: FullyQualifiedEntityName,
revision: DocRevision,
clientProxy: Option[ActorRef]): State = {
dataManagementService ! UnregisterData(
s"${ContainerKeys.existingContainers(invocationNamespace, fqn, revision, Some(instance), Some(container.containerId))}")
cleanUp(container, clientProxy)
}
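/**
* Destroys the container and stops the proxy once the client is closed.
* A paused container is resumed first because docker may hang when removing a paused container.
*/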
private def cleanUp(container: Container, clientProxy: Option[ActorRef], replacePrewarm: Boolean = true): State = {
context.parent ! ContainerRemoved(replacePrewarm)
val unpause = stateName match {
case Paused => container.resume()(TransactionId.invokerNanny)
case _ => Future.successful(())
}
unpause.andThen {
case Success(_) => destroyContainer(container)
case Failure(t) =>
// docker may hang when trying to remove a paused container, so we shouldn't remove it while it is paused
logging.error(this, s"Failed to resume container ${container.containerId}, error: $t")
}
clientProxy match {
case Some(clientProxy) => clientProxy ! StopClientProxy
case None => self ! ClientClosed
}
gotoIfNotThere(Removing)
}
/**
* Destroys the container
*
* @param container the container to destroy
*/
private def destroyContainer(container: Container) = {
container
.destroy()(TransactionId.invokerNanny)
.andThen {
case Failure(t) =>
logging.error(this, s"Failed to destroy container: ${container.containerId.asString} caused by ${t}")
}
}
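/**
* Turns an ActivationMessage into a RunActivation by fetching the executable action from the artifact store
* (bypassing the cache when the doc revision is missing). If the namespace is blacklisted or the action cannot
* be fetched, a fallback activation is completed via active ack and the returned future fails so the proxy can
* be cleaned up.
*/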
private def handleActivationMessage(msg: ActivationMessage, action: ExecutableWhiskAction): Future[RunActivation] = {
implicit val transid = msg.transid
logging.info(this, s"received a message ${msg.activationId} for ${msg.action} in $stateName")
if (!namespaceBlacklist.isBlacklisted(msg.user)) {
logging.debug(this, s"namespace ${msg.user.namespace.name} is not in the namespaceBlacklist")
val namespace = msg.action.path
val name = msg.action.name
val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
val subject = msg.user.subject
logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")
// set trace context to continue tracing
WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)
// caching is enabled since actions carry a revision id and an updated
// action will not hit the cache because its revision id changes;
// if the doc revision is missing, bypass the cache
if (actionid.rev == DocRevision.empty)
logging.warn(this, s"revision was not provided for ${actionid.id}")
get(entityStore, actionid.id, actionid.rev, actionid.rev != DocRevision.empty)
.flatMap { action =>
action.toExecutableWhiskAction match {
case Some(executable) =>
Future.successful(RunActivation(executable, msg))
case None =>
logging
.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
Future.failed(new IllegalStateException("non-executable action reached the invoker"))
}
}
.recoverWith {
case DocumentRevisionMismatchException(_) =>
// if revision is mismatched, the action may have been updated,
// so try again with the latest code
logging.warn(
this,
s"msg ${msg.activationId} for ${msg.action} in $stateName is updated, fetching latest code")
handleActivationMessage(msg.copy(revision = DocRevision.empty), action)
case t =>
// If the action cannot be found, the user has concurrently deleted it,
// making this an application error. All other errors are considered system
// errors and should cause the invoker to be considered unhealthy.
val response = t match {
case _: NoDocumentException =>
ExecutionResponse.applicationError(Messages.actionRemovedWhileInvoking)
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
ExecutionResponse.whiskError(Messages.actionMismatchWhileInvoking)
case e: Throwable =>
logging.error(this, s"An unknown DB connection error occurred while fetching an action: $e.")
ExecutionResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
}
logging.error(
this,
s"Error to fetch action ${msg.action} for msg ${msg.activationId}, error is ${t.getMessage}")
val context = UserContext(msg.user)
val activation = generateFallbackActivation(action, msg, response)
sendActiveAck(
transid,
activation,
msg.blocking,
msg.rootControllerIndex,
msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(transid, activation, instance))
storeActivation(msg.transid, activation, msg.blocking, context)
// in case the action was removed, the container proxy should be terminated
Future.failed(new IllegalStateException("action does not exist"))
}
} else {
// Iff the current namespace is blacklisted, an active ack is produced only to keep the load balancer protocol intact.
// Due to the protective nature of the blacklist, a database entry is not written.
val activation =
generateFallbackActivation(action, msg, ExecutionResponse.applicationError(Messages.namespacesBlacklisted))
sendActiveAck(
msg.transid,
activation,
false,
msg.rootControllerIndex,
msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(msg.transid, activation, instance))
logging.warn(
this,
s"namespace ${msg.user.namespace.name} was blocked in containerProxy, complete msg ${msg.activationId} with error.")
Future.failed(new IllegalStateException(s"namespace ${msg.user.namespace.name} was blocked in containerProxy."))
}
}
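// lazily creates the TCP ping actor for the container's address and enables periodic health pings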
private def enableHealthPing(c: Container) = {
val hpa = healthPingActor.getOrElse {
logging.info(this, s"creating health ping actor for ${c.addr.asString()}")
val hp = context.actorOf(
TCPPingClient
.props(tcp, c.toString(), healtCheckConfig, new InetSocketAddress(c.addr.host, c.addr.port)))
healthPingActor = Some(hp)
hp
}
hpa ! HealthPingEnabled(true)
}
private def disableHealthPing() = {
healthPingActor.foreach(_ ! HealthPingEnabled(false))
}
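// completes an activation that could not be rescheduled with a whisk-error response: sends the active ack and stores the activation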
def fallbackActivationForReschedulingData(data: ReschedulingData): Unit = {
val context = UserContext(data.resumeRun.msg.user)
val activation =
generateFallbackActivation(data.action, data.resumeRun.msg, ExecutionResponse.whiskError(Messages.abnormalRun))
sendActiveAck(
data.resumeRun.msg.transid,
activation,
data.resumeRun.msg.blocking,
data.resumeRun.msg.rootControllerIndex,
data.resumeRun.msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(data.resumeRun.msg.transid, activation, instance))
storeActivation(data.resumeRun.msg.transid, activation, data.resumeRun.msg.blocking, context)
}
/**
* Runs the activation, initializing the container first if necessary.
* Completes the activation by:
* 1. sending an active ack,
* 2. fetching the logs for the run,
* 3. indicating the resource is free to the parent pool,
* 4. recording the result to the data store
*
* @param container the container to run the activation on
* @param clientProxy the activation client proxy that fetched the message
* @param action the action to run
* @param msg the activation message to execute
* @param resumeRun the original RunActivation when a previously fetched activation is being resumed
* @return a future completing after logs have been collected and
* added to the WhiskActivation
*/
private def initializeAndRunActivation(
container: Container,
clientProxy: ActorRef,
action: ExecutableWhiskAction,
msg: ActivationMessage,
resumeRun: Option[RunActivation] = None)(implicit tid: TransactionId): Future[WhiskActivation] = {
// add the activation id to the runningActivations map
runningActivations.put(msg.activationId.asString, true)
val actionTimeout = action.limits.timeout.duration
val (env, parameters) = ContainerProxy.partitionArguments(msg.content, msg.initArgs)
val environment = Map(
"namespace" -> msg.user.namespace.name.toJson,
"action_name" -> msg.action.qualifiedNameWithLeadingSlash.toJson,
"action_version" -> msg.action.version.toJson,
"activation_id" -> msg.activationId.toString.toJson,
"transaction_id" -> msg.transid.id.toJson)
// if the action requests the api key to be injected into the action context, add it here;
// treat a missing annotation as requesting the api key for backward compatibility
val authEnvironment = {
if (action.annotations.isTruthy(Annotations.ProvideApiKeyAnnotationName, valueForNonExistent = true)) {
msg.user.authkey.toEnvironment.fields
} else Map.empty
}
// Only initialize iff we haven't yet warmed the container
val initialize = stateData match {
case _: WarmData =>
Future.successful(None)
case _ =>
val owEnv = (authEnvironment ++ environment ++ Map(
"deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)) map {
case (key, value) => "__OW_" + key.toUpperCase -> value
}
container
.initialize(action.containerInitializer(env ++ owEnv), actionTimeout, action.limits.concurrency.maxConcurrent)
.map(Some(_))
}
val activation: Future[WhiskActivation] = initialize
.flatMap { initInterval =>
// immediately setup warmedData for use (before first execution) so that concurrent actions can use it asap
if (initInterval.isDefined) {
stateData match {
case _: InitializedData =>
self ! InitCodeCompleted(
WarmData(container, msg.user.namespace.name.asString, action, msg.revision, Instant.now, clientProxy))
case _ =>
Future.failed(new IllegalStateException("lease does not exist"))
}
}
val env = authEnvironment ++ environment ++ Map(
// computing the deadline on the invoker side avoids discrepancies inside the container,
// but potentially under-estimates the actual deadline
"deadline" -> (Instant.now.toEpochMilli + actionTimeout.toMillis).toString.toJson)
container
.run(
parameters,
env.toJson.asJsObject,
actionTimeout,
action.limits.concurrency.maxConcurrent,
resumeRun.isDefined)(msg.transid)
.map {
case (runInterval, response) =>
val initRunInterval = initInterval
.map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
.getOrElse(runInterval)
constructWhiskActivation(
action,
msg,
initInterval,
initRunInterval,
runInterval.duration >= actionTimeout,
response)
}
}
.recoverWith {
case h: ContainerHealthError if resumeRun.isDefined =>
// a container health error occurred while a run could still be rescheduled
logging.error(this, s"caught health check error while running activation")
Future.failed(ContainerHealthErrorWithResumedRun(h.tid, h.msg, resumeRun.get))
case InitializationError(interval, response) =>
Future.successful(
constructWhiskActivation(
action,
msg,
Some(interval),
interval,
interval.duration >= actionTimeout,
response))
case t =>
// Actually, this should never happen - but we want to make sure to not miss a problem
logging.error(this, s"caught unexpected error while running activation: $t")
Future.successful(
constructWhiskActivation(
action,
msg,
None,
Interval.zero,
false,
ExecutionResponse.whiskError(Messages.abnormalRun)))
}
val splitAckMessagesPendingLogCollection = collectLogs.logsToBeCollected(action)
// Sending an active ack is an asynchronous operation. The result is forwarded as soon as
// possible for blocking activations so that dependent activations can be scheduled. The
// completion message which frees a load balancer slot is sent after the active ack future
// completes to ensure proper ordering.
val sendResult = if (msg.blocking) {
activation.map { result =>
val ackMsg =
if (splitAckMessagesPendingLogCollection) ResultMessage(tid, result)
else CombinedCompletionAndResultMessage(tid, result, instance)
sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg)
}
} else {
// For non-blocking request, do not forward the result.
if (splitAckMessagesPendingLogCollection) Future.successful(())
else
activation.map { result =>
val ackMsg = CompletionMessage(tid, result, instance)
sendActiveAck(tid, result, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, ackMsg)
}
}
activation.foreach { activation =>
val healthMessage = HealthMessage(!activation.response.isWhiskError)
invokerHealthManager ! healthMessage
}
val context = UserContext(msg.user)
// Adds logs to the raw activation.
val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
.flatMap { activation =>
// Skip log collection entirely if the limit is set to 0
if (action.limits.logs.asMegaBytes == 0.MB) {
Future.successful(Right(activation))
} else {
val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
collectLogs(tid, msg.user, activation, container, action)
.andThen {
case Success(_) => tid.finished(this, start)
case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
}
.map(logs => Right(activation.withLogs(logs)))
.recover {
case LogCollectingException(logs) =>
Left(ActivationLogReadingError(activation.withLogs(logs)))
case _ =>
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
}
}
}
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach { activation =>
// Sending the completion message to the controller after the active ack ensures proper ordering
// (result is received before the completion message for blocking invokes).
if (splitAckMessagesPendingLogCollection) {
sendResult.onComplete(
_ =>
sendActiveAck(
tid,
activation,
msg.blocking,
msg.rootControllerIndex,
msg.user.namespace.uuid,
CompletionMessage(tid, activation, instance)))
}
// Storing the record. Entirely asynchronous and not waited upon.
storeActivation(tid, activation, msg.blocking, context)
}
// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs
.andThen {
// remove activationId from runningActivations in any case
case _ => runningActivations.remove(msg.activationId.asString)
}
.flatMap {
case Right(act) if !act.response.isSuccess && !act.response.isApplicationError =>
Future.failed(ActivationUnsuccessfulError(act))
case Left(error) => Future.failed(error)
case Right(act) => Future.successful(act)
}
}
/** Generates an activation with zero runtime. Usually used for error cases */
private def generateFallbackActivation(action: ExecutableWhiskAction,
msg: ActivationMessage,
response: ExecutionResponse): WhiskActivation = {
val now = Instant.now
val causedBy = if (msg.causedBySequence) {
Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
} else None
WhiskActivation(
activationId = msg.activationId,
namespace = msg.user.namespace.name.toPath,
subject = msg.user.subject,
cause = msg.cause,
name = msg.action.name,
version = msg.action.version.getOrElse(SemVer()),
start = now,
end = now,
duration = Some(0),
response = response,
annotations = {
Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.copy(version = None).asString)) ++
Parameters(WhiskActivation.kindAnnotation, JsString(action.exec.kind)) ++
causedBy
})
}
}
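/**
* Companion providing the Props factory used by the container pool to create proxy actors.
*
* A minimal usage sketch (the factory, store and config values are illustrative, not defined in this file;
* an implicit ActorSystem, ActorMaterializer and Logging must be in scope):
* {{{
* val proxy = context.actorOf(
*   FunctionPullingContainerProxy.props(factory, entityStore, namespaceBlacklist, get, dataManagementService,
*     clientProxyFactory, ack, store, collectLogs, getLiveContainerCount, getWarmedContainerLimit,
*     instance, invokerHealthManager, poolConfig, timeoutConfig))
* proxy ! Start(exec, memoryLimit)                                                  // prewarm a stem cell container
* proxy ! Initialize(invocationNamespace, action, schedulerHost, rpcPort, transId)  // cold start for an action
* }}}
*/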
object FunctionPullingContainerProxy {
def props(factory: (TransactionId,
String,
ImageName,
Boolean,
ByteSize,
Int,
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskAction],
dataManagementService: ActorRef,
clientProxyFactory: (ActorRefFactory,
String,
FullyQualifiedEntityName,
DocRevision,
String,
Int,
ContainerId) => ActorRef,
ack: ActiveAck,
store: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
collectLogs: LogsCollector,
getLiveContainerCount: (String, FullyQualifiedEntityName, DocRevision) => Future[Long],
getWarmedContainerLimit: (String) => Future[(Int, FiniteDuration)],
instance: InvokerInstanceId,
invokerHealthManager: ActorRef,
poolConfig: ContainerPoolConfig,
timeoutConfig: ContainerProxyTimeoutConfig,
healthCheckConfig: ContainerProxyHealthCheckConfig =
loadConfigOrThrow[ContainerProxyHealthCheckConfig](ConfigKeys.containerProxyHealth),
tcp: Option[ActorRef] = None)(implicit actorSystem: ActorSystem, mat: ActorMaterializer, logging: Logging) =
Props(
new FunctionPullingContainerProxy(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService,
clientProxyFactory,
ack,
store,
collectLogs,
getLiveContainerCount,
getWarmedContainerLimit,
instance,
invokerHealthManager,
poolConfig,
timeoutConfig,
healthCheckConfig,
tcp))
private val containerCount = new Counter
/**
* Generates a unique container name.
*
* @param instance the invoker instance id
* @param prefix the container name's prefix
* @param suffix the container name's suffix
* @return a unique container name
*/
def containerName(instance: InvokerInstanceId, prefix: String, suffix: String): String = {
def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
val sanitizedPrefix = prefix.filter(isAllowed)
val sanitizedSuffix = suffix.filter(isAllowed)
s"${ContainerFactory.containerNamePrefix(instance)}_${containerCount.next()}_${sanitizedPrefix}_${sanitizedSuffix}"
}
/**
* Creates a WhiskActivation ready to be sent via active ack.
*
* @param action the action that was executed
* @param msg the activation message that triggered the execution
* @param initInterval the time it took to initialize the action, if any initialization was performed
* @param totalInterval the time it took to execute the job
* @param isTimeout whether the activation exceeded its time limit
* @param response the response to return to the user
* @return a WhiskActivation to be sent to the user
*/
def constructWhiskActivation(action: ExecutableWhiskAction,
msg: ActivationMessage,
initInterval: Option[Interval],
totalInterval: Interval,
isTimeout: Boolean,
response: ExecutionResponse) = {
val causedBy = if (msg.causedBySequence) {
Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
} else None
val waitTime = {
val end = initInterval.map(_.start).getOrElse(totalInterval.start)
Parameters(WhiskActivation.waitTimeAnnotation, Interval(msg.transid.meta.start, end).duration.toMillis.toJson)
}
val initTime = {
initInterval.map(initTime => Parameters(WhiskActivation.initTimeAnnotation, initTime.duration.toMillis.toJson))
}
val binding =
msg.action.binding.map(f => Parameters(WhiskActivation.bindingAnnotation, JsString(f.asString)))
WhiskActivation(
activationId = msg.activationId,
namespace = msg.user.namespace.name.toPath,
subject = msg.user.subject,
cause = msg.cause,
name = action.name,
version = action.version,
start = totalInterval.start,
end = totalInterval.end,
duration = Some(totalInterval.duration.toMillis),
response = response,
annotations = {
Parameters(WhiskActivation.limitsAnnotation, action.limits.toJson) ++
Parameters(WhiskActivation.pathAnnotation, JsString(action.fullyQualifiedName(false).asString)) ++
Parameters(WhiskActivation.kindAnnotation, JsString(action.exec.kind)) ++
Parameters(WhiskActivation.timeoutAnnotation, JsBoolean(isTimeout)) ++
causedBy ++ initTime ++ waitTime ++ binding
})
}
}