// blob: e8ce26f2adbcb96a59182009c3ed63c066958ec9
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.scheduler.container
import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.ContainerCreationError.{
NoAvailableInvokersError,
NoAvailableResourceInvokersError
}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.{
Annotations,
ByteSize,
DocInfo,
DocRevision,
FullyQualifiedEntityName,
InvokerInstanceId,
MemoryLimit,
SchedulerInstanceId
}
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.scheduler.Scheduler
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
ContainerDeletion,
ContainerKeyMeta,
CreationJobState,
FailedCreationJob,
RegisterCreationJob,
ReschedulingCreationJob,
SuccessfulCreationJob
}
import org.apache.openwhisk.core.service.{
DeleteEvent,
PutEvent,
UnwatchEndpoint,
WatchEndpoint,
WatchEndpointInserted,
WatchEndpointRemoved
}
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
import spray.json.DefaultJsonProtocol._
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.collection.concurrent.TrieMap
import scala.util.{Failure, Success}
/**
 * A container creation message together with the invoker that was chosen to handle it.
 *
 * @param msg the creation message to deliver
 * @param invokerId the invoker the message has been assigned to
 */
case class ScheduledPair(msg: ContainerCreationMessage, invokerId: InvokerInstanceId)
/**
 * Configuration holder for the share of untagged invokers used for managed and blackbox actions.
 * It is loaded from `ConfigKeys.fraction` by the `ContainerManager` companion object, which clamps
 * the values before using them.
 *
 * @param managedFraction configured fraction of invokers for managed actions
 * @param blackboxFraction configured fraction of invokers for blackbox actions
 */
case class BlackboxFractionConfig(managedFraction: Double, blackboxFraction: Double)
/**
 * Actor that turns container creation / deletion requests into messages posted to invoker topics.
 *
 * It reacts to `ContainerCreation`, `ContainerDeletion` and `ReschedulingCreationJob` requests, keeps track of
 * warmed containers and known invokers through watch events delivered by `watcherService`, and sends a warm-up
 * creation message to every invoker it learns about.
 *
 * @param jobManagerFactory creates the creation job manager actor from this actor's context
 * @param provider messaging provider used to obtain the message producer
 * @param schedulerInstanceId id of this scheduler, embedded in creation messages
 * @param etcdClient client used to look up invokers and container keys
 * @param config configuration handed to the messaging provider
 * @param watcherService actor that receives the watch / unwatch requests for the etcd key prefixes
 */
