package org.apache.openwhisk.core.containerpool.test
import{ByteArrayOutputStream, PrintStream}
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration._
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.FlatSpecLike
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.testkit.TestProbe
import common.{StreamLogging, WhiskProperties}
import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, ReactivePrewarmingConfig, RuntimeManifest}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.connector.MessageFeed
import org.scalatest.concurrent.Eventually
* Behavior tests for the ContainerPool
* These tests test the runtime behavior of a ContainerPool actor.
class ContainerPoolTests
extends TestKit(ActorSystem("ContainerPool"))
with ImplicitSender
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with MockFactory
with Eventually
with StreamLogging {
override def afterAll = TestKit.shutdownActorSystem(system)
val timeout = 5.seconds
// Common entities to pass to the tests. We don't really care what's inside
// those for the behavior testing here, as none of the contents will really
// reach a container anyway. We merely assert that passing and extraction of
// the values is done properly.
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val ttl = FiniteDuration(500, TimeUnit.MILLISECONDS)
val threshold = 1
val increment = 1
/** Creates a `Run` message */
def createRunMessage(action: ExecutableWhiskAction, invocationNamespace: EntityName) = {
val uuid = UUID()
val message = ActivationMessage(
Identity(Subject(), Namespace(invocationNamespace, uuid), BasicAuthenticationAuthKey(uuid, Secret())),
blocking = false,
content = None,
initArgs = Set.empty,
lockedArgs = Map.empty)
Run(action, message)
val invocationNamespace = EntityName("invocationSpace")
val differentInvocationNamespace = EntityName("invocationSpace2")
val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val concurrentAction = ExecutableWhiskAction(
limits = ActionLimits(concurrency = ConcurrencyLimit(if (concurrencyEnabled) 3 else 1)))
val differentAction = action.copy(name = EntityName("actionName2"))
val largeAction =
name = EntityName("largeAction"),
limits = ActionLimits(memory = MemoryLimit(MemoryLimit.STD_MEMORY * 2)))
val runMessage = createRunMessage(action, invocationNamespace)
val runMessageLarge = createRunMessage(largeAction, invocationNamespace)
val runMessageDifferentAction = createRunMessage(differentAction, invocationNamespace)
val runMessageDifferentVersion = createRunMessage(action.copy().revision(DocRevision("v2")), invocationNamespace)
val runMessageDifferentNamespace = createRunMessage(action, differentInvocationNamespace)
val runMessageDifferentEverything = createRunMessage(differentAction, differentInvocationNamespace)
val runMessageConcurrent = createRunMessage(concurrentAction, invocationNamespace)
val runMessageConcurrentDifferentNamespace = createRunMessage(concurrentAction, differentInvocationNamespace)
/** Helper to create PreWarmedData */
def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit, expires: Option[Deadline] = None) =
PreWarmedData(stub[MockableContainer], kind, memoryLimit, expires = expires)
/** Helper to create WarmedData */
def warmedData(run: Run, lastUsed: Instant = = {
WarmedData(stub[MockableContainer],, run.action, lastUsed)
/** Creates a sequence of containers and a factory returning this sequence. */
def testContainers(n: Int) = {
val containers = (0 to n).map(_ => TestProbe())
val queue = mutable.Queue(containers: _*)
val factory = (fac: ActorRefFactory) => queue.dequeue().ref
(containers, factory)
def poolConfig(userMemory: ByteSize) =
ContainerPoolConfig(userMemory, 0.5, false, 1.minute, None, 100)
behavior of "ContainerPool"
* These tests only test the simplest approaches. Look below for full coverage tests
* of the respective scheduling methods.
it should "reuse a warm container" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
pool ! runMessage
containers(0).send(pool, NeedWork(warmedData(runMessage)))
pool ! runMessage
it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
pool ! runMessage
containers(0).send(pool, NeedWork(warmedData(runMessage)))
pool ! runMessageDifferentVersion
it should "create a container if it cannot find a matching container" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
pool ! runMessage
// Note that the container doesn't respond, thus it's not free to take work
pool ! runMessage
it should "remove a container to make space in the pool if it is already full and a different action arrives" in within(
timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 slot
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref))
pool ! runMessage
containers(0).send(pool, NeedWork(warmedData(runMessage)))
pool ! runMessageDifferentEverything
it should "remove several containers to make space in the pool if it is already full and a different large action arrives" in within(
timeout) {
val (containers, factory) = testContainers(3)
val feed = TestProbe()
// a pool with slots for 2 actions with default memory limit.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(512.MB), feed.ref))
pool ! runMessage
pool ! runMessageDifferentAction // 2 * stdMemory taken -> full
containers(0).send(pool, NeedWork(warmedData(runMessage))) // first action finished -> 1 * stdMemory taken
.send(pool, NeedWork(warmedData(runMessageDifferentAction))) // second action finished -> 1 * stdMemory taken
pool ! runMessageLarge // need to remove both action to make space for the large action (needs 2 * stdMemory)
it should "cache a container if there is still space in the pool" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 active slot but 2 slots in total
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Run the first container
pool ! runMessage
containers(0).send(pool, NeedWork(warmedData(runMessage, lastUsed = Instant.EPOCH)))
// Run the second container, don't remove the first one
pool ! runMessageDifferentEverything
containers(1).send(pool, NeedWork(warmedData(runMessageDifferentEverything, lastUsed =
pool ! runMessageDifferentNamespace
// 2 Slots exhausted, remove the first container to make space
it should "remove a container to make space in the pool if it is already full and another action with different invocation namespace arrives" in within(
timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 slot
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref))
pool ! runMessage
containers(0).send(pool, NeedWork(warmedData(runMessage)))
pool ! runMessageDifferentNamespace
it should "reschedule job when container is removed prematurely without sending message to feed" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 slot
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref))
pool ! runMessage
containers(0).send(pool, RescheduleJob) // emulate container failure ...
containers(0).send(pool, runMessage) // ... causing job to be rescheduled
containers(1).expectMsg(runMessage) // job resent to new actor
it should "not start a new container if there is not enough space in the pool" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Start first action
pool ! runMessage // 1 * stdMemory taken
// Send second action to the pool
pool ! runMessageLarge // message is too large to be processed immediately.
// First action is finished
containers(0).send(pool, NeedWork(warmedData(runMessage))) // pool is empty again.
// Second action should run now
containers(1).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
containers(1).send(pool, NeedWork(warmedData(runMessageLarge)))
it should "create prewarmed containers on startup" in within(timeout) {
val (containers, factory) = testContainers(1)
val feed = TestProbe()
val pool =
.props(factory, poolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
it should "use a prewarmed container and create a new one to fill its place" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool =
.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
pool ! runMessage
containers(1).expectMsg(Start(exec, memoryLimit))
it should "use a prewarmed container with ttl and create a new one to fill its place" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val ttl = 5.seconds //make sure replaced prewarm has ttl
val pool =
poolConfig(MemoryLimit.STD_MEMORY * 2),
List(PrewarmingConfig(1, exec, memoryLimit, Some(ReactivePrewarmingConfig(1, 1, ttl, 1, 1))))))
containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = Some(ttl.fromNow))))
pool ! runMessage
containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
it should "not use a prewarmed container if it doesn't fit the kind" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val alternativeExec = CodeExecAsString(RuntimeManifest("anotherKind", ImageName("testImage")), "testCode", None)
val pool = system.actorOf(
poolConfig(MemoryLimit.STD_MEMORY * 2),
List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
pool ! runMessage
containers(1).expectMsg(runMessage) // but container1 is used
it should "not use a prewarmed container if it doesn't fit memory wise" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val alternativeLimit = 128.MB
val pool =
poolConfig(MemoryLimit.STD_MEMORY * 2),
List(PrewarmingConfig(1, exec, alternativeLimit))))
containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit)))
pool ! runMessage
containers(1).expectMsg(runMessage) // but container1 is used
it should "not reuse a container which is scheduled for deletion" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessage
containers(0).send(pool, NeedWork(warmedData(runMessage)))
// container0 is reused
pool ! runMessage
containers(0).send(pool, NeedWork(warmedData(runMessage)))
// container0 is deleted
containers(0).send(pool, ContainerRemoved(true))
// container1 is created and used
pool ! runMessage
* Run buffer
it should "first put messages into the queue and retrying them and then put messages only into the queue" in within(
timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Pool with 512 MB usermemory
val pool =
system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Send action that blocks the pool
pool ! runMessageLarge
// Action 0 starts -> 0MB free
// Send action that should be written to the queue and retried in invoker
pool ! runMessage
// Send another message that should not be retried, but put into the queue as well
pool ! runMessageDifferentAction
// Action with 512 MB is finished
// Action 0 completes -> 512MB free
containers(0).send(pool, NeedWork(warmedData(runMessageLarge)))
// Action 1 should start immediately -> 256MB free
containers(1).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(runMessage.action, runMessage.msg, Some(_)) => true
// Action 2 should start immediately as well -> 0MB free (without any retries, as there is already enough space in the pool)
// When buffer is emptied, process next feed message
// Action 1 completes, process feed
containers(1).send(pool, NeedWork(warmedData(runMessage)))
// Action 2 completes, process feed
containers(2).send(pool, NeedWork(warmedData(runMessageDifferentAction)))
it should "process activations in the order they are arriving" in within(timeout) {
val (containers, factory) = testContainers(6)
val feed = TestProbe()
// Pool with 512 MB usermemory
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with each 256 MB and Action 1 with 512 MB)
pool ! runMessage
pool ! runMessageLarge
pool ! runMessageDifferentNamespace
pool ! runMessageDifferentAction
// Action 0 is finished -> 512 free; Large action should be executed now (2 more in queue)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
// Buffer still has 2, so feed will not be used
containers(1).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
// Send another action to the container pool, that would fit memory-wise (3 in queue)
pool ! runMessageDifferentEverything
// Action 1 is finished -> 512 free; Action 2 and Action 3 should be executed now (1 more in queue)
containers(1).send(pool, NeedWork(warmedData(runMessageLarge)))
// Buffer still has 1, so feed will not be used
// Assert retryLogline = false to check if this request has been stored in the queue instead of retrying in the system
// Action 3 is finished -> 256 free; Action 4 should start (0 in queue)
containers(3).send(pool, NeedWork(warmedData(runMessageDifferentAction)))
// Buffer is empty, so go back to processing feed
// Run the 5th message from the buffer
// Action 2 is finished -> 256 free
containers(2).send(pool, NeedWork(warmedData(runMessageDifferentNamespace)))
pool ! runMessage
// Back to buffering
pool ! runMessageDifferentVersion
// Action 6 won't start because it is buffered, waiting for Action 4 to complete
// Action 4 is finished -> 256 free
containers(4).send(pool, NeedWork(warmedData(runMessageDifferentEverything)))
// Run the 6th message from the buffer
containers(6).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(runMessageDifferentVersion.action, runMessageDifferentVersion.msg, Some(_)) => true
// When buffer is emptied, process next feed message
// Action 5 is finished -> 256 free, process feed
containers(5).send(pool, NeedWork(warmedData(runMessage)))
// Action 6 is finished -> 512 free, process feed
containers(6).send(pool, NeedWork(warmedData(runMessageDifferentVersion)))
it should "process runbuffer instead of requesting new messages" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref))
val run1 = createRunMessage(action, invocationNamespace)
val run2 = createRunMessage(action, invocationNamespace)
val run3 = createRunMessage(action, invocationNamespace)
pool ! run1
pool ! run2 //will be buffered since the pool can only fit 1
pool ! run3 //will be buffered since the pool can only fit 1
//start first run
//cannot launch more containers, so make sure additional containers are not created
//complete processing of first run
containers(0).send(pool, NeedWork(warmedData(run1)))
//don't feed till runBuffer is emptied
//start second run
containers(0).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(run2.action, run2.msg, Some(_)) => true
//complete processing of second run
containers(0).send(pool, NeedWork(warmedData(run2)))
//feed as part of last buffer item processing
//start third run
containers(0).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(run3.action, run3.msg, None) => true
//complete processing of third run
containers(0).send(pool, NeedWork(warmedData(run3)))
//now we expect feed to send a new message (1 per completion = 2 new messages)
//make sure only one though
it should "process runbuffer when container is removed" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val run1 = createRunMessage(action, invocationNamespace)
val run2 = createRunMessage(action, invocationNamespace)
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref))
//these will get buffered since allowLaunch is false
pool ! run1
pool ! run2
//start first run
//trigger removal of the container ref, but don't start processing
containers(0).send(pool, RescheduleJob)
//trigger buffer processing by ContainerRemoved message
pool ! ContainerRemoved(true)
//start second run
containers(1).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(run2.action, run2.msg, Some(_)) => true
it should "process runbuffered items only once" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref))
val run1 = createRunMessage(action, invocationNamespace)
val run2 = createRunMessage(action, invocationNamespace)
val run3 = createRunMessage(action, invocationNamespace)
pool ! run1
pool ! run2 //will be buffered since the pool can only fit 1
pool ! run3 //will be buffered since the pool can only fit 1
//start first run
//cannot launch more containers, so make sure additional containers are not created
//ContainerRemoved triggers buffer processing - if we don't prevent duplicates, this will cause the buffer head to be resent!
pool ! ContainerRemoved(true)
pool ! ContainerRemoved(true)
pool ! ContainerRemoved(true)
//complete processing of first run
containers(0).send(pool, NeedWork(warmedData(run1)))
//don't feed till runBuffer is emptied
//start second run
containers(0).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(run2.action, run2.msg, Some(_)) => true
//complete processing of second run
containers(0).send(pool, NeedWork(warmedData(run2)))
//feed as part of last buffer item processing
//start third run
containers(0).expectMsgPF() {
// The `Some` assures, that it has been retried while the first action was still blocking the invoker.
case Run(run3.action, run3.msg, None) => true
//complete processing of third run
containers(0).send(pool, NeedWork(warmedData(run3)))
//now we expect feed to send a new message (1 per completion = 2 new messages)
//make sure only one though
it should "increase activation counts when scheduling to containers whose actions support concurrency" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessageConcurrent
// container0 is reused
pool ! runMessageConcurrent
// container0 is reused
pool ! runMessageConcurrent
// container1 is created and used (these concurrent containers are configured with max 3 concurrent activations)
pool ! runMessageConcurrent
it should "schedule concurrent activations to different containers for different namespaces" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessageConcurrent
// container1 is created and used
pool ! runMessageConcurrentDifferentNamespace
it should "decrease activation counts when receiving NeedWork for actions that support concurrency" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessageConcurrent
// container0 is reused
pool ! runMessageConcurrent
// container0 is reused
pool ! runMessageConcurrent
// container1 is created and used (these concurrent containers are configured with max 3 concurrent activations)
pool ! runMessageConcurrent
// container1 is reused
pool ! runMessageConcurrent
// container1 is reused
pool ! runMessageConcurrent
containers(0).send(pool, NeedWork(warmedData(runMessageConcurrent)))
// container0 is reused (since active count decreased)
pool ! runMessageConcurrent
it should "backfill prewarms when prewarm containers are removed" in {
val (containers, factory) = testContainers(6)
val feed = TestProbe()
val pool =
.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 5), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(1).expectMsg(Start(exec, memoryLimit))
//removing 2 prewarm containers will start 2 containers via backfill
containers(0).send(pool, ContainerRemoved(true))
containers(1).send(pool, ContainerRemoved(true))
containers(2).expectMsg(Start(exec, memoryLimit))
containers(3).expectMsg(Start(exec, memoryLimit))
//make sure extra prewarms are not started
it should "adjust prewarm container run well without reactive config" in {
val (containers, factory) = testContainers(4)
val feed = TestProbe()
val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
val poolConfig =
ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
val initialCount = 2
val pool =
.props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(1).expectMsg(Start(exec, memoryLimit))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
containers(1).send(pool, NeedWork(preWarmedData(exec.kind)))
// when invoker starts, include 0 prewarm container at the very beginning
stream.toString should include(s"found 0 started")
// the desiredCount should equal with initialCount when invoker starts
stream.toString should include(s"desired count: ${initialCount}")
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
// Because already supplemented the prewarmed container, so currentCount should equal with initialCount
eventually {
stream.toString should not include ("started")
it should "adjust prewarm container run well with reactive config" in {
val (containers, factory) = testContainers(15)
val feed = TestProbe()
val prewarmExpirationCheckIntervel = 2.seconds
val poolConfig =
ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckIntervel, None, 100)
val minCount = 0
val initialCount = 2
val maxCount = 4
val deadline: Option[Deadline] = Some(ttl.fromNow)
val reactive: Option[ReactivePrewarmingConfig] =
Some(ReactivePrewarmingConfig(minCount, maxCount, ttl, threshold, increment))
val pool =
.props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive))))
//start 2 prewarms
containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(1).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
// when invoker starts, include 0 prewarm container at the very beginning
stream.toString should include(s"found 0 started")
// the desiredCount should equal with initialCount when invoker starts
stream.toString should include(s"desired count: ${initialCount}")
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
//expire 2 prewarms
containers(0).send(pool, ContainerRemoved(false))
containers(1).send(pool, ContainerRemoved(false))
// currentCount should equal with 0 due to these 2 prewarmed containers are expired
stream.toString should not include (s"found 0 started")
// the desiredCount should equal with minCount because cold start didn't happen
stream.toString should not include (s"desired count: ${minCount}")
// Previously created prewarmed containers should be removed
stream.toString should not include (s"removed ${initialCount} expired prewarmed container")
val action = ExecutableWhiskAction(
limits = ActionLimits(memory = MemoryLimit(memoryLimit)))
val run = createRunMessage(action, invocationNamespace)
// 2 cold start happened
pool ! run
pool ! run
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
eventually {
// Because already removed expired prewarmed containrs, so currentCount should equal with 0
stream.toString should include(s"found 0 started")
// the desiredCount should equal with 2 due to cold start happened
stream.toString should include(s"desired count: 2")
//add 2 prewarms due to increments
containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(4).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(5).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
containers(4).send(pool, ContainerRemoved(false))
containers(5).send(pool, ContainerRemoved(false))
// removed previous 2 prewarmed container due to expired
stream.toString should include(s"removing up to ${poolConfig.prewarmExpirationLimit} of 2 expired containers")
// 5 code start happened(5 > maxCount)
pool ! run
pool ! run
pool ! run
pool ! run
pool ! run
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
eventually {
// Because already removed expired prewarmed containrs, so currentCount should equal with 0
stream.toString should include(s"found 0 started")
// in spite of the cold start number > maxCount, but the desiredCount can't be greater than maxCount
stream.toString should include(s"desired count: ${maxCount}")
containers(11).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(12).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(13).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(14).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(11).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(12).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(13).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(14).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
abstract class MockableContainer extends Container {
protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
* Unit tests for the ContainerPool object.
* These tests test only the "static" methods "schedule" and "remove"
* of the ContainerPool object.
class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
val actionExec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val standardNamespace = EntityName("standardNamespace")
val differentNamespace = EntityName("differentNamespace")
/** Helper to create a new action from String representations */
def createAction(namespace: String = "actionNS", name: String = "actionName", limits: ActionLimits = ActionLimits()) =
ExecutableWhiskAction(EntityPath(namespace), EntityName(name), actionExec, limits = limits)
/** Helper to create WarmedData with sensible defaults */
def warmedData(action: ExecutableWhiskAction = createAction(),
namespace: String = standardNamespace.asString,
lastUsed: Instant =,
active: Int = 0) =
WarmedData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
/** Helper to create WarmingData with sensible defaults */
def warmingData(action: ExecutableWhiskAction = createAction(),
namespace: String = standardNamespace.asString,
lastUsed: Instant =,
active: Int = 0) =
WarmingData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
/** Helper to create WarmingData with sensible defaults */
def warmingColdData(action: ExecutableWhiskAction = createAction(),
namespace: String = standardNamespace.asString,
lastUsed: Instant =,
active: Int = 0) =
WarmingColdData(EntityName(namespace), action, lastUsed, active)
/** Helper to create PreWarmedData with sensible defaults */
def preWarmedData(kind: String = "anyKind", expires: Option[Deadline] = None) =
PreWarmedData(stub[MockableContainer], kind, 256.MB, expires = expires)
/** Helper to create NoData */
def noData() = NoData()
behavior of "ContainerPool schedule()"
it should "not provide a container if idle pool is empty" in {
ContainerPool.schedule(createAction(), standardNamespace, Map.empty) shouldBe None
it should "reuse an applicable warm container from idle pool with one container" in {
val data = warmedData()
val pool = Map('name -> data)
// copy to make sure, referencial equality doesn't suffice
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) shouldBe Some('name, data)
it should "reuse an applicable warm container from idle pool with several applicable containers" in {
val data = warmedData()
val pool = Map('first -> data, 'second -> data)
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) should (be(Some('first, data)) or be(
Some('second, data)))
it should "reuse an applicable warm container from idle pool with several different containers" in {
val matchingData = warmedData()
val pool = Map('none -> noData(), 'pre -> preWarmedData(), 'warm -> matchingData)
ContainerPool.schedule(matchingData.action.copy(), matchingData.invocationNamespace, pool) shouldBe Some(
it should "not reuse a container from idle pool with non-warm containers" in {
val data = warmedData()
// data is **not** in the pool!
val pool = Map('none -> noData(), 'pre -> preWarmedData())
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) shouldBe None
it should "not reuse a warm container with different invocation namespace" in {
val data = warmedData()
val pool = Map('warm -> data)
val differentNamespace = EntityName(data.invocationNamespace.asString + "butDifferent")
data.invocationNamespace should not be differentNamespace
ContainerPool.schedule(data.action.copy(), differentNamespace, pool) shouldBe None
it should "not reuse a warm container with different action name" in {
val data = warmedData()
val differentAction = data.action.copy(name = EntityName( + "butDifferent"))
val pool = Map('warm -> data) should not be
ContainerPool.schedule(differentAction, data.invocationNamespace, pool) shouldBe None
it should "not reuse a warm container with different action version" in {
val data = warmedData()
val differentAction = data.action.copy(version = data.action.version.upMajor)
val pool = Map('warm -> data)
data.action.version should not be differentAction.version
ContainerPool.schedule(differentAction, data.invocationNamespace, pool) shouldBe None
it should "not use a container when active activation count >= maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val data = warmedData(
active = maxConcurrent,
action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))))
val pool = Map('warm -> data)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe None
val data2 = warmedData(
active = maxConcurrent - 1,
action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))))
val pool2 = Map('warm -> data2)
ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe Some('warm, data2)
it should "use a warming when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingData(active = maxConcurrent - 1, action = action)
val pool = Map('warming -> data)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warming, data)
val data2 = warmedData(active = maxConcurrent - 1, action = action)
val pool2 = pool ++ Map('warm -> data2)
ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe Some('warm, data2)
it should "prefer warm to warming when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val data2 = warmedData(active = maxConcurrent - 1, action = action)
val pool = Map('warming -> data, 'warm -> data2)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warm, data2)
it should "use a warmingCold when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val pool = Map('warmingCold -> data)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warmingCold, data)
//after scheduling, the pool will update with new data to set active = maxConcurrent
val data2 = warmingColdData(active = maxConcurrent, action = action)
val pool2 = Map('warmingCold -> data2)
ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe None
it should "prefer warm to warmingCold when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val data2 = warmedData(active = maxConcurrent - 1, action = action)
val pool = Map('warmingCold -> data, 'warm -> data2)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warm, data2)
it should "prefer warming to warmingCold when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val data2 = warmingData(active = maxConcurrent - 1, action = action)
val pool = Map('warmingCold -> data, 'warming -> data2)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warming, data2)
behavior of "ContainerPool remove()"
it should "not provide a container if pool is empty" in {
ContainerPool.remove(Map.empty, MemoryLimit.STD_MEMORY) shouldBe List.empty
it should "not provide a container from busy pool with non-warm containers" in {
val pool = Map('none -> noData(), 'pre -> preWarmedData())
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List.empty
it should "not provide a container from pool if there is not enough capacity" in {
val pool = Map('first -> warmedData())
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY * 2) shouldBe List.empty
it should "provide a container from pool with one single free container" in {
val data = warmedData()
val pool = Map('warm -> data)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('warm)
it should "provide oldest container from busy pool with multiple containers" in {
val commonNamespace = differentNamespace.asString
val first = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(1))
val second = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(2))
val oldest = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(0))
val pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('oldest)
it should "provide a list of the oldest containers from pool, if several containers have to be removed" in {
val namespace = differentNamespace.asString
val first = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(1))
val second = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(2))
val third = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(3))
val oldest = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(0))
val pool = Map('first -> first, 'second -> second, 'third -> third, 'oldest -> oldest)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY * 2) shouldBe List('oldest, 'first)
it should "provide oldest container (excluding concurrently busy) from busy pool with multiple containers" in {
val commonNamespace = differentNamespace.asString
val first = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(1), active = 0)
val second = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(2), active = 0)
val oldest = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(0), active = 3)
var pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('first)
pool = pool - 'first
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('second)
it should "remove expired in order of expiration" in {
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 1)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
//use a second kind so that we know sorting is not isolated to the expired of each kind
val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
PrewarmingConfig(1, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))),
PrewarmingConfig(1, exec2, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
val oldestDeadline = - 1.seconds
val newerDeadline =
val newestDeadline = + 1.seconds
val prewarmedPool = Map(
'newest -> preWarmedData("actionKind", Some(newestDeadline)),
'oldest -> preWarmedData("actionKind2", Some(oldestDeadline)),
'newer -> preWarmedData("actionKind", Some(newerDeadline)))
lazy val stream = new ByteArrayOutputStream
lazy val printstream = new PrintStream(stream)
lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest))
it should "remove only the prewarmExpirationLimit of expired prewarms" in {
//limit prewarm removal to 2
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 2)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
//all are overdue, with different expiration times
val oldestDeadline = - 5.seconds
val newerDeadline = - 4.seconds
//the newest* ones are expired, but not the oldest, and not within the limit of 2 prewarms, so won't be removed
val newestDeadline = - 3.seconds
val newestDeadline2 = - 2.seconds
val newestDeadline3 = - 1.seconds
val prewarmedPool = Map(
'newest -> preWarmedData("actionKind", Some(newestDeadline)),
'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
'newer -> preWarmedData("actionKind", Some(newerDeadline)),
'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
lazy val stream = new ByteArrayOutputStream
lazy val printstream = new PrintStream(stream)
lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest, 'newer))
it should "remove only the expired prewarms regardless of minCount" in {
//limit prewarm removal to 100
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 100)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
//minCount is 2 - should leave at least 2 prewarms when removing expired
val prewarmConfig =
List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(2, 10, 10.seconds, 1, 1))))
//all are overdue, with different expiration times
val oldestDeadline = - 5.seconds
val newerDeadline = - 4.seconds
//the newest* ones are expired, but not the oldest, and not within the limit of 2 prewarms, so won't be removed
val newestDeadline = - 3.seconds
val newestDeadline2 = - 2.seconds
val newestDeadline3 = - 1.seconds
val prewarmedPool = Map(
'newest -> preWarmedData("actionKind", Some(newestDeadline)),
'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
'newer -> preWarmedData("actionKind", Some(newerDeadline)),
'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
lazy val stream = new ByteArrayOutputStream
lazy val printstream = new PrintStream(stream)
lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List(