[New Scheduler] Manage container creation (#5074)

* Manage container creation

* Add ContainerManager Test

* Add dedicated namespace

* Remove namespace

* Apply scala fmt

* Add dedicatedNamespaces filter

* Add dedicatedNamespaces test

* Move InvokerState to common

* Unify InvokerHealth message

* Add configuration for test

* Add license header

* Remove compare InvokerResourceMessage
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 6851c70..1467639 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -456,5 +456,6 @@
                       {{ ret | join(',') }}"
 
 scheduler:
+  protocol: "{{ scheduler_protocol | default('http') }}"
   dataManagementService:
     retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index afce8fb..a710d60 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -560,9 +560,11 @@
       LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> topic))(MeasurementUnit.time.milliseconds)
     else LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)
 
+  // Time that is needed to produce message in kafka
+  val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
+
   def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
-
   /*
    * General markers
    */
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
index e425105..433505d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
@@ -17,5 +17,45 @@
 
 package org.apache.openwhisk.common
 
+import org.apache.openwhisk.core.entity.InvokerInstanceId
+
 case object GracefulShutdown
 case object Enable
+
+// States an Invoker can be in
+sealed trait InvokerState {
+  val asString: String
+  val isUsable: Boolean
+}
+
+object InvokerState {
+  // Invokers in this state can be used to schedule workload to
+  sealed trait Usable extends InvokerState { val isUsable = true }
+  // No workload should be scheduled to invokers in this state
+  sealed trait Unusable extends InvokerState { val isUsable = false }
+
+  // A completely healthy invoker, pings arriving fine, no system errors
+  case object Healthy extends Usable { val asString = "up" }
+  // The invoker can not create a container
+  case object Unhealthy extends Unusable { val asString = "unhealthy" }
+  // Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
+  case object Unresponsive extends Unusable { val asString = "unresponsive" }
+  // The invoker is down
+  case object Offline extends Unusable { val asString = "down" }
+}
+
+/**
+ * Describes an abstract invoker. An invoker is a local container pool manager that
+ * is in charge of the container life cycle management.
+ *
+ * @param id a unique instance identifier for the invoker
+ * @param status it status (healthy, unhealthy, unresponsive, offline)
+ */
+case class InvokerHealth(id: InvokerInstanceId, status: InvokerState) {
+  override def equals(obj: scala.Any): Boolean = obj match {
+    case that: InvokerHealth => that.id == this.id && that.status == this.status
+    case _                   => false
+  }
+
+  override def toString = s"InvokerHealth($id, $status)"
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index f7df2d0..5ce0f44 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -232,11 +232,11 @@
 
   val systemPrefix = "sid_"
 
-  var containerCreation = TransactionId(systemPrefix + "containerCreation")
   val unknown = TransactionId(systemPrefix + "unknown")
   val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing
   val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
   val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity
+  def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
   val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
   val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
   val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
@@ -245,7 +245,9 @@
   val controller = TransactionId(systemPrefix + "controller") // Controller startup
   val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
   val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
-  def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
+  var containerCreation = TransactionId(systemPrefix + "containerCreation")
+  var containerDeletion = TransactionId(systemPrefix + "containerDeletion")
+  val warmUp = TransactionId(systemPrefix + "warmUp")
 
   private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala
new file mode 100644
index 0000000..cd2e205
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core
+
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.{ActivationMessage, ContainerCreationMessage}
+import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
+import org.apache.openwhisk.core.entity._
+
+object WarmUp {
+  val warmUpActionIdentity = {
+    val whiskSystem = "whisk.system"
+    val uuid = UUID()
+    Identity(Subject(whiskSystem), Namespace(EntityName(whiskSystem), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
+  }
+
+  private val actionName = "warmUp"
+
+  // this action doesn't need to be in database
+  val warmUpAction = FullyQualifiedEntityName(warmUpActionIdentity.namespace.name.toPath, EntityName(actionName))
+
+  def warmUpActivation(controller: ControllerInstanceId) = {
+    ActivationMessage(
+      transid = TransactionId.warmUp,
+      action = warmUpAction,
+      revision = DocRevision.empty,
+      user = warmUpActionIdentity,
+      activationId = new ActivationIdGenerator {}.make(),
+      rootControllerIndex = controller,
+      blocking = false,
+      content = None,
+      initArgs = Set.empty)
+  }
+
+  def warmUpContainerCreationMessage(scheduler: SchedulerInstanceId) =
+    ExecManifest.runtimesManifest
+      .resolveDefaultRuntime("nodejs:default")
+      .map { manifest =>
+        val metadata = WhiskActionMetaData(
+          warmUpAction.path,
+          warmUpAction.name,
+          CodeExecMetaDataAsString(manifest, false, entryPoint = None))
+        ContainerCreationMessage(
+          TransactionId.warmUp,
+          warmUpActionIdentity.namespace.name.toString,
+          warmUpAction,
+          DocRevision.empty,
+          metadata,
+          scheduler,
+          "",
+          0)
+      }
+
+  def isWarmUpAction(fqn: FullyQualifiedEntityName): Boolean = {
+    fqn == warmUpAction
+  }
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 1058e53..c7ec206 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -203,6 +203,7 @@
 object ConfigKeys {
   val cluster = "whisk.cluster"
   val loadbalancer = "whisk.loadbalancer"
+  val fraction = "whisk.fraction"
   val buildInformation = "whisk.info"
 
   val couchdb = "whisk.couchdb"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala
index 96ae58f..6655496 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala
@@ -24,4 +24,6 @@
   val RawHttpAnnotationName = "raw-http"
   val RequireWhiskAuthAnnotation = "require-whisk-auth"
   val ProvideApiKeyAnnotationName = "provide-api-key"
+  val InvokerResourcesAnnotationName = "invoker-resources"
+  val InvokerResourcesStrictPolicyAnnotationName = "invoker-resources-strict-policy"
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index a8ead9d..f3af376 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -26,12 +26,8 @@
 import akka.http.scaladsl.server.Route
 import akka.stream.ActorMaterializer
 import kamon.Kamon
-import pureconfig._
-import pureconfig.generic.auto._
-import spray.json.DefaultJsonProtocol._
-import spray.json._
 import org.apache.openwhisk.common.Https.HttpsConfig
-import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.core.connector.MessagingProvider
 import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
@@ -40,13 +36,17 @@
 import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
 import org.apache.openwhisk.core.entity.ExecManifest.Runtimes
 import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.loadBalancer.{InvokerState, LoadBalancerProvider}
+import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider
 import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
 import org.apache.openwhisk.spi.SpiLoader
+import pureconfig._
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+import pureconfig.generic.auto._
 
+import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.Await
 import scala.util.{Failure, Success}
 
 /**
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 8422491..4f4c8ad 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -43,28 +43,6 @@
 
 case object Tick
 
-// States an Invoker can be in
-sealed trait InvokerState {
-  val asString: String
-  val isUsable: Boolean
-}
-
-object InvokerState {
-  // Invokers in this state can be used to schedule workload to
-  sealed trait Usable extends InvokerState { val isUsable = true }
-  // No workload should be scheduled to invokers in this state
-  sealed trait Unusable extends InvokerState { val isUsable = false }
-
-  // A completely healthy invoker, pings arriving fine, no system errors
-  case object Healthy extends Usable { val asString = "up" }
-  // Pings are arriving fine, the invoker returns system errors though
-  case object Unhealthy extends Unusable { val asString = "unhealthy" }
-  // Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
-  case object Unresponsive extends Unusable { val asString = "unresponsive" }
-  // Pings are not arriving for this invoker
-  case object Offline extends Unusable { val asString = "down" }
-}
-
 // Possible answers of an activation
 sealed trait InvocationFinishedResult
 object InvocationFinishedResult {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index ee839b3..4c87615 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -17,34 +17,18 @@
 
 package org.apache.openwhisk.core.loadBalancer
 
-import scala.concurrent.Future
 import akka.actor.{ActorRefFactory, ActorSystem, Props}
 import akka.stream.ActorMaterializer
-import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.common.{InvokerHealth, Logging, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.controller.Controller
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.spi.Spi
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
-/**
- * Describes an abstract invoker. An invoker is a local container pool manager that
- * is in charge of the container life cycle management.
- *
- * @param id a unique instance identifier for the invoker
- * @param status it status (healthy, unhealthy, offline)
- */
-class InvokerHealth(val id: InvokerInstanceId, val status: InvokerState) {
-  override def equals(obj: scala.Any): Boolean = obj match {
-    case that: InvokerHealth => that.id == this.id && that.status == this.status
-    case _                   => false
-  }
-
-  override def toString = s"InvokerHealth($id, $status)"
-}
-
 trait LoadBalancer {
 
   /**
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 14d3ff4..1aaf68f 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -28,6 +28,7 @@
 import akka.management.cluster.bootstrap.ClusterBootstrap
 import akka.stream.ActorMaterializer
 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
 import pureconfig._
 import pureconfig.generic.auto._
 import org.apache.openwhisk.common._
@@ -37,7 +38,6 @@
 import org.apache.openwhisk.core.entity.size.SizeLong
 import org.apache.openwhisk.common.LoggingMarkers._
 import org.apache.openwhisk.core.controller.Controller
-import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.spi.SpiLoader
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index 71ab9a0..54bd174 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -20,6 +20,7 @@
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
 import akka.util.Timeout
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool.ContainerRemoved
@@ -350,23 +351,6 @@
   }
 }
 
-// States an Invoker can be in
-sealed trait InvokerState {
-  val asString: String
-}
-
-case object Offline extends InvokerState {
-  val asString = "down"
-}
-
-case object Healthy extends InvokerState {
-  val asString = "up"
-}
-
-case object Unhealthy extends InvokerState {
-  val asString = "unhealthy"
-}
-
 //recevied from ContainerProxy actor
 case class HealthMessage(state: Boolean)
 
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
new file mode 100644
index 0000000..fc62ef0
--- /dev/null
+++ b/core/scheduler/src/main/resources/application.conf
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+whisk{
+  # tracing configuration
+  tracing {
+    component = "Scheduler"
+  }
+
+  fraction {
+    managed-fraction: 90%
+    blackbox-fraction: 10%
+  }
+}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
new file mode 100644
index 0000000..e8ce26f
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.event.Logging.InfoLevel
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.core.connector.ContainerCreationError.{
+  NoAvailableInvokersError,
+  NoAvailableResourceInvokersError
+}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{
+  Annotations,
+  ByteSize,
+  DocInfo,
+  DocRevision,
+  FullyQualifiedEntityName,
+  InvokerInstanceId,
+  MemoryLimit,
+  SchedulerInstanceId
+}
+import org.apache.openwhisk.core.etcd.EtcdClient
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
+import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.scheduler.Scheduler
+import org.apache.openwhisk.core.scheduler.message.{
+  ContainerCreation,
+  ContainerDeletion,
+  ContainerKeyMeta,
+  CreationJobState,
+  FailedCreationJob,
+  RegisterCreationJob,
+  ReschedulingCreationJob,
+  SuccessfulCreationJob
+}
+import org.apache.openwhisk.core.service.{
+  DeleteEvent,
+  PutEvent,
+  UnwatchEndpoint,
+  WatchEndpoint,
+  WatchEndpointInserted,
+  WatchEndpointRemoved
+}
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import pureconfig.generic.auto._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
+import scala.collection.concurrent.TrieMap
+import scala.util.{Failure, Success}
+
+case class ScheduledPair(msg: ContainerCreationMessage, invokerId: InvokerInstanceId)
+
+case class BlackboxFractionConfig(managedFraction: Double, blackboxFraction: Double)
+
+class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
+                       provider: MessagingProvider,
+                       schedulerInstanceId: SchedulerInstanceId,
+                       etcdClient: EtcdClient,
+                       config: WhiskConfig,
+                       watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
+    extends Actor {
+  private implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+  private val creationJobManager = jobManagerFactory(context)
+
+  private val messagingProducer = provider.getProducer(config)
+
+  private var warmedContainers = Set.empty[String]
+
+  private val warmedInvokers = TrieMap[Int, String]()
+
+  private val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+  private val warmKey = ContainerKeys.warmedPrefix
+  private val invokerKey = InvokerKeys.prefix
+  private val watcherName = s"container-manager"
+
+  watcherService ! WatchEndpoint(warmKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
+  watcherService ! WatchEndpoint(invokerKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
+
+  override def receive: Receive = {
+    case ContainerCreation(msgs, memory, invocationNamespace) =>
+      createContainer(msgs, memory, invocationNamespace)
+
+    case ContainerDeletion(invocationNamespace, fqn, revision, whiskActionMetaData) =>
+      getInvokersWithOldContainer(invocationNamespace, fqn, revision)
+        .map { invokers =>
+          val msg = ContainerDeletionMessage(
+            TransactionId.containerDeletion,
+            invocationNamespace,
+            fqn,
+            revision,
+            whiskActionMetaData)
+          invokers.foreach(sendDeletionContainerToInvoker(messagingProducer, _, msg))
+        }
+
+    case rescheduling: ReschedulingCreationJob =>
+      val msg = rescheduling.toCreationMessage(schedulerInstanceId, rescheduling.retry + 1)
+      createContainer(
+        List(msg),
+        rescheduling.actionMetaData.limits.memory.megabytes.MB,
+        rescheduling.invocationNamespace)
+
+    case WatchEndpointInserted(watchKey, key, _, true) =>
+      watchKey match {
+        case `warmKey` => warmedContainers += key
+        case `invokerKey` =>
+          val invoker = InvokerKeys.getInstanceId(key)
+          warmedInvokers.getOrElseUpdate(invoker.instance, {
+            warmUpInvoker(invoker)
+            invoker.toString
+          })
+      }
+
+    case WatchEndpointRemoved(watchKey, key, _, true) =>
+      watchKey match {
+        case `warmKey` => warmedContainers -= key
+        case `invokerKey` =>
+          val invoker = InvokerKeys.getInstanceId(key)
+          warmedInvokers.remove(invoker.instance)
+      }
+
+    case FailedCreationJob(cid, _, _, _, _, _) =>
+      inProgressWarmedContainers.remove(cid.asString)
+
+    case SuccessfulCreationJob(cid, _, _, _) =>
+      inProgressWarmedContainers.remove(cid.asString)
+
+    case GracefulShutdown =>
+      watcherService ! UnwatchEndpoint(warmKey, isPrefix = true, watcherName)
+      watcherService ! UnwatchEndpoint(invokerKey, isPrefix = true, watcherName)
+      creationJobManager ! GracefulShutdown
+
+    case _ =>
+  }
+
+  private def createContainer(msgs: List[ContainerCreationMessage],
+                              memory: ByteSize,
+                              invocationNamespace: String): Unit = {
+    logging.info(this, s"received ${msgs.size} creation message [${msgs.head.invocationNamespace}:${msgs.head.action}]")
+    val coldCreations = filterWarmedCreations(msgs)
+    if (coldCreations.nonEmpty)
+      ContainerManager
+        .getAvailableInvokers(etcdClient, memory, invocationNamespace)
+        .flatMap { invokers =>
+          if (invokers.isEmpty) {
+            coldCreations.foreach { msg =>
+              ContainerManager.sendState(
+                FailedCreationJob(
+                  msg.creationId,
+                  msg.invocationNamespace,
+                  msg.action,
+                  msg.revision,
+                  NoAvailableInvokersError,
+                  s"No available invokers."))
+            }
+            Future.failed(NoCapacityException("No available invokers."))
+          } else {
+            coldCreations.foreach { msg =>
+              creationJobManager ! RegisterCreationJob(msg)
+            }
+
+            Future {
+              ContainerManager
+                .schedule(invokers, coldCreations, memory)
+                .map { pair =>
+                  sendCreationContainerToInvoker(messagingProducer, pair.invokerId.toInt, pair.msg)
+                }
+            }
+          }.andThen {
+            case Failure(t) => logging.warn(this, s"Failed to create container caused by: $t")
+          }
+        }
+  }
+
+  private def getInvokersWithOldContainer(invocationNamespace: String,
+                                          fqn: FullyQualifiedEntityName,
+                                          currentRevision: DocRevision): Future[List[Int]] = {
+    val namespacePrefix = containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, fqn)
+    val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, invocationNamespace, fqn)
+
+    for {
+      existing <- etcdClient
+        .getPrefix(namespacePrefix)
+        .map { res =>
+          res.getKvsList.asScala.map { kv =>
+            parseExistingContainerKey(namespacePrefix, kv.getKey)
+          }
+        }
+      warmed <- etcdClient
+        .getPrefix(warmedPrefix)
+        .map { res =>
+          res.getKvsList.asScala.map { kv =>
+            parseWarmedContainerKey(warmedPrefix, kv.getKey)
+          }
+        }
+    } yield {
+      (existing ++ warmed)
+        .dropWhile(k => k.revision > currentRevision) // remain latest revision
+        .groupBy(k => k.invokerId) // remove duplicated value
+        .map(_._2.head.invokerId)
+        .toList
+    }
+  }
+
+  /**
+   * existingKey format: {tag}/namespace/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker{id}/container/{containerId}
+   */
+  private def parseExistingContainerKey(prefix: String, existingKey: String): ContainerKeyMeta = {
+    val keys = existingKey.replace(prefix, "").split("/")
+    val revision = DocRevision(keys(0))
+    val invokerId = keys(1).replace("invoker", "").toInt
+    val containerId = keys(3)
+    ContainerKeyMeta(revision, invokerId, containerId)
+  }
+
+  /**
+   * warmedKey format: {tag}/warmed/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker/{id}/container/{containerId}
+   */
+  private def parseWarmedContainerKey(prefix: String, warmedKey: String): ContainerKeyMeta = {
+    val keys = warmedKey.replace(prefix, "").split("/")
+    val revision = DocRevision(keys(0))
+    val invokerId = keys(2).toInt
+    val containerId = keys(4)
+    ContainerKeyMeta(revision, invokerId, containerId)
+  }
+
+  // Filter out messages which can use warmed container
+  private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
+    msgs.filter { msg =>
+      val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action)
+      val chosenInvoker = warmedContainers
+        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+        .find { container =>
+          if (container.startsWith(warmedPrefix)) {
+            logging.info(this, s"Choose a warmed container $container")
+            inProgressWarmedContainers.update(msg.creationId.asString, container)
+            true
+          } else
+            false
+        }
+        .map(_.split("/").takeRight(3).apply(0))
+      if (chosenInvoker.nonEmpty) {
+        creationJobManager ! RegisterCreationJob(msg)
+        sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg)
+        false
+      } else
+        true
+    }
+  }
+
+  private def sendCreationContainerToInvoker(producer: MessageProducer,
+                                             invoker: Int,
+                                             msg: ContainerCreationMessage): Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
+
+    val topic = s"${Scheduler.topicPrefix}invoker$invoker"
+    val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic")
+
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        transid.finished(
+          this,
+          start,
+          s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status
+            .topic()}[${status.partition()}][${status.offset()}]",
+          logLevel = InfoLevel)
+      case Failure(_) =>
+        logging.error(this, s"Failed to create container for ${msg.action}, error: error on posting to topic $topic")
+        transid.failed(this, start, s"error on posting to topic $topic")
+    }
+  }
+
+  private def sendDeletionContainerToInvoker(producer: MessageProducer,
+                                             invoker: Int,
+                                             msg: ContainerDeletionMessage): Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
+
+    val topic = s"${Scheduler.topicPrefix}invoker$invoker"
+    val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic")
+
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        transid.finished(
+          this,
+          start,
+          s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status
+            .topic()}[${status.partition()}][${status.offset()}]",
+          logLevel = InfoLevel)
+      case Failure(_) =>
+        logging.error(this, s"Failed to delete container for ${msg.action}, error: error on posting to topic $topic")
+        transid.failed(this, start, s"error on posting to topic $topic")
+    }
+  }
+
+  private def warmUpInvoker(invoker: InvokerInstanceId): Unit = {
+    logging.info(this, s"Warm up invoker $invoker")
+    WarmUp.warmUpContainerCreationMessage(schedulerInstanceId).foreach {
+      sendCreationContainerToInvoker(messagingProducer, invoker.instance, _)
+    }
+  }
+
+  // warm up all invokers
+  private def warmUp() = {
+    // warm up exist invokers
+    ContainerManager.getAvailableInvokers(etcdClient, MemoryLimit.MIN_MEMORY).map { invokers =>
+      invokers.foreach { invoker =>
+        warmedInvokers.getOrElseUpdate(invoker.id.instance, {
+          warmUpInvoker(invoker.id)
+          invoker.id.toString
+        })
+      }
+    }
+
+  }
+
+  warmUp()
+}
+
+object ContainerManager {
+  val fractionConfig: BlackboxFractionConfig =
+    loadConfigOrThrow[BlackboxFractionConfig](ConfigKeys.fraction)
+
+  private val managedFraction: Double = Math.max(0.0, Math.min(1.0, fractionConfig.managedFraction))
+  private val blackboxFraction: Double = Math.max(1.0 - managedFraction, Math.min(1.0, fractionConfig.blackboxFraction))
+
+  def props(jobManagerFactory: ActorRefFactory => ActorRef,
+            provider: MessagingProvider,
+            schedulerInstanceId: SchedulerInstanceId,
+            etcdClient: EtcdClient,
+            config: WhiskConfig,
+            watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging): Props =
+    Props(new ContainerManager(jobManagerFactory, provider, schedulerInstanceId, etcdClient, config, watcherService))
+
+  /**
+   * The rng algorithm is responsible for the invoker distribution, and the better the distribution, the smaller the number of rescheduling.
+   *
+   */
+  def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
+
+  /**
+   * Assign an invoker to a message
+   *
+   * Assumption
+   *  - The memory of each invoker is larger than minMemory.
+   *  - Messages that are not assigned an invoker are discarded.
+   *
+   * @param invokers Available invoker pool
+   * @param msgs Messages to which the invoker will be assigned
+   * @param minMemory Minimum memory for all invokers
+   * @return A pair of messages and assigned invokers
+   */
+  def schedule(invokers: List[InvokerHealth], msgs: List[ContainerCreationMessage], minMemory: ByteSize)(
+    implicit logging: Logging): List[ScheduledPair] = {
+    logging.info(this, s"usable total invoker size: ${invokers.size}")
+    val noTaggedInvokers = invokers.filter(_.id.tags.isEmpty)
+    val managed = Math.max(1, Math.ceil(noTaggedInvokers.size.toDouble * managedFraction).toInt)
+    val blackboxes = Math.max(1, Math.floor(noTaggedInvokers.size.toDouble * blackboxFraction).toInt)
+    val managedInvokers = noTaggedInvokers.take(managed)
+    val blackboxInvokers = noTaggedInvokers.takeRight(blackboxes)
+    logging.info(
+      this,
+      s"${msgs.size} creation messages for ${msgs.head.invocationNamespace}/${msgs.head.action}, managedFraction:$managedFraction, blackboxFraction:$blackboxFraction, managed invoker size:$managed, blackboxes invoker size:$blackboxes")
+    val list = msgs
+      .foldLeft((List.empty[ScheduledPair], invokers)) { (tuple, msg: ContainerCreationMessage) =>
+        val pairs = tuple._1
+        val candidates = tuple._2
+
+        val requiredResources =
+          msg.whiskActionMetaData.annotations
+            .getAs[Seq[String]](Annotations.InvokerResourcesAnnotationName)
+            .getOrElse(Seq.empty[String])
+        val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
+          .getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
+          .getOrElse(true)
+        val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false)
+        if (requiredResources.isEmpty) {
+          // only choose managed invokers or blackbox invokers
+          val wantedInvokers = if (isBlackboxInvocation) {
+            candidates.filter(c => blackboxInvokers.map(b => b.id.instance).contains(c.id.instance)).toSet
+          } else {
+            candidates.filter(c => managedInvokers.map(m => m.id.instance).contains(c.id.instance)).toSet
+          }
+          val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
+
+          if (wantedInvokers.nonEmpty) {
+            chooseInvokerFromCandidates(wantedInvokers.toList, invokers, pairs, msg)
+          } else if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then
+            chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg)
+          } else {
+            sendState(
+              FailedCreationJob(
+                msg.creationId,
+                msg.invocationNamespace,
+                msg.action,
+                msg.revision,
+                NoAvailableInvokersError,
+                s"No available invokers."))
+            (pairs, candidates)
+          }
+        } else {
+          val wantedInvokers = candidates.filter(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
+          if (wantedInvokers.nonEmpty) {
+            chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg)
+          } else if (resourcesStrictPolicy) {
+            sendState(
+              FailedCreationJob(
+                msg.creationId,
+                msg.invocationNamespace,
+                msg.action,
+                msg.revision,
+                NoAvailableResourceInvokersError,
+                s"No available invokers with resources $requiredResources."))
+            (pairs, candidates)
+          } else {
+            val (noTaggedInvokers, taggedInvokers) = candidates.partition(_.id.tags.isEmpty)
+            if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first
+              chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, msg)
+            } else {
+              val leftInvokers =
+                taggedInvokers.filterNot(health => requiredResources.toSet.subsetOf(health.id.tags.toSet))
+              if (leftInvokers.nonEmpty)
+                chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg)
+              else {
+                sendState(
+                  FailedCreationJob(
+                    msg.creationId,
+                    msg.invocationNamespace,
+                    msg.action,
+                    msg.revision,
+                    NoAvailableInvokersError,
+                    s"No available invokers."))
+                (pairs, candidates)
+              }
+            }
+          }
+        }
+      }
+      ._1 // pairs
+    list
+  }
+
+  private def chooseInvokerFromCandidates(
+    candidates: List[InvokerHealth],
+    wholeInvokers: List[InvokerHealth],
+    pairs: List[ScheduledPair],
+    msg: ContainerCreationMessage)(implicit logging: Logging): (List[ScheduledPair], List[InvokerHealth]) = {
+    val idx = rng(mod = candidates.size)
+    val instance = candidates(idx)
+    // it must be compared to the instance unique id
+    val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => p.id.instance == instance.id.instance).head)
+    val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
+    val updated =
+      if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // Since ByteSize is negative, it converts to long type and compares.
+        wholeInvokers.updated(
+          idxInWhole,
+          instance.copy(id = instance.id.copy(userMemory = instance.id.userMemory - requiredMemory.MB)))
+      } else {
+        // drop the nth element
+        val split = wholeInvokers.splitAt(idxInWhole)
+        val _ :: t = split._2
+        split._1 ::: t
+      }
+
+    (ScheduledPair(msg, instance.id) :: pairs, updated)
+  }
+
+  private def sendState(state: CreationJobState)(implicit logging: Logging): Unit = {
+    QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
+      case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
+        memoryQueueValue.queue ! state
+      case _ =>
+        logging.error(this, s"get a $state for a nonexistent memory queue or a follower")
+    }
+  }
+
+  protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize, invocationNamespace: String)(
+    implicit executor: ExecutionContext): Future[List[InvokerHealth]] = {
+    etcd
+      .getPrefix(InvokerKeys.prefix)
+      .map { res =>
+        res.getKvsList.asScala
+          .map { kv =>
+            InvokerResourceMessage
+              .parse(kv.getValue.toString(StandardCharsets.UTF_8))
+              .map { resourceMessage =>
+                val status = resourceMessage.status match {
+                  case Healthy.asString   => Healthy
+                  case Unhealthy.asString => Unhealthy
+                  case _                  => Offline
+                }
+
+                val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8))
+                val invoker = temporalId.copy(
+                  userMemory = resourceMessage.freeMemory.MB,
+                  busyMemory = Some(resourceMessage.busyMemory.MB),
+                  tags = resourceMessage.tags,
+                  dedicatedNamespaces = resourceMessage.dedicatedNamespaces)
+
+                InvokerHealth(invoker, status)
+              }
+              .getOrElse(InvokerHealth(InvokerInstanceId(kv.getKey, userMemory = 0.MB), Offline))
+          }
+          .filter(i => i.status.isUsable)
+          .filter(_.id.userMemory >= minMemory)
+          .filter { invoker =>
+            invoker.id.dedicatedNamespaces.isEmpty || invoker.id.dedicatedNamespaces.contains(invocationNamespace)
+          }
+          .toList
+      }
+  }
+
+  protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize)(
+    implicit executor: ExecutionContext): Future[List[InvokerHealth]] = {
+    etcd
+      .getPrefix(InvokerKeys.prefix)
+      .map { res =>
+        res.getKvsList.asScala
+          .map { kv =>
+            InvokerResourceMessage
+              .parse(kv.getValue.toString(StandardCharsets.UTF_8))
+              .map { resourceMessage =>
+                val status = resourceMessage.status match {
+                  case Healthy.asString   => Healthy
+                  case Unhealthy.asString => Unhealthy
+                  case _                  => Offline
+                }
+
+                val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8))
+                val invoker = temporalId.copy(
+                  userMemory = resourceMessage.freeMemory.MB,
+                  busyMemory = Some(resourceMessage.busyMemory.MB),
+                  tags = resourceMessage.tags,
+                  dedicatedNamespaces = resourceMessage.dedicatedNamespaces)
+                InvokerHealth(invoker, status)
+              }
+              .getOrElse(InvokerHealth(InvokerInstanceId(kv.getKey, userMemory = 0.MB), Offline))
+          }
+          .filter(i => i.status.isUsable)
+          .filter(_.id.userMemory >= minMemory)
+          .toList
+      }
+  }
+
+}
+
+case class NoCapacityException(msg: String) extends Exception(msg)
+
+/**
+ * TODO This needs to be moved to the QueueManager component that will be added later.
+ */
+object QueuePool {
+  private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()
+
+  private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)
+
+  private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)
+
+  private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)
+
+  private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)
+
+  private[scheduler] def clear() = _queuePool.clear()
+
+  private[scheduler] def size = _queuePool.size
+
+  private[scheduler] def values = _queuePool.values
+
+  private[scheduler] def keys = _queuePool.keys
+}
+case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
+case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/message/ContainerMessage.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/message/ContainerMessage.scala
new file mode 100644
index 0000000..b34149e
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/message/ContainerMessage.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.message
+
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.{
+  ContainerCreationAckMessage,
+  ContainerCreationError,
+  ContainerCreationMessage
+}
+import org.apache.openwhisk.core.entity.{
+  ByteSize,
+  CreationId,
+  DocRevision,
+  FullyQualifiedEntityName,
+  SchedulerInstanceId,
+  WhiskActionMetaData
+}
+
+case class ContainerKeyMeta(revision: DocRevision, invokerId: Int, containerId: String)
+
+case class ContainerCreation(msgs: List[ContainerCreationMessage], memory: ByteSize, invocationNamespace: String)
+case class ContainerDeletion(invocationNamespace: String,
+                             action: FullyQualifiedEntityName,
+                             revision: DocRevision,
+                             whiskActionMetaData: WhiskActionMetaData)
+
+sealed trait CreationJob
+case class RegisterCreationJob(msg: ContainerCreationMessage) extends CreationJob
+case class FinishCreationJob(ack: ContainerCreationAckMessage) extends CreationJob
+case class ReschedulingCreationJob(tid: TransactionId,
+                                   creationId: CreationId,
+                                   invocationNamespace: String,
+                                   action: FullyQualifiedEntityName,
+                                   revision: DocRevision,
+                                   actionMetaData: WhiskActionMetaData,
+                                   schedulerHost: String,
+                                   rpcPort: Int,
+                                   retry: Int)
+    extends CreationJob {
+
+  def toCreationMessage(sid: SchedulerInstanceId, retryCount: Int): ContainerCreationMessage =
+    ContainerCreationMessage(
+      tid,
+      invocationNamespace,
+      action,
+      revision,
+      actionMetaData,
+      sid,
+      schedulerHost,
+      rpcPort,
+      retryCount,
+      creationId)
+}
+
+abstract class CreationJobState(val creationId: CreationId,
+                                val invocationNamespace: String,
+                                val action: FullyQualifiedEntityName,
+                                val revision: DocRevision)
+case class FailedCreationJob(override val creationId: CreationId,
+                             override val invocationNamespace: String,
+                             override val action: FullyQualifiedEntityName,
+                             override val revision: DocRevision,
+                             error: ContainerCreationError,
+                             message: String)
+    extends CreationJobState(creationId, invocationNamespace, action, revision)
+case class SuccessfulCreationJob(override val creationId: CreationId,
+                                 override val invocationNamespace: String,
+                                 override val action: FullyQualifiedEntityName,
+                                 override val revision: DocRevision)
+    extends CreationJobState(creationId, invocationNamespace, action, revision)
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 5ce7f31..944f2af 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -132,6 +132,11 @@
             threads = "{{ etcd.pool_threads }}"
         }
     }