class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
provider: MessagingProvider,
schedulerInstanceId: SchedulerInstanceId,
etcdClient: EtcdClient,
config: WhiskConfig,
watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
extends Actor {
private implicit val ec: ExecutionContextExecutor = context.dispatcher
// receives RegisterCreationJob for every creation that is sent out, and GracefulShutdown on shutdown
private val creationJobManager = jobManagerFactory(context)
private val messagingProducer = provider.getProducer(config)
// keys of warmed containers, maintained from watch events on the warmed prefix
private var warmedContainers = Set.empty[String]
// invoker instance number -> invoker description, for invokers that already received the warm-up message
private val warmedInvokers = TrieMap[Int, String]()
// creation id -> warmed container key that has been picked for a creation which is still in flight
private val inProgressWarmedContainers = TrieMap.empty[String, String]
private val warmKey = ContainerKeys.warmedPrefix
private val invokerKey = InvokerKeys.prefix
private val watcherName = s"container-manager"
// subscribe to put / delete events below the warmed-container and invoker key prefixes
watcherService ! WatchEndpoint(warmKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
watcherService ! WatchEndpoint(invokerKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
/**
 * Message handling of the container manager.
 *
 * - `ContainerCreation` / `ReschedulingCreationJob`: schedule the creation messages to invokers.
 * - `ContainerDeletion`: notify every invoker that still holds a container of the action.
 * - `WatchEndpointInserted` / `WatchEndpointRemoved`: keep the warmed container and invoker bookkeeping current.
 * - `FailedCreationJob` / `SuccessfulCreationJob`: release the warmed container reserved for that creation.
 * - `GracefulShutdown`: stop watching and forward the shutdown to the creation job manager.
 *
 * Any other message is ignored.
 */
override def receive: Receive = {
  case ContainerCreation(msgs, memory, invocationNamespace) =>
    createContainer(msgs, memory, invocationNamespace)

  case ContainerDeletion(invocationNamespace, fqn, revision, whiskActionMetaData) =>
    getInvokersWithOldContainer(invocationNamespace, fqn, revision)
      .map { invokers =>
        val msg = ContainerDeletionMessage(
          TransactionId.containerDeletion,
          invocationNamespace,
          fqn,
          revision,
          whiskActionMetaData)
        invokers.foreach(sendDeletionContainerToInvoker(messagingProducer, _, msg))
      }
      .andThen {
        // without this the lookup / parsing failure would vanish together with the discarded future
        case Failure(t) =>
          logging.warn(this, s"Failed to delete containers of $invocationNamespace/$fqn caused by: $t")
      }

  case rescheduling: ReschedulingCreationJob =>
    // a rescheduled job is retried as a fresh creation message with an incremented retry count
    val msg = rescheduling.toCreationMessage(schedulerInstanceId, rescheduling.retry + 1)
    createContainer(
      List(msg),
      rescheduling.actionMetaData.limits.memory.megabytes.MB,
      rescheduling.invocationNamespace)

  case WatchEndpointInserted(watchKey, key, _, true) =>
    watchKey match {
      case `warmKey` => warmedContainers += key
      case `invokerKey` =>
        val invoker = InvokerKeys.getInstanceId(key)
        // only the first sighting of an invoker triggers the warm-up message
        warmedInvokers.getOrElseUpdate(invoker.instance, {
          warmUpInvoker(invoker)
          invoker.toString
        })
    }

  case WatchEndpointRemoved(watchKey, key, _, true) =>
    watchKey match {
      case `warmKey` => warmedContainers -= key
      case `invokerKey` =>
        val invoker = InvokerKeys.getInstanceId(key)
        warmedInvokers.remove(invoker.instance)
    }

  case FailedCreationJob(cid, _, _, _, _, _) =>
    inProgressWarmedContainers.remove(cid.asString)

  case SuccessfulCreationJob(cid, _, _, _) =>
    inProgressWarmedContainers.remove(cid.asString)

  case GracefulShutdown =>
    watcherService ! UnwatchEndpoint(warmKey, isPrefix = true, watcherName)
    watcherService ! UnwatchEndpoint(invokerKey, isPrefix = true, watcherName)
    creationJobManager ! GracefulShutdown

  case _ =>
}
/**
 * Dispatch the given creation messages to invokers.
 *
 * Messages that can be served by an already warmed container are sent directly to the invoker owning that
 * container. The remaining ("cold") messages are scheduled on the invokers which are currently available
 * for the given memory and namespace. If no invoker is available, a `FailedCreationJob` is reported for
 * every cold message.
 *
 * @param msgs creation messages to dispatch; an empty list is ignored
 * @param memory memory an invoker must have free to be considered
 * @param invocationNamespace namespace used to honour invokers dedicated to namespaces
 */
private def createContainer(msgs: List[ContainerCreationMessage],
                            memory: ByteSize,
                            invocationNamespace: String): Unit = {
  msgs.headOption match {
    case None =>
      // nothing to schedule; avoids the NoSuchElementException `msgs.head` would raise
      logging.warn(this, "received an empty list of creation messages")

    case Some(first) =>
      logging.info(this, s"received ${msgs.size} creation message [${first.invocationNamespace}:${first.action}]")
      val coldCreations = filterWarmedCreations(msgs)
      if (coldCreations.nonEmpty) {
        ContainerManager
          .getAvailableInvokers(etcdClient, memory, invocationNamespace)
          .flatMap { invokers =>
            if (invokers.isEmpty) {
              coldCreations.foreach { msg =>
                ContainerManager.sendState(
                  FailedCreationJob(
                    msg.creationId,
                    msg.invocationNamespace,
                    msg.action,
                    msg.revision,
                    NoAvailableInvokersError,
                    s"No available invokers."))
              }
              Future.failed(NoCapacityException("No available invokers."))
            } else {
              coldCreations.foreach { msg =>
                creationJobManager ! RegisterCreationJob(msg)
              }
              Future {
                ContainerManager
                  .schedule(invokers, coldCreations, memory)
                  .foreach { pair =>
                    sendCreationContainerToInvoker(messagingProducer, pair.invokerId.toInt, pair.msg)
                  }
              }
            }
          }
          // attached to the outer future so that a failing invoker lookup is reported as well
          .andThen {
            case Failure(t) => logging.warn(this, s"Failed to create container caused by: $t")
          }
      }
  }
}
/**
 * Find the invokers that hold containers (running or warmed) of the given action whose revision is not
 * newer than `currentRevision`.
 *
 * @param invocationNamespace namespace the action is invoked in
 * @param fqn fully qualified name of the action
 * @param currentRevision revision to compare the stored container revisions against
 * @return the distinct invoker instance numbers owning such containers
 */
private def getInvokersWithOldContainer(invocationNamespace: String,
                                        fqn: FullyQualifiedEntityName,
                                        currentRevision: DocRevision): Future[List[Int]] = {
  val namespacePrefix = containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, fqn)
  val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, invocationNamespace, fqn)

  // the two lookups are independent, so both are started before they are combined
  val existingContainers = etcdClient
    .getPrefix(namespacePrefix)
    .map { res =>
      res.getKvsList.asScala.map { kv =>
        parseExistingContainerKey(namespacePrefix, kv.getKey)
      }
    }
  val warmedContainerKeys = etcdClient
    .getPrefix(warmedPrefix)
    .map { res =>
      res.getKvsList.asScala.map { kv =>
        parseWarmedContainerKey(warmedPrefix, kv.getKey)
      }
    }

  for {
    existing <- existingContainers
    warmed <- warmedContainerKeys
  } yield {
    (existing ++ warmed)
      // keep containers of newer revisions untouched; `dropWhile` only removed a leading run of them
      .filterNot(k => k.revision > currentRevision)
      .map(_.invokerId)
      .distinct // one entry per invoker
      .toList
  }
}
/**
 * Extract revision, invoker and container id from the key of an existing container.
 *
 * existingKey format: {tag}/namespace/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker{id}/container/{containerId}
 *
 * @param prefix the part of the key in front of {revision}
 * @param existingKey the complete key
 * @return the parsed key meta data; a key that does not follow the format raises an exception
 */
private def parseExistingContainerKey(prefix: String, existingKey: String): ContainerKeyMeta = {
  // only the leading prefix is removed; `replace` would also strip later occurrences of the same text
  val keys = existingKey.stripPrefix(prefix).split("/")
  val revision = DocRevision(keys(0))
  val invokerId = keys(1).replace("invoker", "").toInt
  val containerId = keys(3)
  ContainerKeyMeta(revision, invokerId, containerId)
}
/**
 * Extract revision, invoker and container id from the key of a warmed container.
 *
 * warmedKey format: {tag}/warmed/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker/{id}/container/{containerId}
 *
 * @param prefix the part of the key in front of {revision}
 * @param warmedKey the complete key
 * @return the parsed key meta data; a key that does not follow the format raises an exception
 */
private def parseWarmedContainerKey(prefix: String, warmedKey: String): ContainerKeyMeta = {
  // only the leading prefix is removed; `replace` would also strip later occurrences of the same text
  val keys = warmedKey.stripPrefix(prefix).split("/")
  val revision = DocRevision(keys(0))
  val invokerId = keys(2).toInt
  val containerId = keys(4)
  ContainerKeyMeta(revision, invokerId, containerId)
}
/**
 * Filter out the messages which can use a warmed container.
 *
 * For every message a warmed container of the same action that is not yet reserved by another in-flight
 * creation is looked up. If one exists it is reserved, the creation job is registered and the message is
 * sent to the invoker owning that container.
 *
 * @param msgs the creation messages to inspect
 * @return the messages for which no warmed container was found
 */
private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
  msgs.filter { msg =>
    val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action)
    // taken once per message: earlier messages of this batch may have reserved containers in the meantime
    val reserved = inProgressWarmedContainers.values.toSet
    warmedContainers.find(container => !reserved.contains(container) && container.startsWith(warmedPrefix)) match {
      case Some(container) =>
        logging.info(this, s"Choose a warmed container $container")
        inProgressWarmedContainers.update(msg.creationId.asString, container)
        // the key ends with .../{invokerId}/container/{containerId}
        val chosenInvoker = container.split("/").takeRight(3).apply(0)
        creationJobManager ! RegisterCreationJob(msg)
        sendCreationContainerToInvoker(messagingProducer, chosenInvoker.toInt, msg)
        false

      case None =>
        true
    }
  }
}
/**
 * Post a container creation message to the topic of the given invoker.
 *
 * @param producer producer used to post the message
 * @param invoker instance number of the target invoker
 * @param msg the creation message
 * @return the future of the send operation, completed after the outcome has been logged
 */
