Don't create prewarm container when used memory reaches the limit (#5048)

* Don't create prewarm container when used memory reaches the limit

* Fix review points
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 4b66e99..724cd59 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -129,44 +129,40 @@
         val memory = r.action.limits.memory.megabytes.MB
 
         val createdContainer =
-          // Is there enough space on the invoker for this action to be executed.
-          if (hasPoolSpaceFor(busyPool ++ prewarmedPool, memory)) {
-            // Schedule a job to a warm container
-            ContainerPool
-              .schedule(r.action, r.msg.user.namespace.name, freePool)
-              .map(container => (container, container._2.initingState)) //warmed, warming, and warmingCold always know their state
-              .orElse(
-                // There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container.
-
-                // Is there enough space to create a new container or do other containers have to be removed?
-                if (hasPoolSpaceFor(busyPool ++ freePool ++ prewarmedPool, memory)) {
+          // Schedule a job to a warm container
+          ContainerPool
+            .schedule(r.action, r.msg.user.namespace.name, freePool)
+            .map(container => (container, container._2.initingState)) //warmed, warming, and warmingCold always know their state
+            .orElse(
+              // There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container.
+              // When take prewarm container, has no need to judge whether user memory is enough
+              takePrewarmContainer(r.action)
+                .map(container => (container, "prewarmed"))
+                .orElse {
+                  // Is there enough space to create a new container or do other containers have to be removed?
+                  if (hasPoolSpaceFor(busyPool ++ freePool ++ prewarmedPool, prewarmStartingPool, memory)) {
+                    val container = Some(createContainer(memory), "cold")
+                    incrementColdStartCount(kind, memory)
+                    container
+                  } else None
+                })
+            .orElse(
+              // Remove a container and create a new one for the given job
+              ContainerPool
+              // Only free up the amount, that is really needed to free up
+                .remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB)
+                .map(removeContainer)
+                // If the list had at least one entry, enough containers were removed to start the new container. After
+                // removing the containers, we are not interested anymore in the containers that have been removed.
+                .headOption
+                .map(_ =>
                   takePrewarmContainer(r.action)
-                    .map(container => (container, "prewarmed"))
-                    .orElse {
-                      val container = Some(createContainer(memory), "cold")
+                    .map(container => (container, "recreatedPrewarm"))
+                    .getOrElse {
+                      val container = (createContainer(memory), "recreated")
                       incrementColdStartCount(kind, memory)
                       container
-                    }
-                } else None)
-              .orElse(
-                // Remove a container and create a new one for the given job
-                ContainerPool
-                // Only free up the amount, that is really needed to free up
-                  .remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB)
-                  .map(removeContainer)
-                  // If the list had at least one entry, enough containers were removed to start the new container. After
-                  // removing the containers, we are not interested anymore in the containers that have been removed.
-                  .headOption
-                  .map(_ =>
-                    takePrewarmContainer(r.action)
-                      .map(container => (container, "recreatedPrewarm"))
-                      .getOrElse {
-                        val container = (createContainer(memory), "recreated")
-                        incrementColdStartCount(kind, memory)
-                        container
-                    }))
-
-          } else None
+                  }))
 
         createdContainer match {
           case Some(((actor, data), containerState)) =>
@@ -371,9 +367,15 @@
 
   /** Creates a new prewarmed container */
   def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration]): Unit = {
-    val newContainer = childFactory(context)
-    prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit))
-    newContainer ! Start(exec, memoryLimit, ttl)
+    if (hasPoolSpaceFor(busyPool ++ freePool ++ prewarmedPool, prewarmStartingPool, memoryLimit)) {
+      val newContainer = childFactory(context)
+      prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit))
+      newContainer ! Start(exec, memoryLimit, ttl)
+    } else {
+      logging.warn(
+        this,
+        s"Cannot create prewarm container due to reach the invoker memory limit: ${poolConfig.userMemory.toMB}")
+    }
   }
 
   /** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */
@@ -439,8 +441,10 @@
    * @param memory The amount of memory to check.
    * @return true, if there is enough space for the given amount of memory.
    */
-  def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
-    memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
+  def hasPoolSpaceFor[A](pool: Map[A, ContainerData],
+                         prewarmStartingPool: Map[A, (String, ByteSize)],
+                         memory: ByteSize): Boolean = {
+    memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB
   }
 
   /**
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 3fd0414..bc10350 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -310,6 +310,19 @@
     feed.expectMsg(MessageFeed.Processed)
   }
 
+  it should "not create prewarm container when used memory reaches the limit" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    val pool =
+      system.actorOf(ContainerPool
+        .props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
+
+    containers(1).expectNoMessage(100.milliseconds)
+  }
+
   /*
    * CONTAINER PREWARMING
    */
@@ -320,7 +333,7 @@
     val pool =
       system.actorOf(
         ContainerPool
-          .props(factory, poolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
+          .props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
     containers(0).expectMsg(Start(exec, memoryLimit))
   }
 
@@ -829,7 +842,7 @@
     stream.reset()
     val prewarmExpirationCheckIntervel = 2.seconds
     val poolConfig =
-      ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
+      ContainerPoolConfig(MemoryLimit.STD_MEMORY * 12, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
     val minCount = 0
     val initialCount = 2
     val maxCount = 4