/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.test
import java.io.{ByteArrayOutputStream, PrintStream}
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration._
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.FlatSpecLike
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.testkit.TestProbe
import common.{StreamLogging, WhiskProperties}
import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, ReactivePrewarmingConfig, RuntimeManifest}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.connector.MessageFeed
import org.scalatest.concurrent.Eventually
/**
* Behavior tests for the ContainerPool
*
* These tests exercise the runtime behavior of a ContainerPool actor.
*/
@RunWith(classOf[JUnitRunner])
class ContainerPoolTests
extends TestKit(ActorSystem("ContainerPool"))
with ImplicitSender
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with MockFactory
with Eventually
with StreamLogging {
override def afterAll = TestKit.shutdownActorSystem(system)
val timeout = 5.seconds
// Common entities to pass to the tests. We don't really care what's inside
// those for the behavior testing here, as none of the contents will really
// reach a container anyway. We merely assert that passing and extraction of
// the values is done properly.
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val ttl = FiniteDuration(500, TimeUnit.MILLISECONDS)
val threshold = 1
val increment = 1
/** Creates a `Run` message */
def createRunMessage(action: ExecutableWhiskAction, invocationNamespace: EntityName) = {
val uuid = UUID()
val message = ActivationMessage(
TransactionId.testing,
action.fullyQualifiedName(true),
action.rev,
Identity(Subject(), Namespace(invocationNamespace, uuid), BasicAuthenticationAuthKey(uuid, Secret())),
ActivationId.generate(),
ControllerInstanceId("0"),
blocking = false,
content = None,
initArgs = Set.empty,
lockedArgs = Map.empty)
Run(action, message)
}
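// A minimal usage sketch (the `action` and `invocationNamespace` fixtures are defined just below):
// each call wraps the action in a fresh ActivationMessage, so two Run messages built from the same
// arguments still carry distinct activation ids.
//
//   val runA = createRunMessage(action, invocationNamespace)
//   val runB = createRunMessage(action, invocationNamespace)
//   runA.msg.activationId should not be runB.msg.activationId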
val invocationNamespace = EntityName("invocationSpace")
val differentInvocationNamespace = EntityName("invocationSpace2")
val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val concurrentAction = ExecutableWhiskAction(
EntityPath("actionSpace"),
EntityName("actionName"),
exec,
limits = ActionLimits(concurrency = ConcurrencyLimit(if (concurrencyEnabled) 3 else 1)))
val differentAction = action.copy(name = EntityName("actionName2"))
val largeAction =
action.copy(
name = EntityName("largeAction"),
limits = ActionLimits(memory = MemoryLimit(MemoryLimit.STD_MEMORY * 2)))
val runMessage = createRunMessage(action, invocationNamespace)
val runMessageLarge = createRunMessage(largeAction, invocationNamespace)
val runMessageDifferentAction = createRunMessage(differentAction, invocationNamespace)
val runMessageDifferentVersion = createRunMessage(action.copy().revision(DocRevision("v2")), invocationNamespace)
val runMessageDifferentNamespace = createRunMessage(action, differentInvocationNamespace)
val runMessageDifferentEverything = createRunMessage(differentAction, differentInvocationNamespace)
val runMessageConcurrent = createRunMessage(concurrentAction, invocationNamespace)
val runMessageConcurrentDifferentNamespace = createRunMessage(concurrentAction, differentInvocationNamespace)
/** Helper to create PreWarmedData */
def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit, expires: Option[Deadline] = None) =
PreWarmedData(stub[MockableContainer], kind, memoryLimit, expires = expires)
/** Helper to create WarmedData */
def warmedData(run: Run, lastUsed: Instant = Instant.now) = {
WarmedData(stub[MockableContainer], run.msg.user.namespace.name, run.action, lastUsed)
}
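// Note that both helpers above stub the container itself (ScalaMock's `stub[MockableContainer]`),
// so no real container is ever started; the pool only sees the metadata (kind, memory, namespace,
// action, last-used time) it needs for its scheduling decisions.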
/** Creates a sequence of containers and a factory returning this sequence. */
def testContainers(n: Int) = {
val containers = (0 to n).map(_ => TestProbe())
val queue = mutable.Queue(containers: _*)
val factory = (fac: ActorRefFactory) => queue.dequeue().ref
(containers, factory)
}
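// How the probes and the factory are typically wired together in the tests below (a sketch;
// `poolConfig`, `feed` and `runMessage` are fixtures from this file):
//
//   val (containers, factory) = testContainers(2)
//   val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref))
//   pool ! runMessage                   // the pool asks the factory for a container actor...
//   containers(0).expectMsg(runMessage) // ...which is the first TestProbe of the sequence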
def poolConfig(userMemory: ByteSize) =
ContainerPoolConfig(userMemory, 0.5, false, 1.minute, None, 100)
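// Note on the fixed arguments above: judging by the expiration tests further down, the fourth
// argument is the prewarm expiration check interval and the trailing 100 is the
// prewarmExpirationLimit; the remaining values are simply the defaults shared by every test here.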
behavior of "ContainerPool"
/*
* CONTAINER SCHEDULING
*
* These tests only test the simplest approaches. Look below for full coverage tests
* of the respective scheduling methods.
*/
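// The message protocol exercised throughout, in brief (a sketch of the happy path only):
//
//   pool ! runMessage                                            // schedule an activation
//   containers(0).expectMsg(runMessage)                          // a container actor picks it up
//   containers(0).send(pool, NeedWork(warmedData(runMessage)))   // container reports itself warm and idle
//   // ...after which the pool may send Remove to evict it, or Start(exec, memoryLimit) to prewarm a new one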
it should "reuse a warm container" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Actions are created with the default memory limit (MemoryLimit.STD_MEMORY). This means 4 actions can be scheduled.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(1).expectNoMessage(100.milliseconds)
}
it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Actions are created with the default memory limit (MemoryLimit.STD_MEMORY). This means 4 actions can be scheduled.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
pool ! runMessageDifferentVersion
containers(0).expectMsg(runMessageDifferentVersion)
containers(1).expectNoMessage(100.milliseconds)
}
it should "create a container if it cannot find a matching container" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Actions are created with the default memory limit (MemoryLimit.STD_MEMORY). This means 4 actions can be scheduled.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
// Note that the container doesn't respond, thus it's not free to take work
pool ! runMessage
containers(1).expectMsg(runMessage)
}
it should "remove a container to make space in the pool if it is already full and a different action arrives" in within(
timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 slot
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentEverything
containers(0).expectMsg(Remove)
containers(1).expectMsg(runMessageDifferentEverything)
}
it should "remove several containers to make space in the pool if it is already full and a different large action arrives" in within(
timeout) {
val (containers, factory) = testContainers(3)
val feed = TestProbe()
// a pool with slots for 2 actions with default memory limit.
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(512.MB), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
pool ! runMessageDifferentAction // 2 * stdMemory taken -> full
containers(1).expectMsg(runMessageDifferentAction)
containers(0).send(pool, NeedWork(warmedData(runMessage))) // first action finished -> 1 * stdMemory taken
feed.expectMsg(MessageFeed.Processed)
containers(1)
.send(pool, NeedWork(warmedData(runMessageDifferentAction))) // second action finished -> 1 * stdMemory taken
feed.expectMsg(MessageFeed.Processed)
pool ! runMessageLarge // need to remove both actions to make space for the large action (needs 2 * stdMemory)
containers(0).expectMsg(Remove)
containers(1).expectMsg(Remove)
containers(2).expectMsg(runMessageLarge)
}
it should "cache a container if there is still space in the pool" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 active slot but 2 slots in total
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Run the first container
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(runMessage, lastUsed = Instant.EPOCH)))
feed.expectMsg(MessageFeed.Processed)
// Run the second container, don't remove the first one
pool ! runMessageDifferentEverything
containers(1).expectMsg(runMessageDifferentEverything)
containers(1).send(pool, NeedWork(warmedData(runMessageDifferentEverything, lastUsed = Instant.now)))
feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentNamespace
containers(2).expectMsg(runMessageDifferentNamespace)
// 2 Slots exhausted, remove the first container to make space
containers(0).expectMsg(Remove)
}
it should "remove a container to make space in the pool if it is already full and another action with different invocation namespace arrives" in within(
timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 slot
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentNamespace
containers(0).expectMsg(Remove)
containers(1).expectMsg(runMessageDifferentNamespace)
}
it should "reschedule job when container is removed prematurely without sending message to feed" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// a pool with only 1 slot
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, RescheduleJob) // emulate container failure ...
containers(0).send(pool, runMessage) // ... causing job to be rescheduled
feed.expectNoMessage(100.millis)
containers(1).expectMsg(runMessage) // job resent to new actor
}
it should "not start a new container if there is not enough space in the pool" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Start first action
pool ! runMessage // 1 * stdMemory taken
containers(0).expectMsg(runMessage)
// Send second action to the pool
pool ! runMessageLarge // message is too large to be processed immediately.
containers(1).expectNoMessage(100.milliseconds)
// First action is finished
containers(0).send(pool, NeedWork(warmedData(runMessage))) // pool is empty again.
feed.expectMsg(MessageFeed.Processed)
// Second action should run now
containers(1).expectMsgPF() {
// The `Some` asserts that it has been retried while the first action was still blocking the invoker.
case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
}
containers(1).send(pool, NeedWork(warmedData(runMessageLarge)))
feed.expectMsg(MessageFeed.Processed)
}
it should "not create prewarm container when used memory reaches the limit" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool =
system.actorOf(ContainerPool
.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
containers(1).expectNoMessage(100.milliseconds)
}
/*
* CONTAINER PREWARMING
*/
it should "create prewarmed containers on startup" in within(timeout) {
val (containers, factory) = testContainers(1)
val feed = TestProbe()
val pool =
system.actorOf(
ContainerPool
.props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
}
it should "use a prewarmed container and create a new one to fill its place" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool =
system.actorOf(ContainerPool
.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
pool ! runMessage
containers(1).expectMsg(Start(exec, memoryLimit))
}
it should "use a prewarmed container with ttl and create a new one to fill its place" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val ttl = 5.seconds //make sure replaced prewarm has ttl
val pool =
system.actorOf(
ContainerPool
.props(
factory,
poolConfig(MemoryLimit.STD_MEMORY * 2),
feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit, Some(ReactivePrewarmingConfig(1, 1, ttl, 1, 1))))))
containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = Some(ttl.fromNow))))
pool ! runMessage
containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
}
it should "not use a prewarmed container if it doesn't fit the kind" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val alternativeExec = CodeExecAsString(RuntimeManifest("anotherKind", ImageName("testImage")), "testCode", None)
val pool = system.actorOf(
ContainerPool
.props(
factory,
poolConfig(MemoryLimit.STD_MEMORY * 2),
feed.ref,
List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
pool ! runMessage
containers(1).expectMsg(runMessage) // but container1 is used
}
it should "not use a prewarmed container if it doesn't fit memory wise" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val alternativeLimit = 128.MB
val pool =
system.actorOf(
ContainerPool
.props(
factory,
poolConfig(MemoryLimit.STD_MEMORY * 2),
feed.ref,
List(PrewarmingConfig(1, exec, alternativeLimit))))
containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit)))
pool ! runMessage
containers(1).expectMsg(runMessage) // but container1 is used
}
/*
* CONTAINER DELETION
*/
it should "not reuse a container which is scheduled for deletion" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
// container0 is reused
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
// container0 is deleted
containers(0).send(pool, ContainerRemoved(true))
// container1 is created and used
pool ! runMessage
containers(1).expectMsg(runMessage)
}
/*
* Run buffer
*/
it should "first put messages into the queue and retrying them and then put messages only into the queue" in within(
timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
// Pool with 512 MB user memory
val pool =
system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Send action that blocks the pool
pool ! runMessageLarge
// Action 0 starts -> 0MB free
containers(0).expectMsg(runMessageLarge)
// Send action that should be written to the queue and retried in invoker
pool ! runMessage
containers(1).expectNoMessage(100.milliseconds)
// Send another message that should not be retried, but put into the queue as well
pool ! runMessageDifferentAction
containers(2).expectNoMessage(100.milliseconds)
// Action with 512 MB is finished
// Action 0 completes -> 512MB free
containers(0).send(pool, NeedWork(warmedData(runMessageLarge)))
// Action 1 should start immediately -> 256MB free
containers(1).expectMsgPF() {
// The `Some` asserts that it has been retried while the first action was still blocking the invoker.
case Run(runMessage.action, runMessage.msg, Some(_)) => true
}
// Action 2 should start immediately as well -> 0MB free (without any retries, as there is already enough space in the pool)
containers(2).expectMsg(runMessageDifferentAction)
// When buffer is emptied, process next feed message
feed.expectMsg(MessageFeed.Processed)
// Action 1 completes, process feed
containers(1).send(pool, NeedWork(warmedData(runMessage)))
feed.expectMsg(MessageFeed.Processed)
// Action 2 completes, process feed
containers(2).send(pool, NeedWork(warmedData(runMessageDifferentAction)))
feed.expectMsg(MessageFeed.Processed)
}
it should "process activations in the order they are arriving" in within(timeout) {
val (containers, factory) = testContainers(6)
val feed = TestProbe()
// Pool with 512 MB user memory
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 2), feed.ref))
// Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with 256 MB each, and Action 1 with 512 MB)
pool ! runMessage
containers(0).expectMsg(runMessage)
pool ! runMessageLarge
containers(1).expectNoMessage(100.milliseconds)
pool ! runMessageDifferentNamespace
containers(2).expectNoMessage(100.milliseconds)
pool ! runMessageDifferentAction
containers(3).expectNoMessage(100.milliseconds)
// Action 0 is finished -> 512 free; Large action should be executed now (2 more in queue)
containers(0).send(pool, NeedWork(warmedData(runMessage)))
// Buffer still has 2, so feed will not be used
feed.expectNoMessage(100.milliseconds)
containers(1).expectMsgPF() {
// The `Some` asserts that it has been retried while the first action was still blocking the invoker.
case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
}
// Send another action to the container pool, that would fit memory-wise (3 in queue)
pool ! runMessageDifferentEverything
containers(4).expectNoMessage(100.milliseconds)
// Action 1 is finished -> 512 free; Action 2 and Action 3 should be executed now (1 more in queue)
containers(1).send(pool, NeedWork(warmedData(runMessageLarge)))
// Buffer still has 1, so feed will not be used
feed.expectNoMessage(100.milliseconds)
containers(2).expectMsg(runMessageDifferentNamespace)
// The retry deadline defaults to `None` here, checking that this request was stored in the queue instead of being retried in the system
containers(3).expectMsg(runMessageDifferentAction)
// Action 3 is finished -> 256 free; Action 4 should start (0 in queue)
containers(3).send(pool, NeedWork(warmedData(runMessageDifferentAction)))
// Buffer is empty, so go back to processing feed
feed.expectMsg(MessageFeed.Processed)
// Run the 5th message from the buffer
containers(4).expectMsg(runMessageDifferentEverything)
// Action 2 is finished -> 256 free
containers(2).send(pool, NeedWork(warmedData(runMessageDifferentNamespace)))
feed.expectMsg(MessageFeed.Processed)
pool ! runMessage
// Back to buffering
pool ! runMessageDifferentVersion
containers(5).expectMsg(runMessage)
// Action 6 won't start because it is buffered, waiting for Action 4 to complete
containers(6).expectNoMessage(100.milliseconds)
// Action 4 is finished -> 256 free
containers(4).send(pool, NeedWork(warmedData(runMessageDifferentEverything)))
// Run the 6th message from the buffer
containers(6).expectMsgPF() {
// The `Some` asserts that it has been retried while the first action was still blocking the invoker.
case Run(runMessageDifferentVersion.action, runMessageDifferentVersion.msg, Some(_)) => true
}
// When buffer is emptied, process next feed message
feed.expectMsg(MessageFeed.Processed)
// Action 5 is finished -> 256 free, process feed
containers(5).send(pool, NeedWork(warmedData(runMessage)))
feed.expectMsg(MessageFeed.Processed)
// Action 6 is finished -> 512 free, process feed
containers(6).send(pool, NeedWork(warmedData(runMessageDifferentVersion)))
feed.expectMsg(MessageFeed.Processed)
}
it should "process runbuffer instead of requesting new messages" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref))
val run1 = createRunMessage(action, invocationNamespace)
val run2 = createRunMessage(action, invocationNamespace)
val run3 = createRunMessage(action, invocationNamespace)
pool ! run1
pool ! run2 //will be buffered since the pool can only fit 1
pool ! run3 //will be buffered since the pool can only fit 1
//start first run
containers(0).expectMsg(run1)
//cannot launch more containers, so make sure additional containers are not created
containers(1).expectNoMessage(100.milliseconds)
//complete processing of first run
containers(0).send(pool, NeedWork(warmedData(run1)))
//don't feed till runBuffer is emptied
feed.expectNoMessage(100.milliseconds)
//start second run
containers(0).expectMsgPF() {
// The `Some` asserts that it has been retried while the first action was still blocking the invoker.
case Run(run2.action, run2.msg, Some(_)) => true
}
//complete processing of second run
containers(0).send(pool, NeedWork(warmedData(run2)))
//feed as part of last buffer item processing
feed.expectMsg(MessageFeed.Processed)
//start third run
containers(0).expectMsgPF() {
// The `None` asserts that it was not retried, but served straight from the run buffer.
case Run(run3.action, run3.msg, None) => true
}
//complete processing of third run
containers(0).send(pool, NeedWork(warmedData(run3)))
//now we expect feed to send a new message (1 per completion = 2 new messages)
feed.expectMsg(MessageFeed.Processed)
//make sure only one though
feed.expectNoMessage(100.milliseconds)
}
it should "process runbuffer when container is removed" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val run1 = createRunMessage(action, invocationNamespace)
val run2 = createRunMessage(action, invocationNamespace)
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref))
//run2 will be buffered since the pool can only fit 1
pool ! run1
pool ! run2
//start first run
containers(0).expectMsg(run1)
//trigger removal of the container ref, but don't start processing
containers(0).send(pool, RescheduleJob)
//trigger buffer processing by ContainerRemoved message
pool ! ContainerRemoved(true)
//start second run
containers(1).expectMsgPF() {
// The `Some` asserts that it has been retried while the first action was still blocking the invoker.
case Run(run2.action, run2.msg, Some(_)) => true
}
}
it should "process runbuffered items only once" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 1), feed.ref))
val run1 = createRunMessage(action, invocationNamespace)
val run2 = createRunMessage(action, invocationNamespace)
val run3 = createRunMessage(action, invocationNamespace)
pool ! run1
pool ! run2 //will be buffered since the pool can only fit 1
pool ! run3 //will be buffered since the pool can only fit 1
//start first run
containers(0).expectMsg(run1)
//cannot launch more containers, so make sure additional containers are not created
containers(1).expectNoMessage(100.milliseconds)
//ContainerRemoved triggers buffer processing - if we don't prevent duplicates, this will cause the buffer head to be resent!
pool ! ContainerRemoved(true)
pool ! ContainerRemoved(true)
pool ! ContainerRemoved(true)
//complete processing of first run
containers(0).send(pool, NeedWork(warmedData(run1)))
//don't feed till runBuffer is emptied
feed.expectNoMessage(100.milliseconds)
//start second run
containers(0).expectMsgPF() {
// The `Some` asserts that it has been retried while the first action was still blocking the invoker.
case Run(run2.action, run2.msg, Some(_)) => true
}
//complete processing of second run
containers(0).send(pool, NeedWork(warmedData(run2)))
//feed as part of last buffer item processing
feed.expectMsg(MessageFeed.Processed)
//start third run
containers(0).expectMsgPF() {
// The `None` asserts that it was not retried, but served straight from the run buffer.
case Run(run3.action, run3.msg, None) => true
}
//complete processing of third run
containers(0).send(pool, NeedWork(warmedData(run3)))
//now we expect feed to send a new message (1 per completion = 2 new messages)
feed.expectMsg(MessageFeed.Processed)
//make sure only one though
feed.expectNoMessage(100.milliseconds)
}
it should "increase activation counts when scheduling to containers whose actions support concurrency" in {
assume(concurrencyEnabled)
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
// container0 is reused
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
// container0 is reused
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
// container1 is created and used (these concurrent containers are configured with max 3 concurrent activations)
pool ! runMessageConcurrent
containers(1).expectMsg(runMessageConcurrent)
}
it should "schedule concurrent activations to different containers for different namespaces" in {
assume(concurrencyEnabled)
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
// container1 is created and used
pool ! runMessageConcurrentDifferentNamespace
containers(1).expectMsg(runMessageConcurrentDifferentNamespace)
}
it should "decrease activation counts when receiving NeedWork for actions that support concurrency" in {
assume(concurrencyEnabled)
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 4), feed.ref))
// container0 is created and used
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
// container0 is reused
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
// container0 is reused
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
// container1 is created and used (these concurrent containers are configured with max 3 concurrent activations)
pool ! runMessageConcurrent
containers(1).expectMsg(runMessageConcurrent)
// container1 is reused
pool ! runMessageConcurrent
containers(1).expectMsg(runMessageConcurrent)
// container1 is reused
pool ! runMessageConcurrent
containers(1).expectMsg(runMessageConcurrent)
containers(0).send(pool, NeedWork(warmedData(runMessageConcurrent)))
// container0 is reused (since active count decreased)
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
}
it should "backfill prewarms when prewarm containers are removed" in {
val (containers, factory) = testContainers(6)
val feed = TestProbe()
val pool =
system.actorOf(ContainerPool
.props(factory, poolConfig(MemoryLimit.STD_MEMORY * 5), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(1).expectMsg(Start(exec, memoryLimit))
//removing 2 prewarm containers will start 2 containers via backfill
containers(0).send(pool, ContainerRemoved(true))
containers(1).send(pool, ContainerRemoved(true))
containers(2).expectMsg(Start(exec, memoryLimit))
containers(3).expectMsg(Start(exec, memoryLimit))
//make sure extra prewarms are not started
containers(4).expectNoMessage(100.milliseconds)
containers(5).expectNoMessage(100.milliseconds)
}
it should "adjust prewarm container run well without reactive config" in {
val (containers, factory) = testContainers(4)
val feed = TestProbe()
stream.reset()
val prewarmExpirationCheckInterval = FiniteDuration(2, TimeUnit.SECONDS)
val poolConfig =
ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckInterval, None, 100)
val initialCount = 2
val pool =
system.actorOf(
ContainerPool
.props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(1).expectMsg(Start(exec, memoryLimit))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
containers(1).send(pool, NeedWork(preWarmedData(exec.kind)))
// when the invoker starts, there are 0 prewarm containers at the very beginning
stream.toString should include(s"found 0 started")
// the desiredCount should equal initialCount when the invoker starts
stream.toString should include(s"desired count: ${initialCount}")
stream.reset()
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after the prewarmExpirationCheckInterval has elapsed
Thread.sleep(prewarmExpirationCheckInterval.toMillis)
// Because the prewarmed containers have already been replenished, currentCount should equal initialCount
eventually {
stream.toString should not include ("started")
}
}
it should "adjust prewarm container run well with reactive config" in {
val (containers, factory) = testContainers(15)
val feed = TestProbe()
stream.reset()
val prewarmExpirationCheckInterval = 2.seconds
val poolConfig =
ContainerPoolConfig(MemoryLimit.STD_MEMORY * 12, 0.5, false, prewarmExpirationCheckInterval, None, 100)
val minCount = 0
val initialCount = 2
val maxCount = 4
val deadline: Option[Deadline] = Some(ttl.fromNow)
val reactive: Option[ReactivePrewarmingConfig] =
Some(ReactivePrewarmingConfig(minCount, maxCount, ttl, threshold, increment))
val pool =
system.actorOf(
ContainerPool
.props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive))))
//start 2 prewarms
containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(1).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
// when the invoker starts, there are 0 prewarm containers at the very beginning
stream.toString should include(s"found 0 started")
// the desiredCount should equal initialCount when the invoker starts
stream.toString should include(s"desired count: ${initialCount}")
stream.reset()
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after the prewarmExpirationCheckInterval has elapsed
Thread.sleep(prewarmExpirationCheckInterval.toMillis)
//expire 2 prewarms
containers(0).expectMsg(Remove)
containers(1).expectMsg(Remove)
containers(0).send(pool, ContainerRemoved(false))
containers(1).send(pool, ContainerRemoved(false))
// currentCount should equal 0 because these 2 prewarmed containers are expired
stream.toString should not include (s"found 0 started")
// the desiredCount should equal minCount because no cold start happened
stream.toString should not include (s"desired count: ${minCount}")
// Previously created prewarmed containers should be removed
stream.toString should not include (s"removed ${initialCount} expired prewarmed container")
stream.reset()
val action = ExecutableWhiskAction(
EntityPath("actionSpace"),
EntityName("actionName"),
exec,
limits = ActionLimits(memory = MemoryLimit(memoryLimit)))
val run = createRunMessage(action, invocationNamespace)
// 2 cold starts happened
pool ! run
pool ! run
containers(2).expectMsg(run)
containers(3).expectMsg(run)
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after the prewarmExpirationCheckInterval has elapsed
Thread.sleep(prewarmExpirationCheckInterval.toMillis)
eventually {
// Because the expired prewarmed containers have already been removed, currentCount should equal 0
stream.toString should include(s"found 0 started")
// the desiredCount should equal 2 because cold starts happened
stream.toString should include(s"desired count: 2")
}
//add 2 prewarms due to increments
containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(4).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(5).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
stream.reset()
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after the prewarmExpirationCheckInterval has elapsed
Thread.sleep(prewarmExpirationCheckInterval.toMillis)
containers(4).expectMsg(Remove)
containers(5).expectMsg(Remove)
containers(4).send(pool, ContainerRemoved(false))
containers(5).send(pool, ContainerRemoved(false))
// the previous 2 prewarmed containers were removed because they expired
stream.toString should include(s"removing up to ${poolConfig.prewarmExpirationLimit} of 2 expired containers")
stream.reset()
// 5 cold starts happened (5 > maxCount)
pool ! run
pool ! run
pool ! run
pool ! run
pool ! run
containers(6).expectMsg(run)
containers(7).expectMsg(run)
containers(8).expectMsg(run)
containers(9).expectMsg(run)
containers(10).expectMsg(run)
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after the prewarmExpirationCheckInterval has elapsed
Thread.sleep(prewarmExpirationCheckInterval.toMillis)
eventually {
// Because the expired prewarmed containers have already been removed, currentCount should equal 0
stream.toString should include(s"found 0 started")
// even though the number of cold starts exceeds maxCount, the desiredCount can't be greater than maxCount
stream.toString should include(s"desired count: ${maxCount}")
}
containers(11).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(12).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(13).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(14).expectMsg(Start(exec, memoryLimit, Some(ttl)))
containers(11).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(12).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(13).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
containers(14).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
}
}
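/**
 * Stub target for the container helpers in both test classes: pinning `addr` to a placeholder
 * address keeps the stubbed containers inert, so no real host is ever contacted by these tests.
 */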
abstract class MockableContainer extends Container {
protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
}
/**
* Unit tests for the ContainerPool object.
*
* These tests cover only the "static" methods "schedule" and "remove"
* of the ContainerPool object.
*/
@RunWith(classOf[JUnitRunner])
class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
val actionExec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val standardNamespace = EntityName("standardNamespace")
val differentNamespace = EntityName("differentNamespace")
/** Helper to create a new action from String representations */
def createAction(namespace: String = "actionNS", name: String = "actionName", limits: ActionLimits = ActionLimits()) =
ExecutableWhiskAction(EntityPath(namespace), EntityName(name), actionExec, limits = limits)
/** Helper to create WarmedData with sensible defaults */
def warmedData(action: ExecutableWhiskAction = createAction(),
namespace: String = standardNamespace.asString,
lastUsed: Instant = Instant.now,
active: Int = 0) =
WarmedData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
/** Helper to create WarmingData with sensible defaults */
def warmingData(action: ExecutableWhiskAction = createAction(),
namespace: String = standardNamespace.asString,
lastUsed: Instant = Instant.now,
active: Int = 0) =
WarmingData(stub[MockableContainer], EntityName(namespace), action, lastUsed, active)
/** Helper to create WarmingColdData with sensible defaults */
def warmingColdData(action: ExecutableWhiskAction = createAction(),
namespace: String = standardNamespace.asString,
lastUsed: Instant = Instant.now,
active: Int = 0) =
WarmingColdData(EntityName(namespace), action, lastUsed, active)
/** Helper to create PreWarmedData with sensible defaults */
def preWarmedData(kind: String = "anyKind", expires: Option[Deadline] = None) =
PreWarmedData(stub[MockableContainer], kind, 256.MB, expires = expires)
/** Helper to create NoData */
def noData() = NoData()
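// A quick sketch of how these helpers drive the scheduling tests below: the "pool" is just a Map
// from an arbitrary key (a Symbol here) to container data, and schedule() returns the matching
// warm entry, if any.
//
//   val data = warmedData()
//   ContainerPool.schedule(data.action, data.invocationNamespace, Map('warm -> data))
//   // => Some(('warm, data)); with an empty Map it would be None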
behavior of "ContainerPool schedule()"
it should "not provide a container if idle pool is empty" in {
ContainerPool.schedule(createAction(), standardNamespace, Map.empty) shouldBe None
}
it should "reuse an applicable warm container from idle pool with one container" in {
val data = warmedData()
val pool = Map('name -> data)
// copy to make sure referential equality doesn't suffice
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) shouldBe Some('name, data)
}
it should "reuse an applicable warm container from idle pool with several applicable containers" in {
val data = warmedData()
val pool = Map('first -> data, 'second -> data)
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) should (be(Some('first, data)) or be(
Some('second, data)))
}
it should "reuse an applicable warm container from idle pool with several different containers" in {
val matchingData = warmedData()
val pool = Map('none -> noData(), 'pre -> preWarmedData(), 'warm -> matchingData)
ContainerPool.schedule(matchingData.action.copy(), matchingData.invocationNamespace, pool) shouldBe Some(
'warm,
matchingData)
}
it should "not reuse a container from idle pool with non-warm containers" in {
val data = warmedData()
// data is **not** in the pool!
val pool = Map('none -> noData(), 'pre -> preWarmedData())
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) shouldBe None
}
it should "not reuse a warm container with different invocation namespace" in {
val data = warmedData()
val pool = Map('warm -> data)
val differentNamespace = EntityName(data.invocationNamespace.asString + "butDifferent")
data.invocationNamespace should not be differentNamespace
ContainerPool.schedule(data.action.copy(), differentNamespace, pool) shouldBe None
}
it should "not reuse a warm container with different action name" in {
val data = warmedData()
val differentAction = data.action.copy(name = EntityName(data.action.name.asString + "butDifferent"))
val pool = Map('warm -> data)
data.action.name should not be differentAction.name
ContainerPool.schedule(differentAction, data.invocationNamespace, pool) shouldBe None
}
it should "not reuse a warm container with different action version" in {
val data = warmedData()
val differentAction = data.action.copy(version = data.action.version.upMajor)
val pool = Map('warm -> data)
data.action.version should not be differentAction.version
ContainerPool.schedule(differentAction, data.invocationNamespace, pool) shouldBe None
}
it should "not use a container when active activation count >= maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val data = warmedData(
active = maxConcurrent,
action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))))
val pool = Map('warm -> data)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe None
val data2 = warmedData(
active = maxConcurrent - 1,
action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent))))
val pool2 = Map('warm -> data2)
ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe Some('warm, data2)
}
it should "use a warming when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingData(active = maxConcurrent - 1, action = action)
val pool = Map('warming -> data)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warming, data)
val data2 = warmedData(active = maxConcurrent - 1, action = action)
val pool2 = pool ++ Map('warm -> data2)
ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe Some('warm, data2)
}
it should "prefer warm to warming when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val data2 = warmedData(active = maxConcurrent - 1, action = action)
val pool = Map('warming -> data, 'warm -> data2)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warm, data2)
}
it should "use a warmingCold when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val pool = Map('warmingCold -> data)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warmingCold, data)
//after scheduling, the pool will update with new data to set active = maxConcurrent
val data2 = warmingColdData(active = maxConcurrent, action = action)
val pool2 = Map('warmingCold -> data2)
ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) shouldBe None
}
it should "prefer warm to warmingCold when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val data2 = warmedData(active = maxConcurrent - 1, action = action)
val pool = Map('warmingCold -> data, 'warm -> data2)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warm, data2)
}
it should "prefer warming to warmingCold when active activation count < maxconcurrent" in {
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val maxConcurrent = if (concurrencyEnabled) 25 else 1
val action = createAction(limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent)))
val data = warmingColdData(active = maxConcurrent - 1, action = action)
val data2 = warmingData(active = maxConcurrent - 1, action = action)
val pool = Map('warmingCold -> data, 'warming -> data2)
ContainerPool.schedule(data.action, data.invocationNamespace, pool) shouldBe Some('warming, data2)
}
behavior of "ContainerPool remove()"
it should "not provide a container if pool is empty" in {
ContainerPool.remove(Map.empty, MemoryLimit.STD_MEMORY) shouldBe List.empty
}
it should "not provide a container from busy pool with non-warm containers" in {
val pool = Map('none -> noData(), 'pre -> preWarmedData())
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List.empty
}
it should "not provide a container from pool if there is not enough capacity" in {
val pool = Map('first -> warmedData())
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY * 2) shouldBe List.empty
}
it should "provide a container from pool with one single free container" in {
val data = warmedData()
val pool = Map('warm -> data)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('warm)
}
it should "provide oldest container from busy pool with multiple containers" in {
val commonNamespace = differentNamespace.asString
val first = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(1))
val second = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(2))
val oldest = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(0))
val pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('oldest)
}
it should "provide a list of the oldest containers from pool, if several containers have to be removed" in {
val namespace = differentNamespace.asString
val first = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(1))
val second = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(2))
val third = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(3))
val oldest = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(0))
val pool = Map('first -> first, 'second -> second, 'third -> third, 'oldest -> oldest)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY * 2) shouldBe List('oldest, 'first)
}
it should "provide oldest container (excluding concurrently busy) from busy pool with multiple containers" in {
val commonNamespace = differentNamespace.asString
val first = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(1), active = 0)
val second = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(2), active = 0)
val oldest = warmedData(namespace = commonNamespace, lastUsed = Instant.ofEpochMilli(0), active = 3)
var pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('first)
pool = pool - 'first
ContainerPool.remove(pool, MemoryLimit.STD_MEMORY) shouldBe List('second)
}
it should "remove expired in order of expiration" in {
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 1)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
//use a second kind so that we know sorting is not isolated to the expired containers of each kind
val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
List(
PrewarmingConfig(1, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))),
PrewarmingConfig(1, exec2, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
val oldestDeadline = Deadline.now - 1.seconds
val newerDeadline = Deadline.now
val newestDeadline = Deadline.now + 1.seconds
val prewarmedPool = Map(
'newest -> preWarmedData("actionKind", Some(newestDeadline)),
'oldest -> preWarmedData("actionKind2", Some(oldestDeadline)),
'newer -> preWarmedData("actionKind", Some(newerDeadline)))
lazy val stream = new ByteArrayOutputStream
lazy val printstream = new PrintStream(stream)
lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest))
}
it should "remove only the prewarmExpirationLimit of expired prewarms" in {
//limit prewarm removal to 2
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 2)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(0, 10, 10.seconds, 1, 1))))
//all are overdue, with different expiration times
val oldestDeadline = Deadline.now - 5.seconds
val newerDeadline = Deadline.now - 4.seconds
//the newest* ones are expired too, but they are not the oldest and fall outside the limit of 2, so they won't be removed
val newestDeadline = Deadline.now - 3.seconds
val newestDeadline2 = Deadline.now - 2.seconds
val newestDeadline3 = Deadline.now - 1.seconds
val prewarmedPool = Map(
'newest -> preWarmedData("actionKind", Some(newestDeadline)),
'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
'newer -> preWarmedData("actionKind", Some(newerDeadline)),
'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
lazy val stream = new ByteArrayOutputStream
lazy val printstream = new PrintStream(stream)
lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List('oldest, 'newer))
}
it should "remove only the expired prewarms regardless of minCount" in {
//limit prewarm removal to 100
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 10.seconds, None, 100)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
//minCount is 2, but expired prewarms are removed regardless of minCount
val prewarmConfig =
List(PrewarmingConfig(3, exec, memoryLimit, Some(ReactivePrewarmingConfig(2, 10, 10.seconds, 1, 1))))
//all are overdue, with different expiration times
val oldestDeadline = Deadline.now - 5.seconds
val newerDeadline = Deadline.now - 4.seconds
//all are expired and within the removal limit of 100, so all of them will be removed
val newestDeadline = Deadline.now - 3.seconds
val newestDeadline2 = Deadline.now - 2.seconds
val newestDeadline3 = Deadline.now - 1.seconds
val prewarmedPool = Map(
'newest -> preWarmedData("actionKind", Some(newestDeadline)),
'oldest -> preWarmedData("actionKind", Some(oldestDeadline)),
'newest3 -> preWarmedData("actionKind", Some(newestDeadline3)),
'newer -> preWarmedData("actionKind", Some(newerDeadline)),
'newest2 -> preWarmedData("actionKind", Some(newestDeadline2)))
lazy val stream = new ByteArrayOutputStream
lazy val printstream = new PrintStream(stream)
lazy implicit val logging: Logging = new PrintStreamLogging(printstream)
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool) shouldBe (List(
'oldest,
'newer,
'newest,
'newest2,
'newest3))
}
}