private def sendCreationContainerToInvoker(producer: MessageProducer,
                                           invoker: Int,
                                           msg: ContainerCreationMessage): Future[RecordMetadata] = {
  implicit val transid: TransactionId = msg.transid
  val topic = s"${Scheduler.topicPrefix}invoker$invoker"
  val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic")
  producer.send(topic, msg).andThen {
    case Success(status) =>
      transid.finished(
        this,
        start,
        s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status
          .topic()}[${status.partition()}][${status.offset()}]",
        logLevel = InfoLevel)
    case Failure(cause) =>
      // include the cause, otherwise the reason for the failed post is lost
      logging.error(
        this,
        s"Failed to create container for ${msg.action}, error: error on posting to topic $topic, cause: $cause")
      transid.failed(this, start, s"error on posting to topic $topic")
  }
}
/**
 * Post a container deletion message to the topic of the given invoker.
 *
 * @param producer producer used to post the message
 * @param invoker instance number of the target invoker
 * @param msg the deletion message
 * @return the future of the send operation, completed after the outcome has been logged
 */
private def sendDeletionContainerToInvoker(producer: MessageProducer,
                                           invoker: Int,
                                           msg: ContainerDeletionMessage): Future[RecordMetadata] = {
  implicit val transid: TransactionId = msg.transid
  val topic = s"${Scheduler.topicPrefix}invoker$invoker"
  val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic")
  producer.send(topic, msg).andThen {
    case Success(status) =>
      transid.finished(
        this,
        start,
        s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status
          .topic()}[${status.partition()}][${status.offset()}]",
        logLevel = InfoLevel)
    case Failure(cause) =>
      // include the cause, otherwise the reason for the failed post is lost
      logging.error(
        this,
        s"Failed to delete container for ${msg.action}, error: error on posting to topic $topic, cause: $cause")
      transid.failed(this, start, s"error on posting to topic $topic")
  }
}
/**
 * Send the warm-up creation message(s) of this scheduler to a single invoker.
 *
 * @param invoker the invoker to warm up
 */
