[New Scheduler] Manage container creation (#5074)
* Manage container creation
* Add ContainerManager Test
* Add dedicated namespace
* Remove namespace
* Apply scala fmt
* Add dedicatedNamespaces filter
* Add dedicatedNamespaces test
* Move InvokerState to common
* Unify InvokerHealth message
* Add configuration for test
* Add license header
* Remove compare InvokerResourceMessage
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 6851c70..1467639 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -456,5 +456,6 @@
{{ ret | join(',') }}"
scheduler:
+ protocol: "{{ scheduler_protocol | default('http') }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index afce8fb..a710d60 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -560,9 +560,11 @@
LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> topic))(MeasurementUnit.time.milliseconds)
else LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)
+ // Time that is needed to produce message in kafka
+ val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
+
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
-
/*
* General markers
*/
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
index e425105..433505d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
@@ -17,5 +17,45 @@
package org.apache.openwhisk.common
+import org.apache.openwhisk.core.entity.InvokerInstanceId
+
case object GracefulShutdown
case object Enable
+
+// States an Invoker can be in
+sealed trait InvokerState {
+ val asString: String
+ val isUsable: Boolean
+}
+
+object InvokerState {
+ // Invokers in this state can be used to schedule workload to
+ sealed trait Usable extends InvokerState { val isUsable = true }
+ // No workload should be scheduled to invokers in this state
+ sealed trait Unusable extends InvokerState { val isUsable = false }
+
+ // A completely healthy invoker, pings arriving fine, no system errors
+ case object Healthy extends Usable { val asString = "up" }
+ // The invoker can not create a container
+ case object Unhealthy extends Unusable { val asString = "unhealthy" }
+ // Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
+ case object Unresponsive extends Unusable { val asString = "unresponsive" }
+ // The invoker is down
+ case object Offline extends Unusable { val asString = "down" }
+}
+
+/**
+ * Describes an abstract invoker. An invoker is a local container pool manager that
+ * is in charge of the container life cycle management.
+ *
+ * @param id a unique instance identifier for the invoker
+ * @param status it status (healthy, unhealthy, unresponsive, offline)
+ */
+case class InvokerHealth(id: InvokerInstanceId, status: InvokerState) {
+ override def equals(obj: scala.Any): Boolean = obj match {
+ case that: InvokerHealth => that.id == this.id && that.status == this.status
+ case _ => false
+ }
+
+ override def toString = s"InvokerHealth($id, $status)"
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index f7df2d0..5ce0f44 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -232,11 +232,11 @@
val systemPrefix = "sid_"
- var containerCreation = TransactionId(systemPrefix + "containerCreation")
val unknown = TransactionId(systemPrefix + "unknown")
val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity
+ def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
@@ -245,7 +245,9 @@
val controller = TransactionId(systemPrefix + "controller") // Controller startup
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
- def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
+ var containerCreation = TransactionId(systemPrefix + "containerCreation")
+ var containerDeletion = TransactionId(systemPrefix + "containerDeletion")
+ val warmUp = TransactionId(systemPrefix + "warmUp")
private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala
new file mode 100644
index 0000000..cd2e205
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core
+
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.{ActivationMessage, ContainerCreationMessage}
+import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
+import org.apache.openwhisk.core.entity._
+
+object WarmUp {
+ val warmUpActionIdentity = {
+ val whiskSystem = "whisk.system"
+ val uuid = UUID()
+ Identity(Subject(whiskSystem), Namespace(EntityName(whiskSystem), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
+ }
+
+ private val actionName = "warmUp"
+
+ // this action doesn't need to be in database
+ val warmUpAction = FullyQualifiedEntityName(warmUpActionIdentity.namespace.name.toPath, EntityName(actionName))
+
+ def warmUpActivation(controller: ControllerInstanceId) = {
+ ActivationMessage(
+ transid = TransactionId.warmUp,
+ action = warmUpAction,
+ revision = DocRevision.empty,
+ user = warmUpActionIdentity,
+ activationId = new ActivationIdGenerator {}.make(),
+ rootControllerIndex = controller,
+ blocking = false,
+ content = None,
+ initArgs = Set.empty)
+ }
+
+ def warmUpContainerCreationMessage(scheduler: SchedulerInstanceId) =
+ ExecManifest.runtimesManifest
+ .resolveDefaultRuntime("nodejs:default")
+ .map { manifest =>
+ val metadata = WhiskActionMetaData(
+ warmUpAction.path,
+ warmUpAction.name,
+ CodeExecMetaDataAsString(manifest, false, entryPoint = None))
+ ContainerCreationMessage(
+ TransactionId.warmUp,
+ warmUpActionIdentity.namespace.name.toString,
+ warmUpAction,
+ DocRevision.empty,
+ metadata,
+ scheduler,
+ "",
+ 0)
+ }
+
+ def isWarmUpAction(fqn: FullyQualifiedEntityName): Boolean = {
+ fqn == warmUpAction
+ }
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 1058e53..c7ec206 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -203,6 +203,7 @@
object ConfigKeys {
val cluster = "whisk.cluster"
val loadbalancer = "whisk.loadbalancer"
+ val fraction = "whisk.fraction"
val buildInformation = "whisk.info"
val couchdb = "whisk.couchdb"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala
index 96ae58f..6655496 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala
@@ -24,4 +24,6 @@
val RawHttpAnnotationName = "raw-http"
val RequireWhiskAuthAnnotation = "require-whisk-auth"
val ProvideApiKeyAnnotationName = "provide-api-key"
+ val InvokerResourcesAnnotationName = "invoker-resources"
+ val InvokerResourcesStrictPolicyAnnotationName = "invoker-resources-strict-policy"
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index a8ead9d..f3af376 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -26,12 +26,8 @@
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
-import pureconfig._
-import pureconfig.generic.auto._
-import spray.json.DefaultJsonProtocol._
-import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
-import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
@@ -40,13 +36,17 @@
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity.ExecManifest.Runtimes
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.loadBalancer.{InvokerState, LoadBalancerProvider}
+import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
import org.apache.openwhisk.spi.SpiLoader
+import pureconfig._
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+import pureconfig.generic.auto._
+import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.duration.DurationInt
-import scala.concurrent.Await
import scala.util.{Failure, Success}
/**
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 8422491..4f4c8ad 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -43,28 +43,6 @@
case object Tick
-// States an Invoker can be in
-sealed trait InvokerState {
- val asString: String
- val isUsable: Boolean
-}
-
-object InvokerState {
- // Invokers in this state can be used to schedule workload to
- sealed trait Usable extends InvokerState { val isUsable = true }
- // No workload should be scheduled to invokers in this state
- sealed trait Unusable extends InvokerState { val isUsable = false }
-
- // A completely healthy invoker, pings arriving fine, no system errors
- case object Healthy extends Usable { val asString = "up" }
- // Pings are arriving fine, the invoker returns system errors though
- case object Unhealthy extends Unusable { val asString = "unhealthy" }
- // Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
- case object Unresponsive extends Unusable { val asString = "unresponsive" }
- // Pings are not arriving for this invoker
- case object Offline extends Unusable { val asString = "down" }
-}
-
// Possible answers of an activation
sealed trait InvocationFinishedResult
object InvocationFinishedResult {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index ee839b3..4c87615 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -17,34 +17,18 @@
package org.apache.openwhisk.core.loadBalancer
-import scala.concurrent.Future
import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.stream.ActorMaterializer
-import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.common.{InvokerHealth, Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.Spi
+import scala.concurrent.Future
import scala.concurrent.duration._
-/**
- * Describes an abstract invoker. An invoker is a local container pool manager that
- * is in charge of the container life cycle management.
- *
- * @param id a unique instance identifier for the invoker
- * @param status it status (healthy, unhealthy, offline)
- */
-class InvokerHealth(val id: InvokerInstanceId, val status: InvokerState) {
- override def equals(obj: scala.Any): Boolean = obj match {
- case that: InvokerHealth => that.id == this.id && that.status == this.status
- case _ => false
- }
-
- override def toString = s"InvokerHealth($id, $status)"
-}
-
trait LoadBalancer {
/**
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 14d3ff4..1aaf68f 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -28,6 +28,7 @@
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import pureconfig._
import pureconfig.generic.auto._
import org.apache.openwhisk.common._
@@ -37,7 +38,6 @@
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.core.controller.Controller
-import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index 71ab9a0..54bd174 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -20,6 +20,7 @@
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.util.Timeout
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.ContainerRemoved
@@ -350,23 +351,6 @@
}
}
-// States an Invoker can be in
-sealed trait InvokerState {
- val asString: String
-}
-
-case object Offline extends InvokerState {
- val asString = "down"
-}
-
-case object Healthy extends InvokerState {
- val asString = "up"
-}
-
-case object Unhealthy extends InvokerState {
- val asString = "unhealthy"
-}
-
//recevied from ContainerProxy actor
case class HealthMessage(state: Boolean)
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
new file mode 100644
index 0000000..fc62ef0
--- /dev/null
+++ b/core/scheduler/src/main/resources/application.conf
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+whisk{
+ # tracing configuration
+ tracing {
+ component = "Scheduler"
+ }
+
+ fraction {
+ managed-fraction: 90%
+ blackbox-fraction: 10%
+ }
+}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
new file mode 100644
index 0000000..e8ce26f
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.event.Logging.InfoLevel
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.core.connector.ContainerCreationError.{
+ NoAvailableInvokersError,
+ NoAvailableResourceInvokersError
+}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{
+ Annotations,
+ ByteSize,
+ DocInfo,
+ DocRevision,
+ FullyQualifiedEntityName,
+ InvokerInstanceId,
+ MemoryLimit,
+ SchedulerInstanceId
+}
+import org.apache.openwhisk.core.etcd.EtcdClient
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
+import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.scheduler.Scheduler
+import org.apache.openwhisk.core.scheduler.message.{
+ ContainerCreation,
+ ContainerDeletion,
+ ContainerKeyMeta,
+ CreationJobState,
+ FailedCreationJob,
+ RegisterCreationJob,
+ ReschedulingCreationJob,
+ SuccessfulCreationJob
+}
+import org.apache.openwhisk.core.service.{
+ DeleteEvent,
+ PutEvent,
+ UnwatchEndpoint,
+ WatchEndpoint,
+ WatchEndpointInserted,
+ WatchEndpointRemoved
+}
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import pureconfig.generic.auto._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
+import scala.collection.concurrent.TrieMap
+import scala.util.{Failure, Success}
+
+case class ScheduledPair(msg: ContainerCreationMessage, invokerId: InvokerInstanceId)
+
+case class BlackboxFractionConfig(managedFraction: Double, blackboxFraction: Double)
+
+class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
+ provider: MessagingProvider,
+ schedulerInstanceId: SchedulerInstanceId,
+ etcdClient: EtcdClient,
+ config: WhiskConfig,
+ watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
+ extends Actor {
+ private implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+ private val creationJobManager = jobManagerFactory(context)
+
+ private val messagingProducer = provider.getProducer(config)
+
+ private var warmedContainers = Set.empty[String]
+
+ private val warmedInvokers = TrieMap[Int, String]()
+
+ private val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+ private val warmKey = ContainerKeys.warmedPrefix
+ private val invokerKey = InvokerKeys.prefix
+ private val watcherName = s"container-manager"
+
+ watcherService ! WatchEndpoint(warmKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
+ watcherService ! WatchEndpoint(invokerKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
+
+ override def receive: Receive = {
+ case ContainerCreation(msgs, memory, invocationNamespace) =>
+ createContainer(msgs, memory, invocationNamespace)
+
+ case ContainerDeletion(invocationNamespace, fqn, revision, whiskActionMetaData) =>
+ getInvokersWithOldContainer(invocationNamespace, fqn, revision)
+ .map { invokers =>
+ val msg = ContainerDeletionMessage(
+ TransactionId.containerDeletion,
+ invocationNamespace,
+ fqn,
+ revision,
+ whiskActionMetaData)
+ invokers.foreach(sendDeletionContainerToInvoker(messagingProducer, _, msg))
+ }
+
+ case rescheduling: ReschedulingCreationJob =>
+ val msg = rescheduling.toCreationMessage(schedulerInstanceId, rescheduling.retry + 1)
+ createContainer(
+ List(msg),
+ rescheduling.actionMetaData.limits.memory.megabytes.MB,
+ rescheduling.invocationNamespace)
+
+ case WatchEndpointInserted(watchKey, key, _, true) =>
+ watchKey match {
+ case `warmKey` => warmedContainers += key
+ case `invokerKey` =>
+ val invoker = InvokerKeys.getInstanceId(key)
+ warmedInvokers.getOrElseUpdate(invoker.instance, {
+ warmUpInvoker(invoker)
+ invoker.toString
+ })
+ }
+
+ case WatchEndpointRemoved(watchKey, key, _, true) =>
+ watchKey match {
+ case `warmKey` => warmedContainers -= key
+ case `invokerKey` =>
+ val invoker = InvokerKeys.getInstanceId(key)
+ warmedInvokers.remove(invoker.instance)
+ }
+
+ case FailedCreationJob(cid, _, _, _, _, _) =>
+ inProgressWarmedContainers.remove(cid.asString)
+
+ case SuccessfulCreationJob(cid, _, _, _) =>
+ inProgressWarmedContainers.remove(cid.asString)
+
+ case GracefulShutdown =>
+ watcherService ! UnwatchEndpoint(warmKey, isPrefix = true, watcherName)
+ watcherService ! UnwatchEndpoint(invokerKey, isPrefix = true, watcherName)
+ creationJobManager ! GracefulShutdown
+
+ case _ =>
+ }
+
+ private def createContainer(msgs: List[ContainerCreationMessage],
+ memory: ByteSize,
+ invocationNamespace: String): Unit = {
+ logging.info(this, s"received ${msgs.size} creation message [${msgs.head.invocationNamespace}:${msgs.head.action}]")
+ val coldCreations = filterWarmedCreations(msgs)
+ if (coldCreations.nonEmpty)
+ ContainerManager
+ .getAvailableInvokers(etcdClient, memory, invocationNamespace)
+ .flatMap { invokers =>
+ if (invokers.isEmpty) {
+ coldCreations.foreach { msg =>
+ ContainerManager.sendState(
+ FailedCreationJob(
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ NoAvailableInvokersError,
+ s"No available invokers."))
+ }
+ Future.failed(NoCapacityException("No available invokers."))
+ } else {
+ coldCreations.foreach { msg =>
+ creationJobManager ! RegisterCreationJob(msg)
+ }
+
+ Future {
+ ContainerManager
+ .schedule(invokers, coldCreations, memory)
+ .map { pair =>
+ sendCreationContainerToInvoker(messagingProducer, pair.invokerId.toInt, pair.msg)
+ }
+ }
+ }.andThen {
+ case Failure(t) => logging.warn(this, s"Failed to create container caused by: $t")
+ }
+ }
+ }
+
+ private def getInvokersWithOldContainer(invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ currentRevision: DocRevision): Future[List[Int]] = {
+ val namespacePrefix = containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, fqn)
+ val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, invocationNamespace, fqn)
+
+ for {
+ existing <- etcdClient
+ .getPrefix(namespacePrefix)
+ .map { res =>
+ res.getKvsList.asScala.map { kv =>
+ parseExistingContainerKey(namespacePrefix, kv.getKey)
+ }
+ }
+ warmed <- etcdClient
+ .getPrefix(warmedPrefix)
+ .map { res =>
+ res.getKvsList.asScala.map { kv =>
+ parseWarmedContainerKey(warmedPrefix, kv.getKey)
+ }
+ }
+ } yield {
+ (existing ++ warmed)
+ .dropWhile(k => k.revision > currentRevision) // remain latest revision
+ .groupBy(k => k.invokerId) // remove duplicated value
+ .map(_._2.head.invokerId)
+ .toList
+ }
+ }
+
+ /**
+ * existingKey format: {tag}/namespace/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker{id}/container/{containerId}
+ */
+ private def parseExistingContainerKey(prefix: String, existingKey: String): ContainerKeyMeta = {
+ val keys = existingKey.replace(prefix, "").split("/")
+ val revision = DocRevision(keys(0))
+ val invokerId = keys(1).replace("invoker", "").toInt
+ val containerId = keys(3)
+ ContainerKeyMeta(revision, invokerId, containerId)
+ }
+
+ /**
+ * warmedKey format: {tag}/warmed/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker/{id}/container/{containerId}
+ */
+ private def parseWarmedContainerKey(prefix: String, warmedKey: String): ContainerKeyMeta = {
+ val keys = warmedKey.replace(prefix, "").split("/")
+ val revision = DocRevision(keys(0))
+ val invokerId = keys(2).toInt
+ val containerId = keys(4)
+ ContainerKeyMeta(revision, invokerId, containerId)
+ }
+
+ // Filter out messages which can use warmed container
+ private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
+ msgs.filter { msg =>
+ val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action)
+ val chosenInvoker = warmedContainers
+ .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+ .find { container =>
+ if (container.startsWith(warmedPrefix)) {
+ logging.info(this, s"Choose a warmed container $container")
+ inProgressWarmedContainers.update(msg.creationId.asString, container)
+ true
+ } else
+ false
+ }
+ .map(_.split("/").takeRight(3).apply(0))
+ if (chosenInvoker.nonEmpty) {
+ creationJobManager ! RegisterCreationJob(msg)
+ sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg)
+ false
+ } else
+ true
+ }
+ }
+
+ private def sendCreationContainerToInvoker(producer: MessageProducer,
+ invoker: Int,
+ msg: ContainerCreationMessage): Future[RecordMetadata] = {
+ implicit val transid: TransactionId = msg.transid
+
+ val topic = s"${Scheduler.topicPrefix}invoker$invoker"
+ val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic")
+
+ producer.send(topic, msg).andThen {
+ case Success(status) =>
+ transid.finished(
+ this,
+ start,
+ s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status
+ .topic()}[${status.partition()}][${status.offset()}]",
+ logLevel = InfoLevel)
+ case Failure(_) =>
+ logging.error(this, s"Failed to create container for ${msg.action}, error: error on posting to topic $topic")
+ transid.failed(this, start, s"error on posting to topic $topic")
+ }
+ }
+
+ private def sendDeletionContainerToInvoker(producer: MessageProducer,
+ invoker: Int,
+ msg: ContainerDeletionMessage): Future[RecordMetadata] = {
+ implicit val transid: TransactionId = msg.transid
+
+ val topic = s"${Scheduler.topicPrefix}invoker$invoker"
+ val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic")
+
+ producer.send(topic, msg).andThen {
+ case Success(status) =>
+ transid.finished(
+ this,
+ start,
+ s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status
+ .topic()}[${status.partition()}][${status.offset()}]",
+ logLevel = InfoLevel)
+ case Failure(_) =>
+ logging.error(this, s"Failed to delete container for ${msg.action}, error: error on posting to topic $topic")
+ transid.failed(this, start, s"error on posting to topic $topic")
+ }
+ }
+
+ private def warmUpInvoker(invoker: InvokerInstanceId): Unit = {
+ logging.info(this, s"Warm up invoker $invoker")
+ WarmUp.warmUpContainerCreationMessage(schedulerInstanceId).foreach {
+ sendCreationContainerToInvoker(messagingProducer, invoker.instance, _)
+ }
+ }
+
+ // warm up all invokers
+ private def warmUp() = {
+ // warm up exist invokers
+ ContainerManager.getAvailableInvokers(etcdClient, MemoryLimit.MIN_MEMORY).map { invokers =>
+ invokers.foreach { invoker =>
+ warmedInvokers.getOrElseUpdate(invoker.id.instance, {
+ warmUpInvoker(invoker.id)
+ invoker.id.toString
+ })
+ }
+ }
+
+ }
+
+ warmUp()
+}
+
+object ContainerManager {
+ val fractionConfig: BlackboxFractionConfig =
+ loadConfigOrThrow[BlackboxFractionConfig](ConfigKeys.fraction)
+
+ private val managedFraction: Double = Math.max(0.0, Math.min(1.0, fractionConfig.managedFraction))
+ private val blackboxFraction: Double = Math.max(1.0 - managedFraction, Math.min(1.0, fractionConfig.blackboxFraction))
+
+ def props(jobManagerFactory: ActorRefFactory => ActorRef,
+ provider: MessagingProvider,
+ schedulerInstanceId: SchedulerInstanceId,
+ etcdClient: EtcdClient,
+ config: WhiskConfig,
+ watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging): Props =
+ Props(new ContainerManager(jobManagerFactory, provider, schedulerInstanceId, etcdClient, config, watcherService))
+
+ /**
+ * The rng algorithm is responsible for the invoker distribution, and the better the distribution, the smaller the number of rescheduling.
+ *
+ */
+ def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
+
+ /**
+ * Assign an invoker to a message
+ *
+ * Assumption
+ * - The memory of each invoker is larger than minMemory.
+ * - Messages that are not assigned an invoker are discarded.
+ *
+ * @param invokers Available invoker pool
+ * @param msgs Messages to which the invoker will be assigned
+ * @param minMemory Minimum memory for all invokers
+ * @return A pair of messages and assigned invokers
+ */
+ def schedule(invokers: List[InvokerHealth], msgs: List[ContainerCreationMessage], minMemory: ByteSize)(
+ implicit logging: Logging): List[ScheduledPair] = {
+ logging.info(this, s"usable total invoker size: ${invokers.size}")
+ val noTaggedInvokers = invokers.filter(_.id.tags.isEmpty)
+ val managed = Math.max(1, Math.ceil(noTaggedInvokers.size.toDouble * managedFraction).toInt)
+ val blackboxes = Math.max(1, Math.floor(noTaggedInvokers.size.toDouble * blackboxFraction).toInt)
+ val managedInvokers = noTaggedInvokers.take(managed)
+ val blackboxInvokers = noTaggedInvokers.takeRight(blackboxes)
+ logging.info(
+ this,
+ s"${msgs.size} creation messages for ${msgs.head.invocationNamespace}/${msgs.head.action}, managedFraction:$managedFraction, blackboxFraction:$blackboxFraction, managed invoker size:$managed, blackboxes invoker size:$blackboxes")
+ val list = msgs
+ .foldLeft((List.empty[ScheduledPair], invokers)) { (tuple, msg: ContainerCreationMessage) =>
+ val pairs = tuple._1
+ val candidates = tuple._2
+
+ val requiredResources =
+ msg.whiskActionMetaData.annotations
+ .getAs[Seq[String]](Annotations.InvokerResourcesAnnotationName)
+ .getOrElse(Seq.empty[String])
+ val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
+ .getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
+ .getOrElse(true)
+ val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false)
+ if (requiredResources.isEmpty) {
+ // only choose managed invokers or blackbox invokers
+ val wantedInvokers = if (isBlackboxInvocation) {
+ candidates.filter(c => blackboxInvokers.map(b => b.id.instance).contains(c.id.instance)).toSet
+ } else {
+ candidates.filter(c => managedInvokers.map(m => m.id.instance).contains(c.id.instance)).toSet
+ }
+ val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
+
+ if (wantedInvokers.nonEmpty) {
+ chooseInvokerFromCandidates(wantedInvokers.toList, invokers, pairs, msg)
+ } else if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then
+ chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg)
+ } else {
+ sendState(
+ FailedCreationJob(
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ NoAvailableInvokersError,
+ s"No available invokers."))
+ (pairs, candidates)
+ }
+ } else {
+ val wantedInvokers = candidates.filter(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
+ if (wantedInvokers.nonEmpty) {
+ chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg)
+ } else if (resourcesStrictPolicy) {
+ sendState(
+ FailedCreationJob(
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ NoAvailableResourceInvokersError,
+ s"No available invokers with resources $requiredResources."))
+ (pairs, candidates)
+ } else {
+ val (noTaggedInvokers, taggedInvokers) = candidates.partition(_.id.tags.isEmpty)
+ if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first
+ chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, msg)
+ } else {
+ val leftInvokers =
+ taggedInvokers.filterNot(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
+ if (leftInvokers.nonEmpty)
+ chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg)
+ else {
+ sendState(
+ FailedCreationJob(
+ msg.creationId,
+ msg.invocationNamespace,
+ msg.action,
+ msg.revision,
+ NoAvailableInvokersError,
+ s"No available invokers."))
+ (pairs, candidates)
+ }
+ }
+ }
+ }
+ }
+ ._1 // pairs
+ list
+ }
+
+ private def chooseInvokerFromCandidates(
+ candidates: List[InvokerHealth],
+ wholeInvokers: List[InvokerHealth],
+ pairs: List[ScheduledPair],
+ msg: ContainerCreationMessage)(implicit logging: Logging): (List[ScheduledPair], List[InvokerHealth]) = {
+ val idx = rng(mod = candidates.size)
+ val instance = candidates(idx)
+ // it must be compared to the instance unique id
+ val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => p.id.instance == instance.id.instance).head)
+ val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
+ val updated =
+ if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // Since ByteSize is negative, it converts to long type and compares.
+ wholeInvokers.updated(
+ idxInWhole,
+ instance.copy(id = instance.id.copy(userMemory = instance.id.userMemory - requiredMemory.MB)))
+ } else {
+ // drop the nth element
+ val split = wholeInvokers.splitAt(idxInWhole)
+ val _ :: t = split._2
+ split._1 ::: t
+ }
+
+ (ScheduledPair(msg, instance.id) :: pairs, updated)
+ }
+
+ private def sendState(state: CreationJobState)(implicit logging: Logging): Unit = {
+ QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
+ case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
+ memoryQueueValue.queue ! state
+ case _ =>
+ logging.error(this, s"get a $state for a nonexistent memory queue or a follower")
+ }
+ }
+
+ protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize, invocationNamespace: String)(
+ implicit executor: ExecutionContext): Future[List[InvokerHealth]] = {
+ etcd
+ .getPrefix(InvokerKeys.prefix)
+ .map { res =>
+ res.getKvsList.asScala
+ .map { kv =>
+ InvokerResourceMessage
+ .parse(kv.getValue.toString(StandardCharsets.UTF_8))
+ .map { resourceMessage =>
+ val status = resourceMessage.status match {
+ case Healthy.asString => Healthy
+ case Unhealthy.asString => Unhealthy
+ case _ => Offline
+ }
+
+ val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8))
+ val invoker = temporalId.copy(
+ userMemory = resourceMessage.freeMemory.MB,
+ busyMemory = Some(resourceMessage.busyMemory.MB),
+ tags = resourceMessage.tags,
+ dedicatedNamespaces = resourceMessage.dedicatedNamespaces)
+
+ InvokerHealth(invoker, status)
+ }
+ .getOrElse(InvokerHealth(InvokerInstanceId(kv.getKey, userMemory = 0.MB), Offline))
+ }
+ .filter(i => i.status.isUsable)
+ .filter(_.id.userMemory >= minMemory)
+ .filter { invoker =>
+ invoker.id.dedicatedNamespaces.isEmpty || invoker.id.dedicatedNamespaces.contains(invocationNamespace)
+ }
+ .toList
+ }
+ }
+
+ protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize)(
+ implicit executor: ExecutionContext): Future[List[InvokerHealth]] = {
+ etcd
+ .getPrefix(InvokerKeys.prefix)
+ .map { res =>
+ res.getKvsList.asScala
+ .map { kv =>
+ InvokerResourceMessage
+ .parse(kv.getValue.toString(StandardCharsets.UTF_8))
+ .map { resourceMessage =>
+ val status = resourceMessage.status match {
+ case Healthy.asString => Healthy
+ case Unhealthy.asString => Unhealthy
+ case _ => Offline
+ }
+
+ val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8))
+ val invoker = temporalId.copy(
+ userMemory = resourceMessage.freeMemory.MB,
+ busyMemory = Some(resourceMessage.busyMemory.MB),
+ tags = resourceMessage.tags,
+ dedicatedNamespaces = resourceMessage.dedicatedNamespaces)
+ InvokerHealth(invoker, status)
+ }
+ .getOrElse(InvokerHealth(InvokerInstanceId(kv.getKey, userMemory = 0.MB), Offline))
+ }
+ .filter(i => i.status.isUsable)
+ .filter(_.id.userMemory >= minMemory)
+ .toList
+ }
+ }
+
+}
+
+case class NoCapacityException(msg: String) extends Exception(msg)
+
+/**
+ * TODO This needs to be moved to the QueueManager component that will be added later.
+ */
+object QueuePool {
+ private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()
+
+ private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)
+
+ private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)
+
+ private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)
+
+ private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)
+
+ private[scheduler] def clear() = _queuePool.clear()
+
+ private[scheduler] def size = _queuePool.size
+
+ private[scheduler] def values = _queuePool.values
+
+ private[scheduler] def keys = _queuePool.keys
+}
+case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
+case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/message/ContainerMessage.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/message/ContainerMessage.scala
new file mode 100644
index 0000000..b34149e
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/message/ContainerMessage.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.message
+
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.{
+ ContainerCreationAckMessage,
+ ContainerCreationError,
+ ContainerCreationMessage
+}
+import org.apache.openwhisk.core.entity.{
+ ByteSize,
+ CreationId,
+ DocRevision,
+ FullyQualifiedEntityName,
+ SchedulerInstanceId,
+ WhiskActionMetaData
+}
+
+case class ContainerKeyMeta(revision: DocRevision, invokerId: Int, containerId: String)
+
+case class ContainerCreation(msgs: List[ContainerCreationMessage], memory: ByteSize, invocationNamespace: String)
+case class ContainerDeletion(invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ whiskActionMetaData: WhiskActionMetaData)
+
+sealed trait CreationJob
+case class RegisterCreationJob(msg: ContainerCreationMessage) extends CreationJob
+case class FinishCreationJob(ack: ContainerCreationAckMessage) extends CreationJob
+case class ReschedulingCreationJob(tid: TransactionId,
+ creationId: CreationId,
+ invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ actionMetaData: WhiskActionMetaData,
+ schedulerHost: String,
+ rpcPort: Int,
+ retry: Int)
+ extends CreationJob {
+
+ def toCreationMessage(sid: SchedulerInstanceId, retryCount: Int): ContainerCreationMessage =
+ ContainerCreationMessage(
+ tid,
+ invocationNamespace,
+ action,
+ revision,
+ actionMetaData,
+ sid,
+ schedulerHost,
+ rpcPort,
+ retryCount,
+ creationId)
+}
+
+abstract class CreationJobState(val creationId: CreationId,
+ val invocationNamespace: String,
+ val action: FullyQualifiedEntityName,
+ val revision: DocRevision)
+case class FailedCreationJob(override val creationId: CreationId,
+ override val invocationNamespace: String,
+ override val action: FullyQualifiedEntityName,
+ override val revision: DocRevision,
+ error: ContainerCreationError,
+ message: String)
+ extends CreationJobState(creationId, invocationNamespace, action, revision)
+case class SuccessfulCreationJob(override val creationId: CreationId,
+ override val invocationNamespace: String,
+ override val action: FullyQualifiedEntityName,
+ override val revision: DocRevision)
+ extends CreationJobState(creationId, invocationNamespace, action, revision)
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 5ce7f31..944f2af 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -132,6 +132,11 @@
threads = "{{ etcd.pool_threads }}"
}
}
+
+ scheduler {
+ protocol = "{{ scheduler.protocol }}"
+ }
+
}
#test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
index 8ab5946..d574069 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
@@ -22,6 +22,7 @@
import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestActor, TestFSMRef, TestKit, TestProbe}
import common.StreamLogging
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common.{Enable, GracefulShutdown, RingBuffer}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.InvokerResourceMessage
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 7e0d88f..a4a6145 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -43,7 +43,8 @@
import akka.testkit.TestProbe
import akka.util.Timeout
import common.{LoggedFunction, StreamLogging}
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
+import org.apache.openwhisk.common.{InvokerHealth, InvokerState, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.connector.PingMessage
@@ -52,13 +53,10 @@
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.loadBalancer.ActivationRequest
import org.apache.openwhisk.core.loadBalancer.GetStatus
-import org.apache.openwhisk.core.loadBalancer.InvokerState._
import org.apache.openwhisk.core.loadBalancer.InvocationFinishedResult
import org.apache.openwhisk.core.loadBalancer.InvocationFinishedMessage
import org.apache.openwhisk.core.loadBalancer.InvokerActor
import org.apache.openwhisk.core.loadBalancer.InvokerPool
-import org.apache.openwhisk.core.loadBalancer.InvokerState
-import org.apache.openwhisk.core.loadBalancer.InvokerHealth
import org.apache.openwhisk.utils.retry
import org.apache.openwhisk.core.connector.test.TestConnector
import org.apache.openwhisk.core.entity.ControllerInstanceId
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 33dddf9..dbadbce 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -27,6 +27,7 @@
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.junit.JUnitRunner
@@ -36,10 +37,8 @@
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
-import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.common.NestedSemaphore
+import org.apache.openwhisk.common.{InvokerHealth, Logging, NestedSemaphore, TransactionId}
import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
-import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.connector.CompletionMessage
@@ -67,7 +66,6 @@
import org.apache.openwhisk.core.entity.test.ExecHelpers
import org.apache.openwhisk.core.loadBalancer.FeedFactory
import org.apache.openwhisk.core.loadBalancer.InvokerPoolFactory
-import org.apache.openwhisk.core.loadBalancer.InvokerState._
import org.apache.openwhisk.core.loadBalancer._
/**
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
new file mode 100644
index 0000000..aa68832
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container.test
+
+import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import com.ibm.etcd.api.{KeyValue, RangeResponse}
+import common.{StreamLogging, WskActorSystem}
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
+import org.apache.openwhisk.common.InvokerState.{Healthy, Unhealthy}
+import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, TransactionId}
+import org.apache.openwhisk.core.connector.ContainerCreationError.{
+ NoAvailableInvokersError,
+ NoAvailableResourceInvokersError
+}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.{ContainerId, Uninitialized}
+import org.apache.openwhisk.core.database.test.DbUtils
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.test.ExecHelpers
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
+import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.container._
+import org.apache.openwhisk.core.scheduler.message.{
+ ContainerCreation,
+ ContainerDeletion,
+ FailedCreationJob,
+ RegisterCreationJob,
+ ReschedulingCreationJob,
+ SuccessfulCreationJob
+}
+import org.apache.openwhisk.core.service.WatchEndpointInserted
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import pureconfig.loadConfigOrThrow
+import spray.json.{JsArray, JsBoolean, JsString}
+
+import pureconfig.generic.auto._
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration.{FiniteDuration, _}
+
+@RunWith(classOf[JUnitRunner])
+class ContainerManagerTests
+ extends TestKit(ActorSystem("ContainerManager"))
+ with ImplicitSender
+ with FlatSpecLike
+ with ScalaFutures
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with StreamLogging {
+
+ val config = new WhiskConfig(ExecManifest.requiredProperties)
+
+ val testInvocationNamespace = "test-invocation-namespace"
+ val testNamespace = "test-namespace"
+ val testAction = "test-action"
+ val testfqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction))
+ val blackboxInvocation = false
+ val testCreationId = CreationId.generate()
+ val testRevision = DocRevision("1-testRev")
+ val testMemory = 256.MB
+ val testResources = Seq.empty[String]
+ val resourcesStrictPolicy = false
+
+ val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+ val action = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec)
+ val execMetadata = CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
+ val actionMetadata =
+ WhiskActionMetaData(
+ action.namespace,
+ action.name,
+ execMetadata,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations)
+
+ val invokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(3, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(4, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(5, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(6, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(7, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(8, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(9, userMemory = testMemory, tags = Seq.empty[String]), Healthy))
+
+ val testsid = SchedulerInstanceId("0")
+
+ val schedulerHost = "127.17.0.1"
+ val rpcPort = 13001
+
+ override def afterAll(): Unit = {
+ logLines.foreach(println)
+ QueuePool.clear()
+ TestKit.shutdownActorSystem(system)
+ super.afterAll()
+ }
+
+ override def beforeEach(): Unit = {
+ QueuePool.clear()
+ }
+
+ def mockMessaging(receiver: Option[ActorRef] = None): MessagingProvider = {
+ val messaging = stub[MessagingProvider]
+ val producer = receiver.map(fakeProducer(_)).getOrElse(stub[MessageProducer])
+ val consumer = stub[MessageConsumer]
+ (messaging
+ .getProducer(_: WhiskConfig, _: Option[ByteSize])(_: Logging, _: ActorSystem))
+ .when(*, *, *, *)
+ .returns(producer)
+ (messaging
+ .getConsumer(_: WhiskConfig, _: String, _: String, _: Int, _: FiniteDuration)(_: Logging, _: ActorSystem))
+ .when(*, *, *, *, *, *, *)
+ .returns(consumer)
+ // this is a stub producer
+ if (receiver.isEmpty) {
+ (producer
+ .send(_: String, _: Message, _: Int))
+ .when(*, *, *)
+ .returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
+ }
+
+ messaging
+ }
+
+ private def fakeProducer(receiver: ActorRef) = new MessageProducer {
+
+ /** Count of messages sent. */
+ override def sentCount(): Long = 0
+
+ /** Sends msg to topic. This is an asynchronous operation. */
+ override def send(topic: String, msg: Message, retry: Int): Future[RecordMetadata] = {
+ val message = s"$topic-$msg"
+ receiver ! message
+
+ Future.successful(
+ new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1))
+ }
+
+ /** Closes producer. */
+ override def close(): Unit = {}
+ }
+
+ def expectGetInvokers(etcd: EtcdClient, invokers: List[InvokerHealth] = invokers): Unit = {
+ (etcd
+ .getPrefix(_: String))
+ .expects(InvokerKeys.prefix)
+ .returning(Future.successful {
+ invokers
+ .foldLeft(RangeResponse.newBuilder()) { (builder, invoker) =>
+ val msg = InvokerResourceMessage(
+ invoker.status.asString,
+ invoker.id.userMemory.toMB,
+ invoker.id.userMemory.toMB,
+ invoker.id.userMemory.toMB,
+ invoker.id.tags,
+ invoker.id.dedicatedNamespaces)
+
+ builder.addKvs(
+ KeyValue
+ .newBuilder()
+ .setKey(InvokerKeys.health(invoker.id))
+ .setValue(msg.toString)
+ .build())
+ }
+ .build()
+ })
+ }
+
+ /** Registers the transition callback and expects the first message */
+ def registerCallback(c: ActorRef) = {
+ c ! SubscribeTransitionCallBack(testActor)
+ expectMsg(CurrentState(c, Uninitialized))
+ }
+
+ def factory(t: TestProbe)(f: ActorRefFactory): ActorRef = t.ref
+
+ behavior of "ContainerManager"
+
+ it should "create container" in {
+ val mockEtcd = mock[EtcdClient]
+ (mockEtcd
+ .getPrefix(_: String))
+ .expects(*)
+ .returning(Future.successful {
+ RangeResponse.newBuilder().build()
+ })
+ expectGetInvokers(mockEtcd)
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+
+ val manager =
+ system.actorOf(
+ ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val msgs = List(msg1, msg2, msg3)
+ val creationMsg = ContainerCreation(msgs, testMemory, testInvocationNamespace)
+
+ manager ! creationMsg
+
+ mockJobManager.expectMsgPF() {
+ case RegisterCreationJob(`msg1`) => true
+ case RegisterCreationJob(`msg2`) => true
+ case RegisterCreationJob(`msg3`) => true
+ }
+ }
+
+ it should "try warmed containers first" in {
+ val mockEtcd = mock[EtcdClient]
+
+ // for test, only invoker2 is healthy, so that no-warmed creations can be only sent to invoker2
+ val invokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ )
+ expectGetInvokers(mockEtcd, invokers)
+ expectGetInvokers(mockEtcd, invokers)
+ expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` twice
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+ val receiver = TestProbe()
+
+ val manager =
+ system.actorOf(ContainerManager
+ .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+ // there are 1 warmed container for `test-namespace/test-action` and 1 for `test-namespace/test-action-2`
+ manager ! WatchEndpointInserted(
+ ContainerKeys.warmedPrefix,
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ InvokerInstanceId(0, userMemory = 0.bytes),
+ ContainerId("fake")),
+ "",
+ true)
+ manager ! WatchEndpointInserted(
+ ContainerKeys.warmedPrefix,
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ InvokerInstanceId(1, userMemory = 0.bytes),
+ ContainerId("fake")),
+ "",
+ true)
+
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.copy(name = EntityName("test-action-2")),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msgs = List(msg1, msg2, msg3)
+
+ // it should reuse 2 warmed containers
+ manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace)
+
+ // ignore warmUp message
+ receiver.ignoreMsg {
+ case s: String => s.contains("warmUp")
+ }
+
+ // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker
+ receiver.expectMsg(s"invoker0-$msg1")
+ receiver.expectMsg(s"invoker1-$msg2")
+ receiver.expectMsg(s"invoker2-$msg3")
+
+ mockJobManager.expectMsgPF() {
+ case RegisterCreationJob(`msg1`) => true
+ case RegisterCreationJob(`msg2`) => true
+ case RegisterCreationJob(`msg3`) => true
+ }
+
+ // now warmed container for action2 become warmed again
+ manager ! SuccessfulCreationJob(msg2.creationId, msg2.invocationNamespace, msg2.action, msg2.revision)
+ manager ! SuccessfulCreationJob(msg3.creationId, msg3.invocationNamespace, msg3.action, msg3.revision)
+ // it still need to use invoker2
+ manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
+ receiver.expectMsg(s"invoker2-$msg1")
+ // it will use warmed container on invoker1
+ manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace)
+ receiver.expectMsg(s"invoker1-$msg2")
+
+ // warmed container for action1 become warmed
+ manager ! SuccessfulCreationJob(msg1.creationId, msg1.invocationNamespace, msg1.action, msg1.revision)
+ manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
+ receiver.expectMsg(s"invoker0-$msg1")
+ }
+
+ it should "rescheduling container creation" in {
+ val mockEtcd = mock[EtcdClient]
+ (mockEtcd
+ .getPrefix(_: String))
+ .expects(*)
+ .returning(Future.successful {
+ RangeResponse.newBuilder().build()
+ })
+ expectGetInvokers(mockEtcd)
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+
+ val manager =
+ system.actorOf(
+ ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+ val reschedulingMsg =
+ ReschedulingCreationJob(
+ TransactionId.testing,
+ testCreationId,
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ actionMetadata,
+ schedulerHost,
+ rpcPort,
+ 0)
+
+ val creationMsg = reschedulingMsg.toCreationMessage(testsid, reschedulingMsg.retry + 1)
+
+ manager ! reschedulingMsg
+
+ mockJobManager.expectMsg(RegisterCreationJob(creationMsg))
+ }
+
+ it should "forward GracefulShutdown to creation job manager" in {
+ val mockEtcd = mock[EtcdClient]
+ (mockEtcd
+ .getPrefix(_: String))
+ .expects(*)
+ .returning(Future.successful {
+ RangeResponse.newBuilder().build()
+ })
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+
+ val manager =
+ system.actorOf(
+ ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+ manager ! GracefulShutdown
+
+ mockJobManager.expectMsg(GracefulShutdown)
+ }
+
+ it should "generate random number less than mod" in {
+ val mod = 10
+ (1 to 100).foreach(_ => {
+ val num = ContainerManager.rng(mod)
+ num should be < mod
+ })
+ }
+
+ it should "choice invokers" in {
+ val healthyInvokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 512.MB), Healthy))
+
+ val minMemory = 512.MB
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns1")),
+ testRevision,
+ actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns2")),
+ testRevision,
+ actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns3")),
+ testRevision,
+ actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msgs = List(msg1, msg2, msg3)
+
+ val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
+
+ pairs.map(_.msg) should contain theSameElementsAs msgs
+ pairs.map(_.invokerId).foreach {
+ healthyInvokers.map(_.id) should contain(_)
+ }
+ }
+
+ it should "choose invoker even if there is only one invoker" in {
+ val healthyInvokers: List[InvokerHealth] = List(InvokerHealth(InvokerInstanceId(0, userMemory = 1024.MB), Healthy))
+
+ val minMemory = 128.MB
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns1")),
+ testRevision,
+ actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns2")),
+ testRevision,
+ actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns3")),
+ testRevision,
+ actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msgs = List(msg1, msg2, msg3)
+
+ val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
+
+ pairs.map(_.msg) should contain theSameElementsAs msgs
+ pairs.map(_.invokerId.instance).foreach {
+ healthyInvokers.map(_.id.instance) should contain(_)
+ }
+ }
+
+ it should "filter invokers based on tags" in {
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns1")),
+ testRevision,
+ actionMetadata.copy(
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("cpu"), JsString("memory")))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns2")),
+ testRevision,
+ actionMetadata.copy(
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("memory"), JsString("disk")))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns3")),
+ testRevision,
+ actionMetadata.copy(
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("disk"), JsString("cpu")))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg4 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns4")),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg5 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns5")),
+ testRevision,
+ actionMetadata.copy(
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("fake"))) ++ Parameters(
+ Annotations.InvokerResourcesStrictPolicyAnnotationName,
+ JsBoolean(true))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val probe = TestProbe()
+ QueuePool.put(
+ MemoryQueueKey(testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)),
+ MemoryQueueValue(probe.ref, true))
+
+ val healthyInvokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB, tags = Seq("cpu", "memory")), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB, tags = Seq("memory", "disk")), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = 512.MB, tags = Seq("disk", "cpu")), Healthy),
+ InvokerHealth(InvokerInstanceId(3, userMemory = 512.MB), Healthy))
+
+ // for msg1/2/3 we choose the exact invokers for them, for msg4, we choose no tagged invokers first, here is the invoker3
+ // for msg5, there is no available invokers, and the resource strict policy is true, so return an error
+ val pairs = ContainerManager.schedule(
+ healthyInvokers,
+ List(msg1, msg2, msg3, msg4, msg5),
+ msg1.whiskActionMetaData.limits.memory.megabytes.MB) // the memory is same for all msgs
+ pairs should contain theSameElementsAs List(
+ ScheduledPair(msg1, healthyInvokers(0).id),
+ ScheduledPair(msg2, healthyInvokers(1).id),
+ ScheduledPair(msg3, healthyInvokers(2).id),
+ ScheduledPair(msg4, healthyInvokers(3).id))
+ probe.expectMsg(
+ FailedCreationJob(
+ msg5.creationId,
+ testInvocationNamespace,
+ msg5.action,
+ testRevision,
+ NoAvailableResourceInvokersError,
+ "No available invokers with resources List(fake)."))
+ }
+
+ it should "respect the resource policy while use resource filter" in {
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns1")),
+ testRevision,
+ actionMetadata.copy(
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+ Annotations.InvokerResourcesStrictPolicyAnnotationName,
+ JsBoolean(true))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns2")),
+ testRevision,
+ actionMetadata.copy(
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+ Annotations.InvokerResourcesStrictPolicyAnnotationName,
+ JsBoolean(false))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg3 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns3")),
+ testRevision,
+ actionMetadata.copy(
+ limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+ Annotations.InvokerResourcesStrictPolicyAnnotationName,
+ JsBoolean(false))),
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val probe = TestProbe()
+ QueuePool.put(
+ MemoryQueueKey(testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)),
+ MemoryQueueValue(probe.ref, true))
+ val healthyInvokers: List[InvokerHealth] =
+ List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB, tags = Seq("cpu", "memory")), Healthy))
+
+ // while resourcesStrictPolicy is true, and there is no suitable invokers, return an error
+ val pairs =
+ ContainerManager.schedule(healthyInvokers, List(msg1), msg1.whiskActionMetaData.limits.memory.megabytes.MB)
+ pairs.size shouldBe 0
+ probe.expectMsg(
+ FailedCreationJob(
+ msg1.creationId,
+ testInvocationNamespace,
+ msg1.action,
+ testRevision,
+ NoAvailableResourceInvokersError,
+ "No available invokers with resources List(non-exist)."))
+
+ // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
+ // here is the invoker0
+ val pairs2 =
+ ContainerManager.schedule(healthyInvokers, List(msg2), msg2.whiskActionMetaData.limits.memory.megabytes.MB)
+ pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id))
+
+ // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
+ // if there is none, then choose other invokers, here is the invoker1
+ val pairs3 = ContainerManager.schedule(
+ healthyInvokers.takeRight(1),
+ List(msg3),
+ msg3.whiskActionMetaData.limits.memory.megabytes.MB)
+ pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id))
+ }
+
+ it should "send FailedCreationJob to queue manager when no invokers are available" in {
+ val mockEtcd = mock[EtcdClient]
+ val probe = TestProbe()
+ (mockEtcd
+ .getPrefix(_: String))
+ .expects(InvokerKeys.prefix)
+ .returning(Future.successful {
+ RangeResponse.newBuilder().build()
+ })
+ .twice()
+
+ val fqn = FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction))
+
+ QueuePool.put(
+ MemoryQueueKey(testInvocationNamespace, fqn.toDocId.asDocInfo(testRevision)),
+ MemoryQueueValue(probe.ref, true))
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+
+ val manager =
+ system.actorOf(
+ ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+ val msg =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ fqn,
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ manager ! ContainerCreation(List(msg), testMemory, testInvocationNamespace)
+ probe.expectMsg(
+ FailedCreationJob(
+ msg.creationId,
+ testInvocationNamespace,
+ msg.action,
+ testRevision,
+ NoAvailableInvokersError,
+ "No available invokers."))
+ }
+
+ it should "schedule to the blackbox invoker when isBlackboxInvocation is true" in {
+ stream.reset()
+ val mockEtcd = mock[EtcdClient]
+ (mockEtcd
+ .getPrefix(_: String))
+ .expects(*)
+ .returning(Future.successful {
+ RangeResponse.newBuilder().build()
+ })
+ expectGetInvokers(mockEtcd)
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+
+ val manager =
+ system.actorOf(
+ ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+ val exec = BlackBoxExec(ExecManifest.ImageName("image"), None, None, native = false, binary = false)
+ val action = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec)
+ val execMetadata = BlackBoxExecMetaData(exec.image, exec.entryPoint, exec.native, exec.binary)
+ val actionMetadata =
+ WhiskActionMetaData(
+ action.namespace,
+ action.name,
+ execMetadata,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations)
+
+ val msg1 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val msgs = List(msg1)
+ val creationMsg = ContainerCreation(msgs, testMemory, testInvocationNamespace)
+
+ manager ! creationMsg
+
+ mockJobManager.expectMsgPF() {
+ case RegisterCreationJob(`msg1`) => true
+ }
+
+ Thread.sleep(1000)
+
+ // blackbox invoker number = 10 * 0.1 = 1, so the last blackbox invoker will be scheduled
+ // Because the debugging invoker is excluded, it sends a message to invoker9.
+ stream.toString should include(s"posting to invoker9")
+ }
+
+ it should "delete container" in {
+ val mockEtcd = mock[EtcdClient]
+ val invokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ )
+ val fqn = FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction))
+
+ expectGetInvokers(mockEtcd, invokers)
+
+ // both warmed and existing containers are in all invokers.
+ (mockEtcd
+ .getPrefix(_: String))
+ .expects(containerPrefix(ContainerKeys.namespacePrefix, testInvocationNamespace, fqn))
+ .returning(Future.successful {
+ invokers
+ .foldLeft(RangeResponse.newBuilder()) { (builder, invoker) =>
+ builder.addKvs(
+ KeyValue
+ .newBuilder()
+ .setKey(
+ ContainerKeys.existingContainers(
+ testInvocationNamespace,
+ fqn,
+ testRevision,
+ Some(invoker.id),
+ Some(ContainerId("testContainer"))))
+ .build())
+ }
+ .build()
+ })
+
+ (mockEtcd
+ .getPrefix(_: String))
+ .expects(containerPrefix(ContainerKeys.warmedPrefix, testInvocationNamespace, fqn))
+ .returning(Future.successful {
+ invokers
+ .foldLeft(RangeResponse.newBuilder()) { (builder, invoker) =>
+ builder.addKvs(KeyValue
+ .newBuilder()
+ .setKey(ContainerKeys
+ .warmedContainers(testInvocationNamespace, fqn, testRevision, invoker.id, ContainerId("testContainer")))
+ .build())
+ }
+ .build()
+ })
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+ val receiver = TestProbe()
+
+ val manager =
+ system.actorOf(ContainerManager
+ .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+ val msg = ContainerDeletionMessage(
+ TransactionId.containerDeletion,
+ testInvocationNamespace,
+ fqn,
+ testRevision,
+ actionMetadata)
+ val deletionMessage = ContainerDeletion(testInvocationNamespace, fqn, testRevision, actionMetadata)
+
+ manager ! deletionMessage
+
+ val expectedMsgs = invokers.map(i => s"invoker${i.id.instance}-$msg")
+
+ receiver.expectMsgPF() {
+ case msg: String if msg.contains("warmUp") => true
+ case msg: String => expectedMsgs.contains(msg)
+ case msg =>
+ println(s"unexpected message: $msg")
+ fail()
+ }
+ }
+
+ it should "allow managed partition to overlap with blackbox for small N" in {
+ Seq((0.1, 0.9), (0.2, 0.8), (0.3, 0.7), (0.4, 0.6), (0.5, 0.5)).foreach { fraction =>
+ val blackboxFraction = fraction._1
+ val managedFraction = fraction._2
+
+ (1 to 100).toSeq.foreach { i =>
+ val m = Math.max(1, Math.ceil(i.toDouble * managedFraction).toInt)
+ val b = Math.max(1, Math.floor(i.toDouble * blackboxFraction).toInt)
+
+ m should be <= i
+ b shouldBe Math.max(1, (blackboxFraction * i).toInt)
+
+ blackboxFraction match {
+ case 0.1 if i < 10 => m + b shouldBe i + 1
+ case 0.2 if i < 5 => m + b shouldBe i + 1
+ case 0.3 if i < 4 => m + b shouldBe i + 1
+ case 0.4 if i < 3 => m + b shouldBe i + 1
+ case 0.5 if i < 2 => m + b shouldBe i + 1
+ case _ => m + b shouldBe i
+ }
+ }
+ }
+ }
+
+ it should "return the same pools if managed- and blackbox-pools are overlapping" in {
+ val blackboxFraction = 1.0
+ val managedFraction = 1.0
+ val totalInvokerSize = 100
+ var result = mutable.Buffer[InvokerHealth]()
+ (1 to totalInvokerSize).foreach { i =>
+ result = result :+ InvokerHealth(InvokerInstanceId(i, userMemory = 256.MB), Healthy)
+ }
+
+ val m = Math.max(1, Math.ceil(totalInvokerSize.toDouble * managedFraction).toInt)
+ val b = Math.max(1, Math.floor(totalInvokerSize.toDouble * blackboxFraction).toInt)
+
+ m shouldBe totalInvokerSize
+ b shouldBe totalInvokerSize
+
+ result.take(m) shouldBe result.takeRight(b)
+ }
+}
+
+@RunWith(classOf[JUnitRunner])
+class ContainerManager2Tests
+ extends FlatSpecLike
+ with Matchers
+ with StreamLogging
+ with ExecHelpers
+ with MockFactory
+ with ScalaFutures
+ with WskActorSystem
+ with BeforeAndAfterEach
+ with DbUtils {
+
+ implicit val dispatcher = actorSystem.dispatcher
+ val etcdClient = EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
+ val testInvocationNamespace = "test-invocation-namespace"
+
+ override def afterAll(): Unit = {
+ etcdClient.close()
+ super.afterAll()
+ }
+
+ it should "load invoker from specified clusterName only" in {
+ val clusterName1 = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+ val clusterName2 = "clusterName2"
+ val invokerResourceMessage =
+ InvokerResourceMessage(Healthy.asString, 1024, 0, 0, Seq.empty[String], Seq.empty[String])
+ etcdClient.put(s"${clusterName1}/invokers/0", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName1}/invokers/1", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName1}/invokers/2", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName2}/invokers/3", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName2}/invokers/4", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName2}/invokers/5", invokerResourceMessage.serialize)
+ // Make sure store above data in etcd
+ Thread.sleep(5.seconds.toMillis)
+ ContainerManager.getAvailableInvokers(etcdClient, 0.MB, testInvocationNamespace).map { invokers =>
+ invokers.length shouldBe 3
+ invokers.foreach { invokerHealth =>
+ List(0, 1, 2) should contain(invokerHealth.id.instance)
+ }
+ }
+ // Delete etcd data finally
+ List(
+ s"${clusterName1}/invokers/0",
+ s"${clusterName1}/invokers/1",
+ s"${clusterName1}/invokers/2",
+ s"${clusterName2}/invokers/3",
+ s"${clusterName2}/invokers/4",
+ s"${clusterName2}/invokers/5").foreach(etcdClient.del(_))
+ }
+
+ it should "load invoker from specified invocation namespace only" in {
+ val clusterName = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+ val invokerResourceMessage =
+ InvokerResourceMessage(Healthy.asString, 1024, 0, 0, Seq.empty[String], Seq.empty[String])
+ val invokerResourceMessage2 =
+ InvokerResourceMessage(Healthy.asString, 1024, 0, 0, Seq.empty[String], Seq(testInvocationNamespace))
+ etcdClient.put(s"${clusterName}/invokers/0", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName}/invokers/1", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName}/invokers/2", invokerResourceMessage.serialize)
+ etcdClient.put(s"${clusterName}/invokers/3", invokerResourceMessage2.serialize)
+ etcdClient.put(s"${clusterName}/invokers/4", invokerResourceMessage2.serialize)
+ etcdClient.put(s"${clusterName}/invokers/5", invokerResourceMessage2.serialize)
+ // Make sure store above data in etcd
+ Thread.sleep(5.seconds.toMillis)
+ ContainerManager.getAvailableInvokers(etcdClient, 0.MB, testInvocationNamespace).map { invokers =>
+ invokers.length shouldBe 6
+ invokers.foreach { invokerHealth =>
+ List(0, 1, 2, 3, 4, 5) should contain(invokerHealth.id.instance)
+ }
+ }
+
+ // this new namespace should not use invoker3/4/5
+ ContainerManager.getAvailableInvokers(etcdClient, 0.MB, "new-namespace").map { invokers =>
+ invokers.length shouldBe 3
+ invokers.foreach { invokerHealth =>
+ List(0, 1, 2) should contain(invokerHealth.id.instance)
+ }
+ }
+ // Delete etcd data finally
+ List(
+ s"${clusterName}/invokers/0",
+ s"${clusterName}/invokers/1",
+ s"${clusterName}/invokers/2",
+ s"${clusterName}/invokers/3",
+ s"${clusterName}/invokers/4",
+ s"${clusterName}/invokers/5").foreach(etcdClient.del(_))
+ }
+}