+
+    scheduler {
+        protocol = "{{ scheduler.protocol }}"
+    }
+
 }
 
 #test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
index 8ab5946..d574069 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
@@ -22,6 +22,7 @@
 import akka.stream.ActorMaterializer
 import akka.testkit.{ImplicitSender, TestActor, TestFSMRef, TestKit, TestProbe}
 import common.StreamLogging
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
 import org.apache.openwhisk.common.{Enable, GracefulShutdown, RingBuffer}
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector.InvokerResourceMessage
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 7e0d88f..a4a6145 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -43,7 +43,8 @@
 import akka.testkit.TestProbe
 import akka.util.Timeout
 import common.{LoggedFunction, StreamLogging}
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
+import org.apache.openwhisk.common.{InvokerHealth, InvokerState, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.connector.PingMessage
@@ -52,13 +53,10 @@
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.loadBalancer.ActivationRequest
 import org.apache.openwhisk.core.loadBalancer.GetStatus
-import org.apache.openwhisk.core.loadBalancer.InvokerState._
 import org.apache.openwhisk.core.loadBalancer.InvocationFinishedResult
 import org.apache.openwhisk.core.loadBalancer.InvocationFinishedMessage
 import org.apache.openwhisk.core.loadBalancer.InvokerActor
 import org.apache.openwhisk.core.loadBalancer.InvokerPool
-import org.apache.openwhisk.core.loadBalancer.InvokerState
-import org.apache.openwhisk.core.loadBalancer.InvokerHealth
 import org.apache.openwhisk.utils.retry
 import org.apache.openwhisk.core.connector.test.TestConnector
 import org.apache.openwhisk.core.entity.ControllerInstanceId
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 33dddf9..dbadbce 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -27,6 +27,7 @@
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.junit.JUnitRunner
@@ -36,10 +37,8 @@
 import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._
-import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.common.NestedSemaphore
+import org.apache.openwhisk.common.{InvokerHealth, Logging, NestedSemaphore, TransactionId}
 import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
-import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.connector.CompletionMessage
@@ -67,7 +66,6 @@
 import org.apache.openwhisk.core.entity.test.ExecHelpers
 import org.apache.openwhisk.core.loadBalancer.FeedFactory
 import org.apache.openwhisk.core.loadBalancer.InvokerPoolFactory
-import org.apache.openwhisk.core.loadBalancer.InvokerState._
 import org.apache.openwhisk.core.loadBalancer._
 
 /**
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
new file mode 100644
index 0000000..aa68832
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container.test
+
+import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import com.ibm.etcd.api.{KeyValue, RangeResponse}
+import common.{StreamLogging, WskActorSystem}
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
+import org.apache.openwhisk.common.InvokerState.{Healthy, Unhealthy}
+import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, TransactionId}
+import org.apache.openwhisk.core.connector.ContainerCreationError.{
+  NoAvailableInvokersError,
+  NoAvailableResourceInvokersError
+}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.{ContainerId, Uninitialized}
+import org.apache.openwhisk.core.database.test.DbUtils
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.test.ExecHelpers
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
+import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.container._
+import org.apache.openwhisk.core.scheduler.message.{
+  ContainerCreation,
+  ContainerDeletion,
+  FailedCreationJob,
+  RegisterCreationJob,
+  ReschedulingCreationJob,
+  SuccessfulCreationJob
+}
+import org.apache.openwhisk.core.service.WatchEndpointInserted
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import pureconfig.loadConfigOrThrow
+import spray.json.{JsArray, JsBoolean, JsString}
+
+import pureconfig.generic.auto._
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration.{FiniteDuration, _}
+
+@RunWith(classOf[JUnitRunner])
+class ContainerManagerTests
+    extends TestKit(ActorSystem("ContainerManager"))
+    with ImplicitSender
+    with FlatSpecLike
+    with ScalaFutures
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with StreamLogging {
+
+  val config = new WhiskConfig(ExecManifest.requiredProperties)
+
+  val testInvocationNamespace = "test-invocation-namespace"
+  val testNamespace = "test-namespace"
+  val testAction = "test-action"
+  val testfqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction))
+  val blackboxInvocation = false
+  val testCreationId = CreationId.generate()
+  val testRevision = DocRevision("1-testRev")
+  val testMemory = 256.MB
+  val testResources = Seq.empty[String]
+  val resourcesStrictPolicy = false
+
+  val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+  val action = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec)
+  val execMetadata = CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
+  val actionMetadata =
+    WhiskActionMetaData(
+      action.namespace,
+      action.name,
+      execMetadata,
+      action.parameters,
+      action.limits,
+      action.version,
+      action.publish,
+      action.annotations)
+
+  val invokers: List[InvokerHealth] = List(
+    InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(3, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(4, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(5, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(6, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(7, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(8, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    InvokerHealth(InvokerInstanceId(9, userMemory = testMemory, tags = Seq.empty[String]), Healthy))
+
+  val testsid = SchedulerInstanceId("0")
+
+  val schedulerHost = "127.17.0.1"
+  val rpcPort = 13001
+
+  override def afterAll(): Unit = {
+    logLines.foreach(println)
+    QueuePool.clear()
+    TestKit.shutdownActorSystem(system)
+    super.afterAll()
+  }
+
+  override def beforeEach(): Unit = {
+    QueuePool.clear()
+  }
+
+  def mockMessaging(receiver: Option[ActorRef] = None): MessagingProvider = {
+    val messaging = stub[MessagingProvider]
+    val producer = receiver.map(fakeProducer(_)).getOrElse(stub[MessageProducer])
+    val consumer = stub[MessageConsumer]
+    (messaging
+      .getProducer(_: WhiskConfig, _: Option[ByteSize])(_: Logging, _: ActorSystem))
+      .when(*, *, *, *)
+      .returns(producer)
+    (messaging
+      .getConsumer(_: WhiskConfig, _: String, _: String, _: Int, _: FiniteDuration)(_: Logging, _: ActorSystem))
+      .when(*, *, *, *, *, *, *)
+      .returns(consumer)
+    // this is a stub producer
+    if (receiver.isEmpty) {
+      (producer
+        .send(_: String, _: Message, _: Int))
+        .when(*, *, *)
+        .returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
+    }
+
+    messaging
+  }
+
+  private def fakeProducer(receiver: ActorRef) = new MessageProducer {
+
+    /** Count of messages sent. */
+    override def sentCount(): Long = 0
+
+    /** Sends msg to topic. This is an asynchronous operation. */
+    override def send(topic: String, msg: Message, retry: Int): Future[RecordMetadata] = {
+      val message = s"$topic-$msg"
+      receiver ! message
+
+      Future.successful(
+        new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1))
+    }
+
+    /** Closes producer. */
+    override def close(): Unit = {}
+  }
+
+  def expectGetInvokers(etcd: EtcdClient, invokers: List[InvokerHealth] = invokers): Unit = {
+    (etcd
+      .getPrefix(_: String))
+      .expects(InvokerKeys.prefix)
+      .returning(Future.successful {
+        invokers
+          .foldLeft(RangeResponse.newBuilder()) { (builder, invoker) =>
+            val msg = InvokerResourceMessage(
+              invoker.status.asString,
+              invoker.id.userMemory.toMB,
+              invoker.id.userMemory.toMB,
+              invoker.id.userMemory.toMB,
+              invoker.id.tags,
+              invoker.id.dedicatedNamespaces)
+
+            builder.addKvs(
+              KeyValue
+                .newBuilder()
+                .setKey(InvokerKeys.health(invoker.id))
+                .setValue(msg.toString)
+                .build())
+          }
+          .build()
+      })
+  }
+
+  /** Registers the transition callback and expects the first message */
+  def registerCallback(c: ActorRef) = {
+    c ! SubscribeTransitionCallBack(testActor)
+    expectMsg(CurrentState(c, Uninitialized))
+  }
+
+  def factory(t: TestProbe)(f: ActorRefFactory): ActorRef = t.ref
+
+  behavior of "ContainerManager"
+
+  it should "create container" in {
+    val mockEtcd = mock[EtcdClient]
+    (mockEtcd
+      .getPrefix(_: String))
+      .expects(*)
+      .returning(Future.successful {
+        RangeResponse.newBuilder().build()
+      })
+    expectGetInvokers(mockEtcd)
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+
+    val manager =
+      system.actorOf(
+        ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+    val creationMsg = ContainerCreation(msgs, testMemory, testInvocationNamespace)
+
+    manager ! creationMsg
+
+    mockJobManager.expectMsgPF() {
+      case RegisterCreationJob(`msg1`) => true
+      case RegisterCreationJob(`msg2`) => true
+      case RegisterCreationJob(`msg3`) => true
+    }
+  }
+
+  it should "try warmed containers first" in {
+    val mockEtcd = mock[EtcdClient]
+
+    // for test, only invoker2 is healthy, so that no-warmed creations can be only sent to invoker2
+    val invokers: List[InvokerHealth] = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    )
+    expectGetInvokers(mockEtcd, invokers)
+    expectGetInvokers(mockEtcd, invokers)
+    expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` twice
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+    val receiver = TestProbe()
+
+    val manager =
+      system.actorOf(ContainerManager
+        .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+    // there are 1 warmed container for `test-namespace/test-action` and 1 for `test-namespace/test-action-2`
+    manager ! WatchEndpointInserted(
+      ContainerKeys.warmedPrefix,
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        InvokerInstanceId(0, userMemory = 0.bytes),
+        ContainerId("fake")),
+      "",
+      true)
+    manager ! WatchEndpointInserted(
+      ContainerKeys.warmedPrefix,
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        InvokerInstanceId(1, userMemory = 0.bytes),
+        ContainerId("fake")),
+      "",
+      true)
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msgs = List(msg1, msg2, msg3)
+
+    // it should reuse 2 warmed containers
+    manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace)
+
+    // ignore warmUp message
+    receiver.ignoreMsg {
+      case s: String => s.contains("warmUp")
+    }
+
+    // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker
+    receiver.expectMsg(s"invoker0-$msg1")
+    receiver.expectMsg(s"invoker1-$msg2")
+    receiver.expectMsg(s"invoker2-$msg3")
+
+    mockJobManager.expectMsgPF() {
+      case RegisterCreationJob(`msg1`) => true
+      case RegisterCreationJob(`msg2`) => true
+      case RegisterCreationJob(`msg3`) => true
+    }
+
+    // now warmed container for action2 become warmed again
+    manager ! SuccessfulCreationJob(msg2.creationId, msg2.invocationNamespace, msg2.action, msg2.revision)
+    manager ! SuccessfulCreationJob(msg3.creationId, msg3.invocationNamespace, msg3.action, msg3.revision)
+    // it still need to use invoker2
+    manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
+    receiver.expectMsg(s"invoker2-$msg1")
+    // it will use warmed container on invoker1
+    manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace)
+    receiver.expectMsg(s"invoker1-$msg2")
+
+    // warmed container for action1 become warmed
+    manager ! SuccessfulCreationJob(msg1.creationId, msg1.invocationNamespace, msg1.action, msg1.revision)
+    manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
+    receiver.expectMsg(s"invoker0-$msg1")
+  }
+
+  it should "rescheduling container creation" in {
+    val mockEtcd = mock[EtcdClient]
+    (mockEtcd
+      .getPrefix(_: String))
+      .expects(*)
+      .returning(Future.successful {
+        RangeResponse.newBuilder().build()
+      })
+    expectGetInvokers(mockEtcd)
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+
+    val manager =
+      system.actorOf(
+        ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+    val reschedulingMsg =
+      ReschedulingCreationJob(
+        TransactionId.testing,
+        testCreationId,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        schedulerHost,
+        rpcPort,
+        0)
+
+    val creationMsg = reschedulingMsg.toCreationMessage(testsid, reschedulingMsg.retry + 1)
+
+    manager ! reschedulingMsg
+
+    mockJobManager.expectMsg(RegisterCreationJob(creationMsg))
+  }
+
+  it should "forward GracefulShutdown to creation job manager" in {
+    val mockEtcd = mock[EtcdClient]
+    (mockEtcd
+      .getPrefix(_: String))
+      .expects(*)
+      .returning(Future.successful {
+        RangeResponse.newBuilder().build()
+      })
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+
+    val manager =
+      system.actorOf(
+        ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+    manager ! GracefulShutdown
+
+    mockJobManager.expectMsg(GracefulShutdown)
+  }
+
+  it should "generate random number less than mod" in {
+    val mod = 10
+    (1 to 100).foreach(_ => {
+      val num = ContainerManager.rng(mod)
+      num should be < mod
+    })
+  }
+
+  it should "choice invokers" in {
+    val healthyInvokers: List[InvokerHealth] = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 512.MB), Healthy))
+
+    val minMemory = 512.MB
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns1")),
+        testRevision,
+        actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns2")),
+        testRevision,
+        actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns3")),
+        testRevision,
+        actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msgs = List(msg1, msg2, msg3)
+
+    val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
+
+    pairs.map(_.msg) should contain theSameElementsAs msgs
+    pairs.map(_.invokerId).foreach {
+      healthyInvokers.map(_.id) should contain(_)
+    }
+  }
+
+  it should "choose invoker even if there is only one invoker" in {
+    val healthyInvokers: List[InvokerHealth] = List(InvokerHealth(InvokerInstanceId(0, userMemory = 1024.MB), Healthy))
+
+    val minMemory = 128.MB
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns1")),
+        testRevision,
+        actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns2")),
+        testRevision,
+        actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns3")),
+        testRevision,
+        actionMetadata.copy(limits = actionMetadata.limits.copy(memory = MemoryLimit(minMemory))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msgs = List(msg1, msg2, msg3)
+
+    val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory)
+
+    pairs.map(_.msg) should contain theSameElementsAs msgs
+    pairs.map(_.invokerId.instance).foreach {
+      healthyInvokers.map(_.id.instance) should contain(_)
+    }
+  }
+
+  it should "filter invokers based on tags" in {
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns1")),
+        testRevision,
+        actionMetadata.copy(
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("cpu"), JsString("memory")))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns2")),
+        testRevision,
+        actionMetadata.copy(
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("memory"), JsString("disk")))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns3")),
+        testRevision,
+        actionMetadata.copy(
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("disk"), JsString("cpu")))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg4 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns4")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg5 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns5")),
+        testRevision,
+        actionMetadata.copy(
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("fake"))) ++ Parameters(
+              Annotations.InvokerResourcesStrictPolicyAnnotationName,
+              JsBoolean(true))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val probe = TestProbe()
+    QueuePool.put(
+      MemoryQueueKey(testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)),
+      MemoryQueueValue(probe.ref, true))
+
+    val healthyInvokers: List[InvokerHealth] = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB, tags = Seq("cpu", "memory")), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB, tags = Seq("memory", "disk")), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 512.MB, tags = Seq("disk", "cpu")), Healthy),
+      InvokerHealth(InvokerInstanceId(3, userMemory = 512.MB), Healthy))
+
+    // for msg1/2/3 we choose the exact invokers for them, for msg4, we choose no tagged invokers first, here is the invoker3
+    // for msg5, there is no available invokers, and the resource strict policy is true, so return an error
+    val pairs = ContainerManager.schedule(
+      healthyInvokers,
+      List(msg1, msg2, msg3, msg4, msg5),
+      msg1.whiskActionMetaData.limits.memory.megabytes.MB) // the memory is same for all msgs
+    pairs should contain theSameElementsAs List(
+      ScheduledPair(msg1, healthyInvokers(0).id),
+      ScheduledPair(msg2, healthyInvokers(1).id),
+      ScheduledPair(msg3, healthyInvokers(2).id),
+      ScheduledPair(msg4, healthyInvokers(3).id))
+    probe.expectMsg(
+      FailedCreationJob(
+        msg5.creationId,
+        testInvocationNamespace,
+        msg5.action,
+        testRevision,
+        NoAvailableResourceInvokersError,
+        "No available invokers with resources List(fake)."))
+  }
+
+  it should "respect the resource policy while use resource filter" in {
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns1")),
+        testRevision,
+        actionMetadata.copy(
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+              Annotations.InvokerResourcesStrictPolicyAnnotationName,
+              JsBoolean(true))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns2")),
+        testRevision,
+        actionMetadata.copy(
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+              Annotations.InvokerResourcesStrictPolicyAnnotationName,
+              JsBoolean(false))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns3")),
+        testRevision,
+        actionMetadata.copy(
+          limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+              Annotations.InvokerResourcesStrictPolicyAnnotationName,
+              JsBoolean(false))),
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val probe = TestProbe()
+    QueuePool.put(
+      MemoryQueueKey(testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)),
+      MemoryQueueValue(probe.ref, true))
+    val healthyInvokers: List[InvokerHealth] =
+      List(
+        InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
+        InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB, tags = Seq("cpu", "memory")), Healthy))
+
+    // while resourcesStrictPolicy is true, and there is no suitable invokers, return an error
+    val pairs =
+      ContainerManager.schedule(healthyInvokers, List(msg1), msg1.whiskActionMetaData.limits.memory.megabytes.MB)
+    pairs.size shouldBe 0
+    probe.expectMsg(
+      FailedCreationJob(
+        msg1.creationId,
+        testInvocationNamespace,
+        msg1.action,
+        testRevision,
+        NoAvailableResourceInvokersError,
+        "No available invokers with resources List(non-exist)."))
+
+    // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
+    // here is the invoker0
+    val pairs2 =
+      ContainerManager.schedule(healthyInvokers, List(msg2), msg2.whiskActionMetaData.limits.memory.megabytes.MB)
+    pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id))
+
+    // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
+    // if there is none, then choose other invokers, here is the invoker1
+    val pairs3 = ContainerManager.schedule(
+      healthyInvokers.takeRight(1),
+      List(msg3),
+      msg3.whiskActionMetaData.limits.memory.megabytes.MB)
+    pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id))
+  }
+
+  it should "send FailedCreationJob to queue manager when no invokers are available" in {
+    val mockEtcd = mock[EtcdClient]
+    val probe = TestProbe()
+    (mockEtcd
+      .getPrefix(_: String))
+      .expects(InvokerKeys.prefix)
+      .returning(Future.successful {
+        RangeResponse.newBuilder().build()
+      })
+      .twice()
+
+    val fqn = FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction))
+
+    QueuePool.put(
+      MemoryQueueKey(testInvocationNamespace, fqn.toDocId.asDocInfo(testRevision)),
+      MemoryQueueValue(probe.ref, true))
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+
+    val manager =
+      system.actorOf(
+        ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+    val msg =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        fqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    manager ! ContainerCreation(List(msg), testMemory, testInvocationNamespace)
+    probe.expectMsg(
+      FailedCreationJob(
+        msg.creationId,
+        testInvocationNamespace,
+        msg.action,
+        testRevision,
+        NoAvailableInvokersError,
+        "No available invokers."))
+  }
+
+  it should "schedule to the blackbox invoker when isBlackboxInvocation is true" in {
+    stream.reset()
+    val mockEtcd = mock[EtcdClient]
+    (mockEtcd
+      .getPrefix(_: String))
+      .expects(*)
+      .returning(Future.successful {
+        RangeResponse.newBuilder().build()
+      })
+    expectGetInvokers(mockEtcd)
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+
+    val manager =
+      system.actorOf(
+        ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))
+
+    val exec = BlackBoxExec(ExecManifest.ImageName("image"), None, None, native = false, binary = false)
+    val action = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec)
+    val execMetadata = BlackBoxExecMetaData(exec.image, exec.entryPoint, exec.native, exec.binary)
+    val actionMetadata =
+      WhiskActionMetaData(
+        action.namespace,
+        action.name,
+        execMetadata,
+        action.parameters,
+        action.limits,
+        action.version,
+        action.publish,
+        action.annotations)
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1)
+    val creationMsg = ContainerCreation(msgs, testMemory, testInvocationNamespace)
+
+    manager ! creationMsg
+
+    mockJobManager.expectMsgPF() {
+      case RegisterCreationJob(`msg1`) => true
+    }
+
+    Thread.sleep(1000)
+
+    // blackbox invoker number = 10 * 0.1 = 1, so the last blackbox invoker will be scheduled
+    // Because the debugging invoker is excluded, it sends a message to invoker9.
+    stream.toString should include(s"posting to invoker9")
+  }
+
+  it should "delete container" in {
+    val mockEtcd = mock[EtcdClient]
+    val invokers: List[InvokerHealth] = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    )
+    val fqn = FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction))
+
+    expectGetInvokers(mockEtcd, invokers)
+
+    // both warmed and existing containers are in all invokers.
+    (mockEtcd
+      .getPrefix(_: String))
+      .expects(containerPrefix(ContainerKeys.namespacePrefix, testInvocationNamespace, fqn))
+      .returning(Future.successful {
+        invokers
+          .foldLeft(RangeResponse.newBuilder()) { (builder, invoker) =>
+            builder.addKvs(
+              KeyValue
+                .newBuilder()
+                .setKey(
+                  ContainerKeys.existingContainers(
+                    testInvocationNamespace,
+                    fqn,
+                    testRevision,
+                    Some(invoker.id),
+                    Some(ContainerId("testContainer"))))
+                .build())
+          }
+          .build()
+      })
+
+    (mockEtcd
+      .getPrefix(_: String))
+      .expects(containerPrefix(ContainerKeys.warmedPrefix, testInvocationNamespace, fqn))
+      .returning(Future.successful {
+        invokers
+          .foldLeft(RangeResponse.newBuilder()) { (builder, invoker) =>
+            builder.addKvs(KeyValue
+              .newBuilder()
+              .setKey(ContainerKeys
+                .warmedContainers(testInvocationNamespace, fqn, testRevision, invoker.id, ContainerId("testContainer")))
+              .build())
+          }
+          .build()
+      })
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+    val receiver = TestProbe()
+
+    val manager =
+      system.actorOf(ContainerManager
+        .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+    val msg = ContainerDeletionMessage(
+      TransactionId.containerDeletion,
+      testInvocationNamespace,
+      fqn,
+      testRevision,
+      actionMetadata)
+    val deletionMessage = ContainerDeletion(testInvocationNamespace, fqn, testRevision, actionMetadata)
+
+    manager ! deletionMessage
+
+    val expectedMsgs = invokers.map(i => s"invoker${i.id.instance}-$msg")
+
+    receiver.expectMsgPF() {
+      case msg: String if msg.contains("warmUp") => true
+      case msg: String                           => expectedMsgs.contains(msg)
+      case msg =>
+        println(s"unexpected message: $msg")
+        fail()
+    }
+  }
+
+  it should "allow managed partition to overlap with blackbox for small N" in {
+    Seq((0.1, 0.9), (0.2, 0.8), (0.3, 0.7), (0.4, 0.6), (0.5, 0.5)).foreach { fraction =>
+      val blackboxFraction = fraction._1
+      val managedFraction = fraction._2
+
+      (1 to 100).toSeq.foreach { i =>
+        val m = Math.max(1, Math.ceil(i.toDouble * managedFraction).toInt)
+        val b = Math.max(1, Math.floor(i.toDouble * blackboxFraction).toInt)
+
+        m should be <= i
+        b shouldBe Math.max(1, (blackboxFraction * i).toInt)
+
+        blackboxFraction match {
+          case 0.1 if i < 10 => m + b shouldBe i + 1
+          case 0.2 if i < 5  => m + b shouldBe i + 1
+          case 0.3 if i < 4  => m + b shouldBe i + 1
+          case 0.4 if i < 3  => m + b shouldBe i + 1
+          case 0.5 if i < 2  => m + b shouldBe i + 1
+          case _             => m + b shouldBe i
+        }
+      }
+    }
+  }
+
+  it should "return the same pools if managed- and blackbox-pools are overlapping" in {
+    val blackboxFraction = 1.0
+    val managedFraction = 1.0
+    val totalInvokerSize = 100
+    var result = mutable.Buffer[InvokerHealth]()
+    (1 to totalInvokerSize).foreach { i =>
+      result = result :+ InvokerHealth(InvokerInstanceId(i, userMemory = 256.MB), Healthy)
+    }
+
+    val m = Math.max(1, Math.ceil(totalInvokerSize.toDouble * managedFraction).toInt)
+    val b = Math.max(1, Math.floor(totalInvokerSize.toDouble * blackboxFraction).toInt)
+
+    m shouldBe totalInvokerSize
+    b shouldBe totalInvokerSize
+
+    result.take(m) shouldBe result.takeRight(b)
+  }
+}
+
+@RunWith(classOf[JUnitRunner])
+class ContainerManager2Tests
+    extends FlatSpecLike
+    with Matchers
+    with StreamLogging
+    with ExecHelpers
+    with MockFactory
+    with ScalaFutures
+    with WskActorSystem
+    with BeforeAndAfterEach
+    with DbUtils {
+
+  implicit val dispatcher = actorSystem.dispatcher
+  val etcdClient = EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
+  val testInvocationNamespace = "test-invocation-namespace"
+
+  override def afterAll(): Unit = {
+    etcdClient.close()
+    super.afterAll()
+  }
+
+  it should "load invoker from specified clusterName only" in {
+    val clusterName1 = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+    val clusterName2 = "clusterName2"
+    val invokerResourceMessage =
+      InvokerResourceMessage(Healthy.asString, 1024, 0, 0, Seq.empty[String], Seq.empty[String])
+    etcdClient.put(s"${clusterName1}/invokers/0", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName1}/invokers/1", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName1}/invokers/2", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName2}/invokers/3", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName2}/invokers/4", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName2}/invokers/5", invokerResourceMessage.serialize)
+    // Make sure store above data in etcd
+    Thread.sleep(5.seconds.toMillis)
+    ContainerManager.getAvailableInvokers(etcdClient, 0.MB, testInvocationNamespace).map { invokers =>
+      invokers.length shouldBe 3
+      invokers.foreach { invokerHealth =>
+        List(0, 1, 2) should contain(invokerHealth.id.instance)
+      }
+    }
+    // Delete etcd data finally
+    List(
+      s"${clusterName1}/invokers/0",
+      s"${clusterName1}/invokers/1",
+      s"${clusterName1}/invokers/2",
+      s"${clusterName2}/invokers/3",
+      s"${clusterName2}/invokers/4",
+      s"${clusterName2}/invokers/5").foreach(etcdClient.del(_))
+  }
+
+  it should "load invoker from specified invocation namespace only" in {
+    val clusterName = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+    val invokerResourceMessage =
+      InvokerResourceMessage(Healthy.asString, 1024, 0, 0, Seq.empty[String], Seq.empty[String])
+    val invokerResourceMessage2 =
+      InvokerResourceMessage(Healthy.asString, 1024, 0, 0, Seq.empty[String], Seq(testInvocationNamespace))
+    etcdClient.put(s"${clusterName}/invokers/0", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName}/invokers/1", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName}/invokers/2", invokerResourceMessage.serialize)
+    etcdClient.put(s"${clusterName}/invokers/3", invokerResourceMessage2.serialize)
+    etcdClient.put(s"${clusterName}/invokers/4", invokerResourceMessage2.serialize)
+    etcdClient.put(s"${clusterName}/invokers/5", invokerResourceMessage2.serialize)
+    // Make sure store above data in etcd
+    Thread.sleep(5.seconds.toMillis)
+    ContainerManager.getAvailableInvokers(etcdClient, 0.MB, testInvocationNamespace).map { invokers =>
+      invokers.length shouldBe 6
+      invokers.foreach { invokerHealth =>
+        List(0, 1, 2, 3, 4, 5) should contain(invokerHealth.id.instance)
+      }
+    }
+
+    // this new namespace should not use invoker3/4/5
+    ContainerManager.getAvailableInvokers(etcdClient, 0.MB, "new-namespace").map { invokers =>
+      invokers.length shouldBe 3
+      invokers.foreach { invokerHealth =>
+        List(0, 1, 2) should contain(invokerHealth.id.instance)
+      }
+    }
+    // Delete etcd data finally
+    List(
+      s"${clusterName}/invokers/0",
+      s"${clusterName}/invokers/1",
+      s"${clusterName}/invokers/2",
+      s"${clusterName}/invokers/3",
+      s"${clusterName}/invokers/4",
+      s"${clusterName}/invokers/5").foreach(etcdClient.del(_))
+  }
+}