private def warmUpInvoker(invoker: InvokerInstanceId): Unit = {
  logging.info(this, s"Warm up invoker $invoker")
  for (warmUpMessage <- WarmUp.warmUpContainerCreationMessage(schedulerInstanceId)) {
    sendCreationContainerToInvoker(messagingProducer, invoker.instance, warmUpMessage)
  }
}
// Warm up every usable invoker that is already registered in etcd with at least the minimum action
// memory free. Invokers that were warmed up before are skipped.
private def warmUp() = {
  val registeredInvokers = ContainerManager.getAvailableInvokers(etcdClient, MemoryLimit.MIN_MEMORY)
  registeredInvokers.map { healths =>
    for (health <- healths) {
      val id = health.id
      warmedInvokers.getOrElseUpdate(id.instance, {
        warmUpInvoker(id)
        id.toString
      })
    }
  }
}
warmUp()
}
object ContainerManager {
// Raw fractions as configured under ConfigKeys.fraction.
val fractionConfig: BlackboxFractionConfig =
  loadConfigOrThrow[BlackboxFractionConfig](ConfigKeys.fraction)
// Managed share clamped to [0.0, 1.0].
private val managedFraction: Double = fractionConfig.managedFraction.min(1.0).max(0.0)
// Blackbox share capped at 1.0 and raised to at least the share not covered by managed invokers,
// so that both shares together always cover all invokers.
private val blackboxFraction: Double = fractionConfig.blackboxFraction.min(1.0).max(1.0 - managedFraction)
/**
 * Create the `Props` for a [[ContainerManager]] actor. All arguments are handed to the actor's
 * constructor unchanged.
 */
def props(jobManagerFactory: ActorRefFactory => ActorRef,
provider: MessagingProvider,
schedulerInstanceId: SchedulerInstanceId,
etcdClient: EtcdClient,
config: WhiskConfig,
watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging): Props =
Props(new ContainerManager(jobManagerFactory, provider, schedulerInstanceId, etcdClient, config, watcherService))
/**
 * Pick a random index in the range [0, mod).
 *
 * The random number generator decides how creations are distributed over the invokers: the more even
 * the distribution, the fewer creations have to be rescheduled.
 *
 * @param mod exclusive upper bound of the result
 * @return a random value between 0 (inclusive) and `mod` (exclusive)
 */
def rng(mod: Int): Int = {
  val random = ThreadLocalRandom.current()
  random.nextInt(mod)
}
/**
 * Assign an invoker to each message.
 *
 * Assumption
 *  - The memory of each invoker is larger than minMemory.
 *  - Messages that are not assigned an invoker are discarded (a `FailedCreationJob` is reported for them).
 *
 * Selection rules per message:
 *  - Without required resources: pick among the managed (or, for blackbox actions, the blackbox) share of
 *    the untagged invokers; fall back to tagged invokers if none of them is left.
 *  - With required resources: pick among the invokers whose tags contain all required resources. If there
 *    is none and the strict policy is disabled, fall back to untagged invokers first and to the remaining
 *    tagged invokers afterwards.
 *
 * The remaining invoker capacity is carried from one message to the next, so the memory taken by earlier
 * messages of the batch is respected when later messages are placed.
 *
 * @param invokers  Available invoker pool
 * @param msgs      Messages to which the invoker will be assigned
 * @param minMemory Minimum memory for all invokers (not evaluated here, see the assumption above)
 * @return A pair of messages and assigned invokers, most recently scheduled message first
 */
def schedule(invokers: List[InvokerHealth], msgs: List[ContainerCreationMessage], minMemory: ByteSize)(
  implicit logging: Logging): List[ScheduledPair] = {
  logging.info(this, s"usable total invoker size: ${invokers.size}")
  val noTaggedInvokers = invokers.filter(_.id.tags.isEmpty)
  val managed = Math.max(1, Math.ceil(noTaggedInvokers.size.toDouble * managedFraction).toInt)
  val blackboxes = Math.max(1, Math.floor(noTaggedInvokers.size.toDouble * blackboxFraction).toInt)
  // instance numbers are resolved once instead of once per candidate and message
  val managedInstances = noTaggedInvokers.take(managed).map(_.id.instance).toSet
  val blackboxInstances = noTaggedInvokers.takeRight(blackboxes).map(_.id.instance).toSet
  msgs.headOption.foreach { first =>
    logging.info(
      this,
      s"${msgs.size} creation messages for ${first.invocationNamespace}/${first.action}, managedFraction:$managedFraction, blackboxFraction:$blackboxFraction, managed invoker size:$managed, blackboxes invoker size:$blackboxes")
  }

  // report a message for which no invoker is left
  def failNoInvokers(msg: ContainerCreationMessage): Unit =
    sendState(
      FailedCreationJob(
        msg.creationId,
        msg.invocationNamespace,
        msg.action,
        msg.revision,
        NoAvailableInvokersError,
        s"No available invokers."))

  val (scheduled, _) = msgs.foldLeft((List.empty[ScheduledPair], invokers)) {
    case ((pairs, candidates), msg) =>
      val requiredResources =
        msg.whiskActionMetaData.annotations
          .getAs[Seq[String]](Annotations.InvokerResourcesAnnotationName)
          .getOrElse(Seq.empty[String])
      val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
        .getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
        .getOrElse(true)
      val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false)

      // NOTE: `candidates` (not the original `invokers`) is passed as the whole pool below, otherwise the
      // capacity consumed by previously scheduled messages would be forgotten
      if (requiredResources.isEmpty) {
        // only choose managed invokers or blackbox invokers
        val wantedInstances = if (isBlackboxInvocation) blackboxInstances else managedInstances
        val wantedInvokers = candidates.filter(c => wantedInstances.contains(c.id.instance))
        val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
        if (wantedInvokers.nonEmpty) {
          chooseInvokerFromCandidates(wantedInvokers, candidates, pairs, msg)
        } else if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then
          chooseInvokerFromCandidates(taggedInvokers, candidates, pairs, msg)
        } else {
          failNoInvokers(msg)
          (pairs, candidates)
        }
      } else {
        val wantedInvokers = candidates.filter(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
        if (wantedInvokers.nonEmpty) {
          chooseInvokerFromCandidates(wantedInvokers, candidates, pairs, msg)
        } else if (resourcesStrictPolicy) {
          sendState(
            FailedCreationJob(
              msg.creationId,
              msg.invocationNamespace,
              msg.action,
              msg.revision,
              NoAvailableResourceInvokersError,
              s"No available invokers with resources $requiredResources."))
          (pairs, candidates)
        } else {
          val (untagged, tagged) = candidates.partition(_.id.tags.isEmpty)
          if (untagged.nonEmpty) { // choose no tagged invokers first
            chooseInvokerFromCandidates(untagged, candidates, pairs, msg)
          } else {
            val leftInvokers =
              tagged.filterNot(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
            if (leftInvokers.nonEmpty) {
              chooseInvokerFromCandidates(leftInvokers, candidates, pairs, msg)
            } else {
              failNoInvokers(msg)
              (pairs, candidates)
            }
          }
        }
      }
  }
  scheduled
}
/**
 * Randomly pick one of the candidates for the message and update the invoker pool accordingly.
 *
 * If the picked invoker could host one more container of the same size after this one, its free memory is
 * reduced by the required memory; otherwise it is removed from the pool.
 *
 * @param candidates invokers eligible for this message, must not be empty
 * @param wholeInvokers the invoker pool to update
 * @param pairs the pairs scheduled so far
 * @param msg the message to schedule
 * @return the pairs extended by the new assignment and the updated invoker pool
 */
private def chooseInvokerFromCandidates(
  candidates: List[InvokerHealth],
  wholeInvokers: List[InvokerHealth],
  pairs: List[ScheduledPair],
  msg: ContainerCreationMessage)(implicit logging: Logging): (List[ScheduledPair], List[InvokerHealth]) = {
  val instance = candidates(rng(mod = candidates.size))
  // it must be compared to the instance unique id
  val idxInWhole = wholeInvokers.indexWhere(_.id.instance == instance.id.instance)
  val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
  val updated =
    if (idxInWhole < 0) {
      // the chosen invoker is not part of the pool: nothing to update, but do not fail the scheduling
      logging.warn(this, s"invoker${instance.id.instance} is not part of the invoker pool")
      wholeInvokers
    } else if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) {
      // compared on plain MB numbers, presumably because a ByteSize cannot represent a negative value
      wholeInvokers.updated(
        idxInWhole,
        instance.copy(id = instance.id.copy(userMemory = instance.id.userMemory - requiredMemory.MB)))
    } else {
      // not enough memory left for another container: drop the invoker from the pool
      wholeInvokers.take(idxInWhole) ::: wholeInvokers.drop(idxInWhole + 1)
    }
  (ScheduledPair(msg, instance.id) :: pairs, updated)
}
/**
 * Deliver a creation job state to the memory queue of the action, provided that queue is registered in
 * the [[QueuePool]] as a leader. Otherwise the state is only logged as an error.
 *
 * @param state the creation job state to deliver
 */
