[New Scheduler]Add CreationJobManager (#5116)
* Add CreationJobManager
* Remove unused import
* Use finite duration string for a config
* Fix tests
* Resolve rebase conflicts
* Fix tests
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index c7ec206..1755abe 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -295,6 +295,7 @@
val azBlob = "whisk.azure-blob"
val schedulerMaxPeek = "whisk.scheduler.max-peek"
+ val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
val whiskClusterName = "whisk.cluster.name"
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index fc62ef0..cc5fa49 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -15,7 +15,7 @@
# limitations under the License.
#
-whisk{
+whisk {
# tracing configuration
tracing {
component = "Scheduler"
@@ -25,4 +25,8 @@
managed-fraction: 90%
blackbox-fraction: 10%
}
+
+ scheduler {
+ in-progress-job-retention = "20 seconds"
+ }
}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index e8ce26f..a940f42 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.scheduler.container
import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
-
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
import org.apache.kafka.clients.producer.RecordMetadata
@@ -33,7 +32,6 @@
import org.apache.openwhisk.core.entity.{
Annotations,
ByteSize,
- DocInfo,
DocRevision,
FullyQualifiedEntityName,
InvokerInstanceId,
@@ -55,6 +53,7 @@
ReschedulingCreationJob,
SuccessfulCreationJob
}
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
import org.apache.openwhisk.core.service.{
DeleteEvent,
PutEvent,
@@ -570,28 +569,3 @@
}
case class NoCapacityException(msg: String) extends Exception(msg)
-
-/**
- * TODO This needs to be moved to the QueueManager component that will be added later.
- */
-object QueuePool {
- private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()
-
- private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)
-
- private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)
-
- private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)
-
- private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)
-
- private[scheduler] def clear() = _queuePool.clear()
-
- private[scheduler] def size = _queuePool.size
-
- private[scheduler] def values = _queuePool.values
-
- private[scheduler] def keys = _queuePool.keys
-}
-case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
-case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
new file mode 100644
index 0000000..2cf2a71
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
+import org.apache.openwhisk.common.{GracefulShutdown, Logging}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
+import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.scheduler.message.{
+ CreationJobState,
+ FailedCreationJob,
+ FinishCreationJob,
+ RegisterCreationJob,
+ ReschedulingCreationJob,
+ SuccessfulCreationJob
+}
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+
+case object GetPoolStatus
+
+case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)
+
+class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
+ schedulerInstanceId: SchedulerInstanceId,
+ dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
+ extends Actor {
+ private implicit val ec: ExecutionContext = actorSystem.dispatcher
+ private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
+ private val retryLimit = 5
+ private val retryDelayTime = 100.milliseconds
+
+ /**
+ * Store a JobEntry in local to get an alarm for key timeout
+ * It does not matter whether the information stored in Local is redundant or null.
+ * When a new JobEntry is created, it is overwritten if it is duplicated.
+ * If there is no corresponding JobEntry at the time of deletion, nothing is done.
+ */
+ protected val creationJobPool = TrieMap[CreationId, JobEntry]()
+
+ override def receive: Receive = {
+ case RegisterCreationJob(
+ ContainerCreationMessage(_, invocationNamespace, action, revision, actionMetaData, _, _, _, _, creationId)) =>
+ val isBlackboxInvocation = actionMetaData.toExecutableWhiskAction.exists(a => a.exec.pull)
+ registerJob(invocationNamespace, action, revision, creationId, isBlackboxInvocation)
+
+ case FinishCreationJob(
+ ContainerCreationAckMessage(
+ tid,
+ creationId,
+ invocationNamespace,
+ action,
+ revision,
+ actionMetaData,
+ _,
+ schedulerHost,
+ rpcPort,
+ retryCount,
+ error,
+ reason)) =>
+ if (error.isEmpty) {
+ logging.info(this, s"[$creationId] create container successfully")
+ deleteJob(
+ invocationNamespace,
+ action,
+ revision,
+ creationId,
+ SuccessfulCreationJob(creationId, invocationNamespace, action, revision))
+
+ } else {
+ val cause = reason.getOrElse("unknown reason")
+ // if exceed the retry limit or meet errors which we don't need to reschedule, make it a failure
+ if (retryCount >= retryLimit || !error.exists(ContainerCreationError.whiskErrors.contains)) {
+ logging.error(
+ this,
+ s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
+ // Delete from pool after all retries are failed
+ deleteJob(
+ invocationNamespace,
+ action,
+ revision,
+ creationId,
+ FailedCreationJob(creationId, invocationNamespace, action, revision, error.get, cause))
+ } else {
+ // Reschedule
+ logging.error(
+ this,
+ s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
+ // Add some time interval during retry create container, because etcd put operation needs some time if data inconsistant happens
+ actorSystem.scheduler.scheduleOnce(retryDelayTime) {
+ context.parent ! ReschedulingCreationJob(
+ tid,
+ creationId,
+ invocationNamespace,
+ action,
+ revision,
+ actionMetaData,
+ schedulerHost,
+ rpcPort,
+ retryCount)
+ }
+ }
+ }
+
+ case GracefulShutdown =>
+ ackFeed ! GracefulShutdown
+ }
+
+ private def registerJob(invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ creationId: CreationId,
+ isBlackboxInvocation: Boolean) = {
+ creationJobPool getOrElseUpdate (creationId, {
+ val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)
+ dataManagementService ! RegisterData(key, "", failoverEnabled = false)
+ JobEntry(action, createTimer(invocationNamespace, action, revision, creationId, isBlackboxInvocation))
+ })
+ }
+
+ private def deleteJob(invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ creationId: CreationId,
+ state: CreationJobState) = {
+ val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)
+
+ // If there is a JobEntry, delete it.
+ creationJobPool
+ .remove(creationId)
+ .foreach(entry => {
+ sendState(state)
+ entry.timer.cancel()
+ })
+
+ dataManagementService ! UnregisterData(key)
+ Future.successful({})
+ }
+
+ private def sendState(state: CreationJobState): Unit = {
+ context.parent ! state // send state to ContainerManager
+ QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
+ case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
+ memoryQueueValue.queue ! state
+ case _ =>
+ logging.error(this, s"get a $state for a nonexistent memory queue or a follower")
+ }
+ }
+
+ protected def createTimer(invocationNamespace: String,
+ action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ creationId: CreationId,
+ isBlackbox: Boolean): Cancellable = {
+ val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS) else baseTimeout
+ actorSystem.scheduler.scheduleOnce(timeout) {
+ logging.warn(
+ this,
+ s"Failed to create a container for $action(blackbox: $isBlackbox), error: $creationId timed out after $timeout")
+ creationJobPool
+ .remove(creationId)
+ .foreach(
+ _ =>
+ sendState(
+ FailedCreationJob(
+ creationId,
+ invocationNamespace,
+ action,
+ revision,
+ ContainerCreationError.TimeoutError,
+ s"timeout waiting for the ack of $creationId after $timeout")))
+ dataManagementService ! UnregisterData(
+ inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId))
+ }
+ }
+
+ private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+ private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}"
+ private val maxActiveAcksPerPoll = 128
+ private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement)
+
+ def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = {
+ Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
+ .flatMap(Future.fromTry)
+ .flatMap { msg =>
+ // forward msg to job manager
+ self ! FinishCreationJob(msg)
+ ackFeed ! MessageFeed.Processed
+ Future.successful(())
+ }
+ .recoverWith {
+ case t =>
+ // Iff everything above failed, we have a terminal error at hand. Either the message failed
+ // to deserialize, or something threw an error where it is not expected to throw.
+ ackFeed ! MessageFeed.Processed
+ logging.error(this, s"terminal failure while processing container creation ack message: $t")
+ Future.successful(())
+ }
+ }
+}
+
+object CreationJobManager {
+ def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
+ schedulerInstanceId: SchedulerInstanceId,
+ dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) =
+ Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index aa68832..a8b747e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -50,6 +50,7 @@
ReschedulingCreationJob,
SuccessfulCreationJob
}
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
import org.apache.openwhisk.core.service.WatchEndpointInserted
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.junit.runner.RunWith
@@ -59,8 +60,8 @@
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
import pureconfig.loadConfigOrThrow
import spray.json.{JsArray, JsBoolean, JsString}
-
import pureconfig.generic.auto._
+
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.{FiniteDuration, _}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
new file mode 100644
index 0000000..243a58e
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container.test
+
+import java.util.concurrent.TimeUnit
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import com.ibm.etcd.client.{EtcdClient => Client}
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
+import org.apache.openwhisk.core.scheduler.container._
+import org.apache.openwhisk.core.scheduler.message._
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
+import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import pureconfig.loadConfigOrThrow
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContextExecutor, Future}
+
+@RunWith(classOf[JUnitRunner])
+class CreationJobManagerTests
+ extends TestKit(ActorSystem("CreationJobManager"))
+ with ImplicitSender
+ with FlatSpecLike
+ with ScalaFutures
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with StreamLogging {
+
+ private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
+ val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS)
+ implicit val ece: ExecutionContextExecutor = system.dispatcher
+ val config = new WhiskConfig(ExecManifest.requiredProperties)
+ val creationIdTest = CreationId.generate()
+ val isBlackboxInvocation = false
+
+ val testInvocationNamespace = "test-invocation-namespace"
+ val testNamespace = "test-namespace"
+ val testAction = "test-action"
+ val schedulerHost = "127.17.0.1"
+ val rpcPort = 13001
+ val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+ val execAction = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec)
+ val execMetadata =
+ CodeExecMetaDataAsString(RuntimeManifest(execAction.exec.kind, ImageName("test")), entryPoint = Some("test"))
+ val revision = DocRevision("1-testRev")
+ val actionMetadata =
+ WhiskActionMetaData(
+ execAction.namespace,
+ execAction.name,
+ execMetadata,
+ execAction.parameters,
+ execAction.limits,
+ execAction.version,
+ execAction.publish,
+ execAction.annotations)
+
+ override def afterAll(): Unit = {
+ client.close()
+ QueuePool.clear()
+ TestKit.shutdownActorSystem(system)
+ super.afterAll()
+ }
+
+ override def beforeEach(): Unit = {
+ QueuePool.clear()
+ }
+
+ def feedFactory(actorRefFactory: ActorRefFactory,
+ topic: String,
+ maxActiveAcksPerPoll: Int,
+ handler: Array[Byte] => Future[Unit]): ActorRef = {
+ TestProbe().ref
+ }
+
+ def createRegisterMessage(action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ sid: SchedulerInstanceId): RegisterCreationJob = {
+ val message =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ action,
+ revision,
+ actionMetadata,
+ sid,
+ schedulerHost,
+ rpcPort,
+ creationId = creationIdTest)
+ RegisterCreationJob(message)
+ }
+
+ val action = FullyQualifiedEntityName(EntityPath("test namespace"), EntityName("actionName"))
+ val sid = SchedulerInstanceId("0")
+ val iid = InvokerInstanceId(0, userMemory = 1024.MB)
+ val testKey = inProgressContainer(testInvocationNamespace, action, revision, sid, creationIdTest)
+ val memory = 256.MB
+ val resources = Seq.empty[String]
+ val resourcesStrictPolicy = true
+
+ val registerMessage = createRegisterMessage(action, revision, sid)
+
+ val client: Client = {
+ val hostAndPorts = "172.17.0.1:2379"
+ Client.forEndpoints(hostAndPorts).withPlainText().build()
+ }
+
+ behavior of "CreationJobManager"
+
+ it should "register creation job" in {
+ val probe = TestProbe()
+
+ val manager =
+ system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+
+ manager ! registerMessage
+
+ probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+ }
+
+ it should "skip duplicated creation job" in {
+ val probe = TestProbe()
+
+ val manager =
+ system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+
+ manager ! registerMessage
+ manager ! registerMessage
+
+ probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+ probe.expectNoMessage()
+ }
+
+ def createFinishMessage(action: FullyQualifiedEntityName,
+ revision: DocRevision,
+ memory: ByteSize,
+ invokerInstanceId: InvokerInstanceId,
+ retryCount: Int = 0,
+ error: Option[ContainerCreationError] = None): FinishCreationJob = {
+ val message =
+ ContainerCreationAckMessage(
+ TransactionId.testing,
+ creationIdTest,
+ testInvocationNamespace,
+ action,
+ revision,
+ actionMetadata,
+ invokerInstanceId,
+ schedulerHost,
+ rpcPort,
+ retryCount,
+ error)
+ FinishCreationJob(message)
+ }
+
+ def createRescheduling(finishMsg: FinishCreationJob): ReschedulingCreationJob =
+ ReschedulingCreationJob(
+ finishMsg.ack.transid,
+ finishMsg.ack.creationId,
+ finishMsg.ack.invocationNamespace,
+ finishMsg.ack.action,
+ finishMsg.ack.revision,
+ actionMetadata,
+ finishMsg.ack.schedulerHost,
+ finishMsg.ack.rpcPort,
+ finishMsg.ack.retryCount)
+
+ val normalFinish = createFinishMessage(action, revision, memory, iid, retryCount = 0)
+ val failedFinish =
+ createFinishMessage(action, revision, memory, iid, retryCount = 0, Some(ContainerCreationError.UnknownError))
+ val unrescheduleFinish =
+ createFinishMessage(action, revision, memory, iid, retryCount = 0, Some(ContainerCreationError.BlackBoxError))
+ val tooManyFinish =
+ createFinishMessage(action, revision, memory, iid, retryCount = 100, Some(ContainerCreationError.UnknownError))
+
+ it should "delete a creation job normally and send a SuccessfulCreationJob to a queue" in {
+ val containerManager = TestProbe()
+ val dataManagementService = TestProbe()
+ val probe = TestProbe()
+ val jobManager =
+ containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+ QueuePool.put(
+ MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
+ MemoryQueueValue(probe.ref, true))
+ jobManager ! registerMessage
+
+ dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+ jobManager ! normalFinish
+
+ dataManagementService.expectMsg(UnregisterData(testKey))
+
+ containerManager.expectMsg(
+ SuccessfulCreationJob(
+ normalFinish.ack.creationId,
+ normalFinish.ack.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision))
+ probe.expectMsg(
+ SuccessfulCreationJob(
+ normalFinish.ack.creationId,
+ normalFinish.ack.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision))
+ }
+
+ it should "only delete a creation job with failed msg after all retries are failed" in {
+ val containerManager = TestProbe()
+ val dataManagementService = TestProbe()
+
+ val jobManager =
+ containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+ jobManager ! registerMessage
+
+ dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+ jobManager ! failedFinish
+
+ containerManager.expectMsg(createRescheduling(failedFinish))
+
+ jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
+ dataManagementService.expectMsg(UnregisterData(testKey))
+ }
+
+ it should "delete a creation job with failed msg and send a FailedCreationJob to a queue" in {
+ val containerManager = TestProbe()
+ val dataManagementService = TestProbe()
+ val probe = TestProbe()
+ val jobManager =
+ containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+ QueuePool.put(
+ MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
+ MemoryQueueValue(probe.ref, true))
+
+ jobManager ! registerMessage
+
+ dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+ jobManager ! unrescheduleFinish
+
+ dataManagementService.expectMsg(UnregisterData(testKey))
+
+ containerManager.expectMsg(
+ FailedCreationJob(
+ registerMessage.msg.creationId,
+ registerMessage.msg.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision,
+ ContainerCreationError.BlackBoxError,
+ "unknown reason"))
+ probe.expectMsg(
+ FailedCreationJob(
+ registerMessage.msg.creationId,
+ registerMessage.msg.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision,
+ ContainerCreationError.BlackBoxError,
+ "unknown reason"))
+ }
+
+ it should "delete a creation job that does not exist with failed msg" in {
+ val containerManager = TestProbe()
+ val dataManagementService = TestProbe()
+
+ val jobManager =
+ containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+ jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
+
+ dataManagementService.expectMsg(UnregisterData(testKey))
+ }
+
+ it should "delete a creation job with timeout" in {
+ val containerManager = TestProbe()
+ val dataManagementService = TestProbe()
+
+ val jobManager =
+ containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+ jobManager ! registerMessage
+
+ dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+ Thread.sleep(timeout.toMillis) // sleep 5s to wait for the timeout handler to be executed
+ dataManagementService.expectMsg(UnregisterData(testKey))
+ containerManager.expectMsg(
+ FailedCreationJob(
+ registerMessage.msg.creationId,
+ registerMessage.msg.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision,
+ ContainerCreationError.TimeoutError,
+ s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout"))
+ }
+
+ it should "increase the timeout if an action is a blackbox action" in {
+ val containerManager = TestProbe()
+ val dataManagementService = TestProbe()
+
+ val jobManager =
+ containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+ val execMetadata =
+ BlackBoxExecMetaData(ImageName("test image"), Some("main"), native = false);
+
+ val actionMetaData = WhiskActionMetaData(
+ execAction.namespace,
+ execAction.name,
+ execMetadata,
+ execAction.parameters,
+ execAction.limits,
+ execAction.version,
+ execAction.publish,
+ execAction.annotations)
+
+ val message =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ action,
+ revision,
+ actionMetaData,
+ sid,
+ schedulerHost,
+ rpcPort,
+ creationId = creationIdTest)
+ val creationMsg = RegisterCreationJob(message)
+
+ jobManager ! creationMsg
+
+ dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+ // no message for timeout
+ dataManagementService.expectNoMessage(timeout)
+ Thread.sleep(timeout.toMillis * 2) // timeout is doubled for blackbox actions
+ dataManagementService.expectMsg(UnregisterData(testKey))
+ containerManager.expectMsg(
+ FailedCreationJob(
+ registerMessage.msg.creationId,
+ registerMessage.msg.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision,
+ ContainerCreationError.TimeoutError,
+ s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout"))
+ }
+
+ it should "delete a creation job with too many retry and send a FailedCreationJob to a queue" in {
+ val containerManager = TestProbe()
+ val dataManagementService = TestProbe()
+ val probe = TestProbe()
+ val jobManager =
+ containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+ QueuePool.put(
+ MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
+ MemoryQueueValue(probe.ref, true))
+
+ jobManager ! registerMessage
+
+ dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+ jobManager ! tooManyFinish
+
+ dataManagementService.expectMsg(UnregisterData(testKey))
+ containerManager.expectMsg(
+ FailedCreationJob(
+ registerMessage.msg.creationId,
+ registerMessage.msg.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision,
+ ContainerCreationError.UnknownError,
+ "unknown reason"))
+ probe.expectMsg(
+ FailedCreationJob(
+ registerMessage.msg.creationId,
+ registerMessage.msg.invocationNamespace,
+ registerMessage.msg.action,
+ registerMessage.msg.revision,
+ ContainerCreationError.UnknownError,
+ "unknown reason"))
+ }
+}