blob: 479739e23b059746713d01e655eee18bf9441dda [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.v2.test
import java.net.InetSocketAddress
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, Transition}
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.http.scaladsl.model
import akka.io.Tcp.Connect
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import akka.util.ByteString
import com.ibm.etcd.api.{DeleteRangeResponse, KeyValue, PutResponse}
import com.ibm.etcd.client.{EtcdClient => Client}
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction}
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
import org.apache.openwhisk.core.containerpool.v2._
import org.apache.openwhisk.core.containerpool.{
ContainerRemoved,
Paused => _,
Pausing => _,
Removing => _,
Running => _,
Start => _,
Uninitialized => _,
_
}
import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserContext}
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.types.AuthStore
import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, _}
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.invoker.{Invoker, NamespaceBlacklist}
import org.apache.openwhisk.core.service.{GetLease, Lease, RegisterData, UnregisterData}
import org.apache.openwhisk.http.Messages
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Assertion, BeforeAndAfterAll, FlatSpecLike, Matchers}
import spray.json.DefaultJsonProtocol._
import spray.json.{JsObject, _}
import scala.collection.mutable
import scala.collection.mutable.{Map => MutableMap}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Promise}
@RunWith(classOf[JUnitRunner])
class FunctionPullingContainerProxyTests
extends TestKit(ActorSystem("FunctionPullingContainerProxy"))
with ImplicitSender
with FlatSpecLike
with Matchers
with MockFactory
with BeforeAndAfterAll
with StreamLogging {
override def afterAll: Unit = {
client.close()
TestKit.shutdownActorSystem(system)
super.afterAll()
}
implicit val mat = ActorMaterializer()
implicit val ece: ExecutionContextExecutor = system.dispatcher
val timeout = 20.seconds
val longTimeout = timeout * 2
val log = logging
val defaultUserMemory: ByteSize = 1024.MB
val invocationNamespace = EntityName("invocationSpace")
val schedulerHost = "127.17.0.1"
val rpcPort = 13001
val neverMatchNamespace = EntityName("neverMatchNamespace")
val uuid = UUID()
val poolConfig =
ContainerPoolConfig(
2.MB,
0.5,
false,
FiniteDuration(2, TimeUnit.SECONDS),
FiniteDuration(1, TimeUnit.MINUTES),
None,
100,
3,
false,
1.second)
val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds)
val messageTransId = TransactionId(TransactionId.testing.meta.id)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
val fqn = FullyQualifiedEntityName(action.namespace, action.name, Some(action.version))
val message = ActivationMessage(
messageTransId,
action.fullyQualifiedName(true),
action.rev,
Identity(Subject(), Namespace(invocationNamespace, uuid), BasicAuthenticationAuthKey(uuid, Secret()), Set.empty),
ActivationId.generate(),
ControllerInstanceId("0"),
blocking = false,
content = None)
val entityStore = WhiskEntityStore.datastore()
val invokerHealthManager = TestProbe()
val testContainerId = ContainerId("testcontainerId")
val testLease = Lease(60, 10)
def keepAliveService: ActorRef =
system.actorOf(Props(new Actor {
override def receive: Receive = {
case GetLease =>
Thread.sleep(1000)
sender() ! testLease
}
}))
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val client: Client = {
val hostAndPorts = "172.17.0.1:2379"
Client.forEndpoints(hostAndPorts).withPlainText().build()
}
class MockEtcdClient() extends EtcdClient(client)(ece) {
val etcdStore = MutableMap[String, String]()
var putFlag = false
override def put(key: String, value: String): Future[PutResponse] = {
etcdStore.update(key, value)
putFlag = true
Future.successful(
PutResponse.newBuilder().setPrevKv(KeyValue.newBuilder().setKey(key).setValue(value).build()).build())
}
override def del(key: String): Future[DeleteRangeResponse] = {
etcdStore.remove(key)
val res = DeleteRangeResponse.getDefaultInstance
Future.successful(
DeleteRangeResponse.newBuilder().setPrevKvs(0, KeyValue.newBuilder().setKey(key).build()).build())
}
}
val initInterval = {
val now = messageTransId.meta.start.plusMillis(50) // this is the queue time for cold start
Interval(now, now.plusMillis(100))
}
val runInterval = {
val now = initInterval.end.plusMillis(75) // delay between init and run
Interval(now, now.plusMillis(200))
}
val errorInterval = {
val now = initInterval.end.plusMillis(75) // delay between init and run
Interval(now, now.plusMillis(150))
}
/** Creates a client and a factory returning this ref of the client. */
def testClient
: (TestProbe,
(ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, String, Int, ContainerId) => ActorRef) = {
val client = TestProbe()
val factory =
(_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: String, _: Int, _: ContainerId) =>
client.ref
(client, factory)
}
/** get WhiskAction*/
def getWhiskAction(response: Future[WhiskAction]) = LoggedFunction {
(_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
response
}
/** Creates an inspectable factory */
def createFactory(response: Future[Container]) = LoggedFunction {
(_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int, _: Option[ExecutableWhiskAction]) =>
response
}
trait LoggedAcker extends ActiveAck {
def calls =
mutable.Buffer[(TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, AcknowledegmentMessage)]()
def verifyAnnotations(activation: WhiskActivation, a: ExecutableWhiskAction) = {
activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
}
}
/** Creates an inspectable version of the ack method, which records all calls in a buffer */
def createAcker(a: ExecutableWhiskAction = action) = new LoggedAcker {
val acker = LoggedFunction {
(_: TransactionId, _: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: AcknowledegmentMessage) =>
Future.successful(())
}
override def calls = acker.calls
override def apply(tid: TransactionId,
activation: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID,
acknowledegment: AcknowledegmentMessage): Future[Any] = {
verifyAnnotations(activation, a)
acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment)
}
}
/** Creates an synchronized inspectable version of the ack method, which records all calls in a buffer */
def createAckerForNamespaceBlacklist(a: ExecutableWhiskAction = action,
mockNamespaceBlacklist: MockNamespaceBlacklist) = new LoggedAcker {
val acker = SynchronizedLoggedFunction {
(_: TransactionId, _: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: AcknowledegmentMessage) =>
Future.successful(())
}
override def calls = acker.calls
override def verifyAnnotations(activation: WhiskActivation, a: ExecutableWhiskAction): Assertion = {
activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
}
override def apply(tid: TransactionId,
activation: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID,
acknowledegment: AcknowledegmentMessage): Future[Any] = {
verifyAnnotations(activation, a)
acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment)
}
}
/** Registers the transition callback and expects the first message */
def registerCallback(c: ActorRef, probe: TestProbe) = {
c ! SubscribeTransitionCallBack(probe.ref)
probe.expectMsg(CurrentState(c, Uninitialized))
}
def createStore = LoggedFunction {
(transid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) =>
Future.successful(())
}
def getLiveContainerCount(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
Future.successful(count)
}
def getWarmedContainerLimit(limit: Future[(Int, FiniteDuration)]) = LoggedFunction { (_: String) =>
limit
}
class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () => Unit) extends Invoker.LogsCollector {
val collector = LoggedFunction {
(transid: TransactionId,
user: Identity,
activation: WhiskActivation,
container: Container,
action: ExecutableWhiskAction) =>
response
}
def calls = collector.calls
override def apply(transid: TransactionId,
user: Identity,
activation: WhiskActivation,
container: Container,
action: ExecutableWhiskAction) = {
invokeCallback()
collector(transid, user, activation, container, action)
}
}
def createCollector(response: Future[ActivationLogs] = Future.successful(ActivationLogs()),
invokeCallback: () => Unit = () => ()) =
new LoggedCollector(response, invokeCallback)
/** Expect a NeedWork message with prewarmed data */
def expectPreWarmed(kind: String, probe: TestProbe) = probe.expectMsgPF() {
case ReadyToWork(PreWarmData(_, kind, memoryLimit, _)) => true
}
/** Expect a Initialized message with prewarmed data */
def expectInitialized(probe: TestProbe) = probe.expectMsgPF() {
case Initialized(InitializedData(_, _, _, _)) => true
}
/** Pre-warms the given state-machine, assumes good cases */
def preWarm(machine: ActorRef, probe: TestProbe) = {
machine ! Start(exec, memoryLimit)
probe.expectMsg(Transition(machine, Uninitialized, CreatingContainer))
expectPreWarmed(exec.kind, probe)
probe.expectMsg(Transition(machine, CreatingContainer, ContainerCreated))
}
behavior of "FunctionPullingContainerProxy"
it should "create a prewarm container" in within(timeout) {
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val probe = TestProbe()
val (_, clientFactory) = testClient
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
createAcker(),
store,
collector,
counter,
limit,
InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
factory.calls should have size 1
val (tid, name, _, _, memory, _, _) = factory.calls(0)
tid shouldBe TransactionId.invokerWarmup
name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
memory shouldBe memoryLimit
}
it should "run actions to a started prewarm container with get activationMessage successfully" in within(timeout) {
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, transid)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 0
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "run actions to a started prewarm container with get no activationMessage" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, RetryRequestActivation)
client.expectMsg(RequestActivation())
client.send(machine, StateTimeout)
client.send(machine, RetryRequestActivation)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing))
client.expectMsg(StopClientProxy)
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 1
acker.calls.length shouldBe 0
store.calls.length shouldBe 0
}
}
it should "run actions to a cold start container with get activationMessage successfully" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
val (tid, name, _, _, memory, _, _) = factory.calls(0)
tid shouldBe TransactionId.invokerColdstart
name should fullyMatch regex """wskmyname\d+_\d+_actionSpace_actionName"""
memory shouldBe memoryLimit
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 0
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "run actions to a cold start container with get no activationMessage" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, RetryRequestActivation)
client.expectMsg(RequestActivation())
client.send(machine, StateTimeout)
client.send(machine, RetryRequestActivation)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing))
client.expectMsg(StopClientProxy)
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 1
acker.calls.length shouldBe 0
store.calls.length shouldBe 0
}
}
it should "not run activations and destory the prewarm container when get client failed" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
def clientFactory(f: ActorRefFactory,
invocationNamespace: String,
fqn: FullyQualifiedEntityName,
d: DocRevision,
schedulerHost: String,
rpcPort: Int,
c: ContainerId): ActorRef = {
throw new Exception("failed to create activation client")
}
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
probe.expectMsg(ContainerRemoved(true))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 1
acker.calls.length shouldBe 0
store.calls.length shouldBe 0
}
}
it should "not run activations and don't create cold start container when get client failed" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
def clientFactory(f: ActorRefFactory,
invocationNamespace: String,
fqn: FullyQualifiedEntityName,
d: DocRevision,
schedulerHost: String,
rpcPort: Int,
c: ContainerId): ActorRef = {
throw new Exception("failed to create activation client")
}
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
probe.expectMsg(ContainerRemoved(true))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 1
acker.calls.length shouldBe 0
store.calls.length shouldBe 0
}
}
it should "destroy container proxy when the client closed in CreatingClient" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientClosed)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, CreatingClient, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 1
acker.calls.length shouldBe 0
store.calls.length shouldBe 0
}
}
it should "destroy container proxy when the client closed in ContainerCreated" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, ClientClosed)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 1
acker.calls.length shouldBe 0
store.calls.length shouldBe 0
}
}
it should "destroy container proxy when the client closed in Running" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, message)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, ClientClosed)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 2
collector.calls.length shouldBe 2
container.destroyCount shouldBe 1
acker.calls.length shouldBe 2
store.calls.length shouldBe 2
}
}
it should "destroy container proxy when stopping due to timeout" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(2)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
probe watch machine
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
machine ! StateTimeout
client.expectMsg(StopClientProxy)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
client.send(machine, ClientClosed)
probe expectTerminated machine
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "destroy container proxy even if there is no message from the client when stopping due to timeout" in within(
timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(2)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
probe watch machine
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
client.send(machine, StateTimeout)
client.expectMsg(StopClientProxy)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
probe expectTerminated (machine, timeout)
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "destroy container proxy even Even if there is 1 container, if timeout in keep" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
probe watch machine
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
client.send(machine, StateTimeout)
client.send(machine, v2.Remove)
client.expectMsg(StopClientProxy)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
probe expectTerminated (machine, timeout)
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "Keep the container if live count is less than warmed container keeping count configuration" in within(
timeout) {
stream.reset()
val warmedContainerTimeout = 10.seconds
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((2, warmedContainerTimeout)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
probe watch machine
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
machine ! StateTimeout
probe.expectNoMessage(warmedContainerTimeout)
probe.expectMsgAllOf(warmedContainerTimeout, ContainerRemoved(true), Transition(machine, Paused, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "Remove the container if live count is greater than warmed container keeping count configuration" in within(
timeout) {
stream.reset()
val warmedContainerTimeout = 10.seconds
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(2)
val limit = getWarmedContainerLimit(Future.successful((1, warmedContainerTimeout)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
probe watch machine
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
machine ! StateTimeout
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
probe expectTerminated (machine, timeout)
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "pause itself when timeout and recover when got a new Initialize" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val instanceId = InvokerInstanceId(0, userMemory = defaultUserMemory)
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
instanceId,
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
probe watch machine
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
dataManagementService.expectMsg(
RegisterData(
ContainerKeys.existingContainers(
invocationNamespace.asString,
fqn,
DocRevision.empty,
Some(instanceId),
Some(testContainerId)),
""))
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
dataManagementService.expectMsgAllOf(
RegisterData(
ContainerKeys
.warmedContainers(invocationNamespace.asString, fqn, DocRevision.empty, instanceId, testContainerId),
""),
UnregisterData(
ContainerKeys.existingContainers(
invocationNamespace.asString,
fqn,
DocRevision.empty,
Some(instanceId),
Some(testContainerId))))
probe.expectMsg(Transition(machine, Pausing, Paused))
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
dataManagementService.expectMsgAllOf(
UnregisterData(
ContainerKeys
.warmedContainers(invocationNamespace.asString, fqn, DocRevision.empty, instanceId, testContainerId)),
RegisterData(
ContainerKeys.existingContainers(
invocationNamespace.asString,
fqn,
DocRevision.empty,
Some(instanceId),
Some(testContainerId)),
""))
inAnyOrder {
probe.expectMsg(Transition(machine, Paused, Running))
probe.expectMsgType[Resumed]
}
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 0
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}
it should "not collect logs if the log-limit is set to 0" in within(timeout) {
val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(noLogsAction.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker(noLogsAction)
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, noLogsAction, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, message)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount should be > 1
collector.calls.length shouldBe 0
container.destroyCount shouldBe 0
acker.calls.length should be > 1
store.calls.length should be > 1
}
}
it should "complete the transaction and abort if container creation fails" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val exception = new Exception()
val container = new TestContainer
val factory = createFactory(Future.failed(exception))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (_, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsgAllOf(
Transition(machine, Uninitialized, CreatingClient),
ContainerCreationFailed(exception),
ContainerRemoved(true))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 0
acker.calls.length shouldBe 0
store.calls.length shouldBe 0
}
}
it should "complete the transaction and destroy the prewarm container on a failed init" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer {
override def initialize(initializer: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
Future.failed(InitializationError(initInterval, ActivationResponse.developerError("boom")))
}
}
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 0 // should not run the action
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
val activation = acker.calls(0)._2
activation.response shouldBe ActivationResponse.developerError("boom")
activation.annotations
.get(WhiskActivation.initTimeAnnotation)
.get
.convertTo[Int] shouldBe initInterval.duration.toMillis
store.calls.length shouldBe 1
}
}
it should "complete the transaction and destroy the cold start container on a failed init" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer {
override def initialize(initializer: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
Future.failed(InitializationError(initInterval, ActivationResponse.developerError("boom")))
}
}
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 0 // should not run the action
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
val activation = acker.calls(0)._2
activation.response shouldBe ActivationResponse.developerError("boom")
activation.annotations
.get(WhiskActivation.initTimeAnnotation)
.get
.convertTo[Int] shouldBe initInterval.duration.toMillis
store.calls.length shouldBe 1
}
}
it should "complete the transaction and destroy the container on a failed run IFF failure was containerError" in within(
timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer {
override def run(parameters: JsObject,
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
Future.successful((initInterval, ActivationResponse.developerError(("boom"))))
}
}
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
acker.calls(0)._2.response shouldBe ActivationResponse.developerError("boom")
store.calls.length shouldBe 1
}
}
it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer {
override def run(parameters: JsObject,
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
atomicRunCount.incrementAndGet()
//every other run fails
if (runCount % 2 == 0) {
Future.successful((runInterval, ActivationResponse.success()))
} else {
Future.successful((errorInterval, ActivationResponse.applicationError(("boom"))))
}
}
}
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, message)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount should be > 1
collector.calls.length should be > 1
container.destroyCount shouldBe 0
acker.calls.length should be > 1
store.calls.length should be > 1
val initErrorActivation = acker.calls(0)._2
initErrorActivation.duration shouldBe Some((initInterval.duration + errorInterval.duration).toMillis)
initErrorActivation.annotations
.get(WhiskActivation.initTimeAnnotation)
.get
.convertTo[Int] shouldBe initInterval.duration.toMillis
initErrorActivation.annotations
.get(WhiskActivation.waitTimeAnnotation)
.get
.convertTo[Int] shouldBe
Interval(message.transid.meta.start, initInterval.start).duration.toMillis
val runOnlyActivation = acker.calls(1)._2
runOnlyActivation.duration shouldBe Some(runInterval.duration.toMillis)
runOnlyActivation.annotations.get(WhiskActivation.initTimeAnnotation) shouldBe empty
runOnlyActivation.annotations.get(WhiskActivation.waitTimeAnnotation).get.convertTo[Int] shouldBe {
Interval(message.transid.meta.start, runInterval.start).duration.toMillis
}
}
}
it should "complete the transaction and destroy the container if log reading failed" in {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val partialLogs = Vector("this log line made it", Messages.logFailure)
val collector = createCollector(Future.failed(LogCollectingException(ActivationLogs(partialLogs))))
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
acker.calls(0)._2.response shouldBe ActivationResponse.success()
store.calls.length shouldBe 1
store.calls(0)._2.logs shouldBe ActivationLogs(partialLogs)
}
}
it should "complete the transaction and destroy the container if log reading failed terminally" in {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector(Future.failed(new Exception))
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
acker.calls(0)._2.response shouldBe ActivationResponse.success()
store.calls.length shouldBe 1
store.calls(0)._2.logs shouldBe ActivationLogs(Vector(Messages.logFailure))
}
}
it should "save the container id to etcd when don't destroy the container" in within(timeout) {
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val instance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
instance,
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 0
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
dataManagementService.expectMsg(RegisterData(
s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}",
""))
}
}
it should "save the container id to etcd first and delete it when destroy the container" in within(timeout) {
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val instance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(2)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
instance,
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
dataManagementService.expectMsg(RegisterData(
s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}",
""))
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, StateTimeout)
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
client.send(machine, StateTimeout)
client.expectMsg(StopClientProxy)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
dataManagementService.expectMsg(UnregisterData(
s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}"))
}
}
it should "not destroy itself when time out happens but a new activation message comes" in within(timeout) {
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val instance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
instance,
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
dataManagementService.expectMsg(RegisterData(
s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, DocRevision.empty, Some(instance), Some(testContainerId))}",
""))
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, StateTimeout) // make container time out
client.send(machine, message)
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 2
collector.calls.length shouldBe 2
container.destroyCount shouldBe 0
acker.calls.length shouldBe 2
store.calls.length shouldBe 2
dataManagementService.expectNoMessage()
}
}
it should "get the latest NamespaceBlacklist when NamespaceBlacklist is updated in db" in within(timeout) {
stream.reset()
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val mockNamespaceBlacklist: MockNamespaceBlacklist = new MockNamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAckerForNamespaceBlacklist(mockNamespaceBlacklist = mockNamespaceBlacklist)
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
mockNamespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
mockNamespaceBlacklist.refreshBlacklist()
//the namespace:invocationSpace will be added to namespaceBlackboxlist
mockNamespaceBlacklist.refreshBlacklist()
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, ClientCreated, Removing))
stream.toString should include(s"namespace invocationSpace was blocked in containerProxy")
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 0
container.runCount shouldBe 0
collector.calls.length shouldBe 0
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 0
}
}
it should "block further invocations after invocation space is added in the namespace blacklist" in within(timeout) {
stream.reset()
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val mockNamespaceBlacklist: MockNamespaceBlacklist = new MockNamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAckerForNamespaceBlacklist(mockNamespaceBlacklist = mockNamespaceBlacklist)
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
mockNamespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
//first refresh, the namespace:invocationSpace is not in namespaceBlacklist, so activations are executed successfully
mockNamespaceBlacklist.refreshBlacklist()
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
stream.toString should include(s"namespace ${message.user.namespace.name} is not in the namespaceBlacklist")
//refresh again, the namespace:invocationSpace will be added to namespaceBlacklist
stream.reset()
Thread.sleep(1000)
mockNamespaceBlacklist.refreshBlacklist()
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, message)
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
client.expectMsg(StopClientProxy)
stream.toString should include(s"namespace ${message.user.namespace.name} was blocked in containerProxy")
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount should be >= 1
collector.calls.length should be >= 1
container.destroyCount shouldBe 1
acker.calls.length should be >= 1
store.calls.length should be >= 1
}
}
it should "not timeout when running long time action" in within(longTimeout) {
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer {
override def run(parameters: JsObject,
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
reschedule: Boolean)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
Thread.sleep((timeoutConfig.pauseGrace + 1.second).toMillis) // 6 sec actions
super.run(parameters, environment, timeout, concurrent)
}
}
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF(8.seconds) { // wait more than container running time(6 seconds)
case RequestActivation(Some(_), None) => true
}
client.send(machine, message)
client.expectMsgPF(8.seconds) { // wait more than container running time(6 seconds)
case RequestActivation(Some(_), None) => true
}
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount should be > 1
collector.calls.length should be > 1
container.destroyCount shouldBe 0
acker.calls.length should be > 1
store.calls.length should be > 1
}
}
it should "start tcp ping to containers when action healthcheck enabled" in within(timeout) {
implicit val transid: TransactionId = messageTransId
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val healthchecks = healthchecksConfig(true)
val probe = TestProbe()
val tcpProbe = TestProbe()
val (client, clientFactory) = testClient
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
createAcker(),
store,
collector,
counter,
limit,
InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig,
healthchecks,
tcp = Some(tcpProbe.ref)))
registerCallback(machine, probe)
preWarm(machine, probe)
tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 12345)))
tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 12345)))
tcpProbe.expectMsg(Connect(new InetSocketAddress("0.0.0.0", 12345)))
//pings should repeat till the container goes into Running state
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, transid)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
// Receive any unhandled messages
tcpProbe.receiveWhile(3.seconds, 200.milliseconds, 10) {
case Connect(_, None, Nil, None, false) =>
true
}
tcpProbe.expectNoMessage(healthchecks.checkPeriod + 100.milliseconds)
awaitAssert {
factory.calls should have size 1
}
}
it should "reschedule the job to the queue if /init fails connection on ClientCreated" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
// test container
val initPromise = Promise[Interval]()
val container = new TestContainer(initPromise = Some(initPromise))
val factory = createFactory(Future.successful(container))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
// throw health error
initPromise.failure(ContainerHealthError(messageTransId, "intentional failure"))
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
// message should be rescheduled
val fqn = action.fullyQualifiedName(withVersion = true)
val rev = action.rev
client.expectMsg(RescheduleActivation(invocationNamespace.asString, fqn, rev, message))
}
it should "reschedule the job to the queue if /run fails connection on ClientCreated" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
// test container
val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
val container = new TestContainer(runPromises = runPromises)
val factory = createFactory(Future.successful(container))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
// throw health error
runPromises.head.failure(ContainerHealthError(messageTransId, "intentional failure"))
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
// message should be rescheduled
val fqn = action.fullyQualifiedName(withVersion = true)
val rev = action.rev
client.expectMsg(ContainerWarmed)
client.expectMsg(RescheduleActivation(invocationNamespace.asString, fqn, rev, message))
}
it should "reschedule the job to the queue if /run fails connection on Running" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
// test container
val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
val container = new TestContainer(runPromises = runPromises)
val factory = createFactory(Future.successful(container))
val (client, clientFactory) = testClient
val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))
registerCallback(machine, probe)
preWarm(machine, probe)
// pass first request to become Running state
runPromises(0).success(runInterval, ActivationResponse.success())
// throw health error
runPromises(1).failure(ContainerHealthError(messageTransId, "intentional failure"))
machine ! Initialize(invocationNamespace.asString, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)
probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, message)
// message should be rescheduled
val fqn = action.fullyQualifiedName(withVersion = true)
val rev = action.rev
client.expectMsg(RescheduleActivation(invocationNamespace.asString, fqn, rev, message))
}
/**
* Implements all the good cases of a perfect run to facilitate error case overriding.
*/
class TestContainer(initPromise: Option[Promise[Interval]] = None,
runPromises: Seq[Promise[(Interval, ActivationResponse)]] = Seq.empty,
apiKeyMustBePresent: Boolean = true)
extends Container {
protected val id = testContainerId
protected[core] val addr = ContainerAddress("0.0.0.0", 12345)
protected implicit val logging: Logging = log
protected implicit val ec: ExecutionContext = system.dispatcher
override implicit protected val as: ActorSystem = system
var destroyCount = 0
var initializeCount = 0
val atomicRunCount = new AtomicInteger(0) //need atomic tracking since we will test concurrent runs
var atomicLogsCount = new AtomicInteger(0)
def runCount = atomicRunCount.get()
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
destroyCount += 1
super.destroy()
}
override def initialize(initializer: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
entity: Option[WhiskAction] = None)(implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
val envField = "env"
(initializer.fields - envField) shouldBe action.containerInitializer().fields - envField
timeout shouldBe action.limits.timeout.duration
val initializeEnv = initializer.fields(envField).asJsObject
initializeEnv.fields("__OW_NAMESPACE") shouldBe invocationNamespace.name.toJson
initializeEnv.fields("__OW_ACTION_NAME") shouldBe message.action.qualifiedNameWithLeadingSlash.toJson
initializeEnv.fields("__OW_ACTION_VERSION") shouldBe message.action.version.toJson
initializeEnv.fields("__OW_ACTIVATION_ID") shouldBe message.activationId.toJson
initializeEnv.fields("__OW_TRANSACTION_ID") shouldBe transid.id.toJson
val convertedAuthKey = message.user.authkey.toEnvironment.fields.map(f => ("__OW_" + f._1.toUpperCase(), f._2))
val authEnvironment = initializeEnv.fields.filterKeys(convertedAuthKey.contains)
convertedAuthKey shouldBe authEnvironment
val deadline = Instant.ofEpochMilli(initializeEnv.fields("__OW_DEADLINE").convertTo[String].toLong)
val maxDeadline = Instant.now.plusMillis(timeout.toMillis)
// The deadline should be in the future but must be smaller than or equal
// a freshly computed deadline, as they get computed slightly after each other
deadline should (be <= maxDeadline and be >= Instant.now)
initPromise.map(_.future).getOrElse(Future.successful(initInterval))
}
override def run(
parameters: JsObject,
environment: JsObject,
timeout: FiniteDuration,
concurrent: Int,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
val runCount = atomicRunCount.incrementAndGet()
environment.fields("namespace") shouldBe invocationNamespace.name.toJson
environment.fields("action_name") shouldBe message.action.qualifiedNameWithLeadingSlash.toJson
environment.fields("action_version") shouldBe message.action.version.toJson
environment.fields("activation_id") shouldBe message.activationId.toJson
environment.fields("transaction_id") shouldBe transid.id.toJson
val authEnvironment = environment.fields.filterKeys(message.user.authkey.toEnvironment.fields.contains).toMap
message.user.authkey.toEnvironment shouldBe authEnvironment.toJson.asJsObject
val deadline = Instant.ofEpochMilli(environment.fields("deadline").convertTo[String].toLong)
val maxDeadline = Instant.now.plusMillis(timeout.toMillis)
// The deadline should be in the future but must be smaller than or equal
// a freshly computed deadline, as they get computed slightly after each other
deadline should (be <= maxDeadline and be >= Instant.now)
//return the future for this run (if runPromises no empty), or a default response
runPromises
.lift(runCount - 1)
.map(_.future)
.getOrElse(Future.successful((runInterval, ActivationResponse.success())))
}
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
atomicLogsCount.incrementAndGet()
Source.empty
}
}
abstract class ArtifactWhiskAuthStore extends ArtifactStore[WhiskAuth] {
override protected[core] implicit val executionContext: ExecutionContext = ece
override implicit val logging: Logging = log
override protected[core] def put(d: WhiskAuth)(implicit transid: TransactionId): Future[DocInfo] = ???
override protected[core] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = ???
override protected[core] def get[A <: WhiskAuth](doc: DocInfo,
attachmentHandler: Option[(A, Attachments.Attached) => A])(
implicit transid: TransactionId,
ma: Manifest[A]): Future[A] = ???
override protected[core] def query(table: String,
startKey: List[Any],
endKey: List[Any],
skip: Int,
limit: Int,
includeDocs: Boolean,
descending: Boolean,
reduce: Boolean,
stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] =
???
override protected[core] def count(table: String,
startKey: List[Any],
endKey: List[Any],
skip: Int,
stale: StaleParameter)(implicit transid: TransactionId): Future[Long] = ???
override protected[core] def putAndAttach[A <: WhiskAuth](d: A,
update: (A, Attachments.Attached) => A,
contentType: model.ContentType,
docStream: Source[ByteString, _],
oldAttachment: Option[Attachments.Attached])(
implicit transid: TransactionId): Future[(DocInfo, Attachments.Attached)] = ???
override protected[core] def readAttachment[T](
doc: DocInfo,
attached: Attachments.Attached,
sink: Sink[ByteString, Future[T]])(implicit transid: TransactionId): Future[T] = ???
override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] =
???
override def shutdown(): Unit = ???
}
class MockNamespaceBlacklist(authStore: AuthStore) extends NamespaceBlacklist(authStore) {
var count = 0
var blacklist: Set[String] = Set.empty
override def isBlacklisted(identity: Identity): Boolean = {
blacklist.contains(identity.namespace.name.asString)
}
override def refreshBlacklist()(implicit ec: ExecutionContext, tid: TransactionId): Future[Set[String]] = {
count += 1
if (count == 1) {
//neverMatchNamespace is in the namespaceBlacklist
blacklist = Set(neverMatchNamespace.name)
Future.successful(blacklist)
} else {
//invocationNamespace is not in the namespaceBlacklist
blacklist = Set(invocationNamespace.name)
Future.successful(blacklist)
}
}
}
}