private def sendState(state: CreationJobState)(implicit logging: Logging): Unit = {
  val queueKey = MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))
  QueuePool.get(queueKey).filter(_.isLeader) match {
    case Some(leader) =>
      leader.queue ! state
    case None =>
      logging.error(this, s"get a $state for a nonexistent memory queue or a follower")
  }
}
/**
 * Look up the usable invokers which have at least `minMemory` free and which are either not dedicated to
 * any namespace or dedicated to the given one.
 *
 * @param etcd client used to read the invoker entries
 * @param minMemory minimum free memory an invoker must report
 * @param invocationNamespace namespace the containers are created for
 * @return the matching invokers
 */
protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize, invocationNamespace: String)(
  implicit executor: ExecutionContext): Future[List[InvokerHealth]] = {
  // same lookup as the namespace-agnostic variant, plus the dedicated namespace check
  getAvailableInvokers(etcd, minMemory).map { invokers =>
    invokers.filter { invoker =>
      invoker.id.dedicatedNamespaces.isEmpty || invoker.id.dedicatedNamespaces.contains(invocationNamespace)
    }
  }
}

/**
 * Look up the usable invokers which have at least `minMemory` free, regardless of dedicated namespaces.
 *
 * Every entry below the invoker key prefix is parsed as an `InvokerResourceMessage`; an entry that cannot
 * be parsed is treated as an offline invoker without memory.
 *
 * @param etcd client used to read the invoker entries
 * @param minMemory minimum free memory an invoker must report
 * @return the matching invokers
 */
protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize)(
  implicit executor: ExecutionContext): Future[List[InvokerHealth]] = {
  etcd
    .getPrefix(InvokerKeys.prefix)
    .map { res =>
      res.getKvsList.asScala
        .map { kv =>
          InvokerResourceMessage
            .parse(kv.getValue.toString(StandardCharsets.UTF_8))
            .map { resourceMessage =>
              val status = resourceMessage.status match {
                case Healthy.asString   => Healthy
                case Unhealthy.asString => Unhealthy
                case _                  => Offline
              }
              // the key only carries the identity; the resource figures come from the stored message
              val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8))
              val invoker = temporalId.copy(
                userMemory = resourceMessage.freeMemory.MB,
                busyMemory = Some(resourceMessage.busyMemory.MB),
                tags = resourceMessage.tags,
                dedicatedNamespaces = resourceMessage.dedicatedNamespaces)
              InvokerHealth(invoker, status)
            }
            .getOrElse(InvokerHealth(InvokerInstanceId(kv.getKey, userMemory = 0.MB), Offline))
        }
        .filter(i => i.status.isUsable)
        .filter(_.id.userMemory >= minMemory)
        .toList
    }
}
}
/**
 * Used by the container manager to fail a container creation when no invoker is available.
 *
 * @param msg description of the failure
 */
case class NoCapacityException(msg: String) extends Exception(msg)
/**
 * Registry of the memory queues known to this scheduler, keyed by namespace and action document.
 *
 * TODO This needs to be moved to the QueueManager component that will be added later.
 */
object QueuePool {
  private val pool = TrieMap[MemoryQueueKey, MemoryQueueValue]()

  // lookup and modification of single entries
  private[scheduler] def get(key: MemoryQueueKey): Option[MemoryQueueValue] = pool.get(key)
  private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue): Option[MemoryQueueValue] =
    pool.put(key, value)
  private[scheduler] def remove(key: MemoryQueueKey): Option[MemoryQueueValue] = pool.remove(key)
  private[scheduler] def clear(): Unit = pool.clear()

  // aggregated views
  private[scheduler] def countLeader(): Int = pool.count { case (_, queue) => queue.isLeader }
  private[scheduler] def size: Int = pool.size
  private[scheduler] def values: Iterable[MemoryQueueValue] = pool.values
  private[scheduler] def keys: Iterable[MemoryQueueKey] = pool.keys
}
/**
 * Key of a memory queue in the [[QueuePool]].
 *
 * @param invocationNamespace namespace the action is invoked in
 * @param docInfo document info (id and revision) of the action
 */
case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
/**
 * Entry of the [[QueuePool]].
 *
 * @param queue the memory queue actor
 * @param isLeader whether the queue is registered as a leader; creation job states are only delivered to leaders
 */
case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)