[New Scheduler]Add CreationJobManager (#5116)

* Add CreationJobManager

* Remove unused import

* Use finite duration string for a config

* Fix tests

* Resolve rebase conflicts

* Fix tests
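
For reviewers, a rough sketch of how the new actor is expected to be wired from the
scheduler. The props signature and message names are taken from the diff below; the
surrounding bootstrap and the concrete ack-feed factory are only illustrative and assume
an actorSystem, schedulerInstanceId, dataManagementService ActorRef and an incoming
containerCreationMessage are already in scope:

    // illustrative wiring only, not part of this change
    val feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef =
      (factory, topic, maxPeek, handler) => ??? // e.g. a consumer for the per-scheduler "creationAck" topic

    val creationJobManager: ActorRef =
      actorSystem.actorOf(CreationJobManager.props(feedFactory, schedulerInstanceId, dataManagementService))

    // ContainerManager registers a job per creation request; the job ends either with a
    // ContainerCreationAckMessage (FinishCreationJob) or with the in-progress-job-retention timeout.
    creationJobManager ! RegisterCreationJob(containerCreationMessage)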
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index c7ec206..1755abe 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -295,6 +295,7 @@
   val azBlob = "whisk.azure-blob"
 
   val schedulerMaxPeek = "whisk.scheduler.max-peek"
+  val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
 
   val whiskClusterName = "whisk.cluster.name"
 
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index fc62ef0..cc5fa49 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-whisk{
+whisk {
   # tracing configuration
   tracing {
     component = "Scheduler"
@@ -25,4 +25,8 @@
     managed-fraction: 90%
     blackbox-fraction: 10%
   }
+
+  scheduler {
+    in-progress-job-retention = "20 seconds"
+  }
 }
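For context, the new key is read with pureconfig as a FiniteDuration, and blackbox
actions are given three times the configured retention. This mirrors the code added in
CreationJobManager below and is shown here only to document the key's semantics:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration
    import pureconfig.loadConfigOrThrow
    import org.apache.openwhisk.core.ConfigKeys

    // reads "whisk.scheduler.in-progress-job-retention" (20 seconds by default)
    val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
    // blackbox containers usually take longer to start, so their jobs are retained 3x as long
    val blackboxTimeout = FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS)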
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index e8ce26f..a940f42 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -18,7 +18,6 @@
 package org.apache.openwhisk.core.scheduler.container
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
-
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.event.Logging.InfoLevel
 import org.apache.kafka.clients.producer.RecordMetadata
@@ -33,7 +32,6 @@
 import org.apache.openwhisk.core.entity.{
   Annotations,
   ByteSize,
-  DocInfo,
   DocRevision,
   FullyQualifiedEntityName,
   InvokerInstanceId,
@@ -55,6 +53,7 @@
   ReschedulingCreationJob,
   SuccessfulCreationJob
 }
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
 import org.apache.openwhisk.core.service.{
   DeleteEvent,
   PutEvent,
@@ -570,28 +569,3 @@
 }
 
 case class NoCapacityException(msg: String) extends Exception(msg)
-
-/**
- * TODO This needs to be moved to the QueueManager component that will be added later.
- */
-object QueuePool {
-  private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()
-
-  private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)
-
-  private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)
-
-  private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)
-
-  private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)
-
-  private[scheduler] def clear() = _queuePool.clear()
-
-  private[scheduler] def size = _queuePool.size
-
-  private[scheduler] def values = _queuePool.values
-
-  private[scheduler] def keys = _queuePool.keys
-}
-case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
-case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
new file mode 100644
index 0000000..2cf2a71
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
+import org.apache.openwhisk.common.{GracefulShutdown, Logging}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
+import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.scheduler.message.{
+  CreationJobState,
+  FailedCreationJob,
+  FinishCreationJob,
+  RegisterCreationJob,
+  ReschedulingCreationJob,
+  SuccessfulCreationJob
+}
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+
+case object GetPoolStatus
+
+case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)
+
+class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
+                         schedulerInstanceId: SchedulerInstanceId,
+                         dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
+    extends Actor {
+  private implicit val ec: ExecutionContext = actorSystem.dispatcher
+  private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
+  private val retryLimit = 5
+  private val retryDelayTime = 100.milliseconds
+
+  /**
+   * Stores a JobEntry locally so a timer can fire when the in-progress key times out.
+   * It does not matter whether the locally stored information is redundant or missing:
+   * when a new JobEntry is created for an existing creation id, the old entry is simply overwritten,
+   * and if there is no corresponding JobEntry at the time of deletion, nothing is done.
+   */
+  protected val creationJobPool = TrieMap[CreationId, JobEntry]()
+
+  override def receive: Receive = {
+    case RegisterCreationJob(
+        ContainerCreationMessage(_, invocationNamespace, action, revision, actionMetaData, _, _, _, _, creationId)) =>
+      val isBlackboxInvocation = actionMetaData.toExecutableWhiskAction.exists(a => a.exec.pull)
+      registerJob(invocationNamespace, action, revision, creationId, isBlackboxInvocation)
+
+    case FinishCreationJob(
+        ContainerCreationAckMessage(
+          tid,
+          creationId,
+          invocationNamespace,
+          action,
+          revision,
+          actionMetaData,
+          _,
+          schedulerHost,
+          rpcPort,
+          retryCount,
+          error,
+          reason)) =>
+      if (error.isEmpty) {
+        logging.info(this, s"[$creationId] created container successfully")
+        deleteJob(
+          invocationNamespace,
+          action,
+          revision,
+          creationId,
+          SuccessfulCreationJob(creationId, invocationNamespace, action, revision))
+
+      } else {
+        val cause = reason.getOrElse("unknown reason")
+        // if the retry limit is exceeded or the error is not one worth rescheduling, mark the creation as failed
+        if (retryCount >= retryLimit || !error.exists(ContainerCreationError.whiskErrors.contains)) {
+          logging.error(
+            this,
+            s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
+          // Delete from pool after all retries are failed
+          deleteJob(
+            invocationNamespace,
+            action,
+            revision,
+            creationId,
+            FailedCreationJob(creationId, invocationNamespace, action, revision, error.get, cause))
+        } else {
+          // Reschedule
+          logging.error(
+            this,
+            s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
+          // Delay the retry slightly, since the etcd put operation can take some time when data is inconsistent
+          actorSystem.scheduler.scheduleOnce(retryDelayTime) {
+            context.parent ! ReschedulingCreationJob(
+              tid,
+              creationId,
+              invocationNamespace,
+              action,
+              revision,
+              actionMetaData,
+              schedulerHost,
+              rpcPort,
+              retryCount)
+          }
+        }
+      }
+
+    case GracefulShutdown =>
+      ackFeed ! GracefulShutdown
+  }
+
+  private def registerJob(invocationNamespace: String,
+                          action: FullyQualifiedEntityName,
+                          revision: DocRevision,
+                          creationId: CreationId,
+                          isBlackboxInvocation: Boolean) = {
+    creationJobPool getOrElseUpdate (creationId, {
+      val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)
+      dataManagementService ! RegisterData(key, "", failoverEnabled = false)
+      JobEntry(action, createTimer(invocationNamespace, action, revision, creationId, isBlackboxInvocation))
+    })
+  }
+
+  private def deleteJob(invocationNamespace: String,
+                        action: FullyQualifiedEntityName,
+                        revision: DocRevision,
+                        creationId: CreationId,
+                        state: CreationJobState) = {
+    val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)
+
+    // If there is a JobEntry, delete it.
+    creationJobPool
+      .remove(creationId)
+      .foreach(entry => {
+        sendState(state)
+        entry.timer.cancel()
+      })
+
+    dataManagementService ! UnregisterData(key)
+    Future.successful({})
+  }
+
+  private def sendState(state: CreationJobState): Unit = {
+    context.parent ! state // send state to ContainerManager
+    QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
+      case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
+        memoryQueueValue.queue ! state
+      case _ =>
+        logging.error(this, s"got a $state for a nonexistent memory queue or a follower")
+    }
+  }
+
+  protected def createTimer(invocationNamespace: String,
+                            action: FullyQualifiedEntityName,
+                            revision: DocRevision,
+                            creationId: CreationId,
+                            isBlackbox: Boolean): Cancellable = {
+    val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS) else baseTimeout
+    actorSystem.scheduler.scheduleOnce(timeout) {
+      logging.warn(
+        this,
+        s"Failed to create a container for $action (blackbox: $isBlackbox), error: $creationId timed out after $timeout")
+      creationJobPool
+        .remove(creationId)
+        .foreach(
+          _ =>
+            sendState(
+              FailedCreationJob(
+                creationId,
+                invocationNamespace,
+                action,
+                revision,
+                ContainerCreationError.TimeoutError,
+                s"timeout waiting for the ack of $creationId after $timeout")))
+      dataManagementService ! UnregisterData(
+        inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId))
+    }
+  }
+
+  private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+  private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}"
+  private val maxActiveAcksPerPoll = 128
+  private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement)
+
+  def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = {
+    Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
+      .flatMap(Future.fromTry)
+      .flatMap { msg =>
+        // forward msg to job manager
+        self ! FinishCreationJob(msg)
+        ackFeed ! MessageFeed.Processed
+        Future.successful(())
+      }
+      .recoverWith {
+        case t =>
+          // Iff everything above failed, we have a terminal error at hand. Either the message failed
+          // to deserialize, or something threw an error where it is not expected to throw.
+          ackFeed ! MessageFeed.Processed
+          logging.error(this, s"terminal failure while processing container creation ack message: $t")
+          Future.successful(())
+      }
+  }
+}
+
+object CreationJobManager {
+  def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
+            schedulerInstanceId: SchedulerInstanceId,
+            dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) =
+    Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index aa68832..a8b747e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -50,6 +50,7 @@
   ReschedulingCreationJob,
   SuccessfulCreationJob
 }
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
 import org.apache.openwhisk.core.service.WatchEndpointInserted
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.junit.runner.RunWith
@@ -59,8 +60,8 @@
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
 import pureconfig.loadConfigOrThrow
 import spray.json.{JsArray, JsBoolean, JsString}
-
 import pureconfig.generic.auto._
+
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration.{FiniteDuration, _}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
new file mode 100644
index 0000000..243a58e
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.container.test
+
+import java.util.concurrent.TimeUnit
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import com.ibm.etcd.client.{EtcdClient => Client}
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
+import org.apache.openwhisk.core.scheduler.container._
+import org.apache.openwhisk.core.scheduler.message._
+import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
+import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+import pureconfig.loadConfigOrThrow
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContextExecutor, Future}
+
+@RunWith(classOf[JUnitRunner])
+class CreationJobManagerTests
+    extends TestKit(ActorSystem("CreationJobManager"))
+    with ImplicitSender
+    with FlatSpecLike
+    with ScalaFutures
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with StreamLogging {
+
+  private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
+  val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS)
+  implicit val ece: ExecutionContextExecutor = system.dispatcher
+  val config = new WhiskConfig(ExecManifest.requiredProperties)
+  val creationIdTest = CreationId.generate()
+  val isBlackboxInvocation = false
+
+  val testInvocationNamespace = "test-invocation-namespace"
+  val testNamespace = "test-namespace"
+  val testAction = "test-action"
+  val schedulerHost = "127.17.0.1"
+  val rpcPort = 13001
+  val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
+  val execAction = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec)
+  val execMetadata =
+    CodeExecMetaDataAsString(RuntimeManifest(execAction.exec.kind, ImageName("test")), entryPoint = Some("test"))
+  val revision = DocRevision("1-testRev")
+  val actionMetadata =
+    WhiskActionMetaData(
+      execAction.namespace,
+      execAction.name,
+      execMetadata,
+      execAction.parameters,
+      execAction.limits,
+      execAction.version,
+      execAction.publish,
+      execAction.annotations)
+
+  override def afterAll(): Unit = {
+    client.close()
+    QueuePool.clear()
+    TestKit.shutdownActorSystem(system)
+    super.afterAll()
+  }
+
+  override def beforeEach(): Unit = {
+    QueuePool.clear()
+  }
+
+  def feedFactory(actorRefFactory: ActorRefFactory,
+                  topic: String,
+                  maxActiveAcksPerPoll: Int,
+                  handler: Array[Byte] => Future[Unit]): ActorRef = {
+    TestProbe().ref
+  }
+
+  def createRegisterMessage(action: FullyQualifiedEntityName,
+                            revision: DocRevision,
+                            sid: SchedulerInstanceId): RegisterCreationJob = {
+    val message =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        action,
+        revision,
+        actionMetadata,
+        sid,
+        schedulerHost,
+        rpcPort,
+        creationId = creationIdTest)
+    RegisterCreationJob(message)
+  }
+
+  val action = FullyQualifiedEntityName(EntityPath("test namespace"), EntityName("actionName"))
+  val sid = SchedulerInstanceId("0")
+  val iid = InvokerInstanceId(0, userMemory = 1024.MB)
+  val testKey = inProgressContainer(testInvocationNamespace, action, revision, sid, creationIdTest)
+  val memory = 256.MB
+  val resources = Seq.empty[String]
+  val resourcesStrictPolicy = true
+
+  val registerMessage = createRegisterMessage(action, revision, sid)
+
+  val client: Client = {
+    val hostAndPorts = "172.17.0.1:2379"
+    Client.forEndpoints(hostAndPorts).withPlainText().build()
+  }
+
+  behavior of "CreationJobManager"
+
+  it should "register creation job" in {
+    val probe = TestProbe()
+
+    val manager =
+      system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+
+    manager ! registerMessage
+
+    probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+  }
+
+  it should "skip duplicated creation job" in {
+    val probe = TestProbe()
+
+    val manager =
+      system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+
+    manager ! registerMessage
+    manager ! registerMessage
+
+    probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+    probe.expectNoMessage()
+  }
+
+  def createFinishMessage(action: FullyQualifiedEntityName,
+                          revision: DocRevision,
+                          memory: ByteSize,
+                          invokerInstanceId: InvokerInstanceId,
+                          retryCount: Int = 0,
+                          error: Option[ContainerCreationError] = None): FinishCreationJob = {
+    val message =
+      ContainerCreationAckMessage(
+        TransactionId.testing,
+        creationIdTest,
+        testInvocationNamespace,
+        action,
+        revision,
+        actionMetadata,
+        invokerInstanceId,
+        schedulerHost,
+        rpcPort,
+        retryCount,
+        error)
+    FinishCreationJob(message)
+  }
+
+  def createRescheduling(finishMsg: FinishCreationJob): ReschedulingCreationJob =
+    ReschedulingCreationJob(
+      finishMsg.ack.transid,
+      finishMsg.ack.creationId,
+      finishMsg.ack.invocationNamespace,
+      finishMsg.ack.action,
+      finishMsg.ack.revision,
+      actionMetadata,
+      finishMsg.ack.schedulerHost,
+      finishMsg.ack.rpcPort,
+      finishMsg.ack.retryCount)
+
+  val normalFinish = createFinishMessage(action, revision, memory, iid, retryCount = 0)
+  val failedFinish =
+    createFinishMessage(action, revision, memory, iid, retryCount = 0, Some(ContainerCreationError.UnknownError))
+  val unrescheduleFinish =
+    createFinishMessage(action, revision, memory, iid, retryCount = 0, Some(ContainerCreationError.BlackBoxError))
+  val tooManyFinish =
+    createFinishMessage(action, revision, memory, iid, retryCount = 100, Some(ContainerCreationError.UnknownError))
+
+  it should "delete a creation job normally and send a SuccessfulCreationJob to a queue" in {
+    val containerManager = TestProbe()
+    val dataManagementService = TestProbe()
+    val probe = TestProbe()
+    val jobManager =
+      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+    QueuePool.put(
+      MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
+      MemoryQueueValue(probe.ref, true))
+    jobManager ! registerMessage
+
+    dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+    jobManager ! normalFinish
+
+    dataManagementService.expectMsg(UnregisterData(testKey))
+
+    containerManager.expectMsg(
+      SuccessfulCreationJob(
+        normalFinish.ack.creationId,
+        normalFinish.ack.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision))
+    probe.expectMsg(
+      SuccessfulCreationJob(
+        normalFinish.ack.creationId,
+        normalFinish.ack.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision))
+  }
+
+  it should "only delete a creation job with failed msg after all retries are failed" in {
+    val containerManager = TestProbe()
+    val dataManagementService = TestProbe()
+
+    val jobManager =
+      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+    jobManager ! registerMessage
+
+    dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+    jobManager ! failedFinish
+
+    containerManager.expectMsg(createRescheduling(failedFinish))
+
+    jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
+    dataManagementService.expectMsg(UnregisterData(testKey))
+  }
+
+  it should "delete a creation job with failed msg and send a FailedCreationJob to a queue" in {
+    val containerManager = TestProbe()
+    val dataManagementService = TestProbe()
+    val probe = TestProbe()
+    val jobManager =
+      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+    QueuePool.put(
+      MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
+      MemoryQueueValue(probe.ref, true))
+
+    jobManager ! registerMessage
+
+    dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+    jobManager ! unrescheduleFinish
+
+    dataManagementService.expectMsg(UnregisterData(testKey))
+
+    containerManager.expectMsg(
+      FailedCreationJob(
+        registerMessage.msg.creationId,
+        registerMessage.msg.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision,
+        ContainerCreationError.BlackBoxError,
+        "unknown reason"))
+    probe.expectMsg(
+      FailedCreationJob(
+        registerMessage.msg.creationId,
+        registerMessage.msg.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision,
+        ContainerCreationError.BlackBoxError,
+        "unknown reason"))
+  }
+
+  it should "delete a creation job that does not exist with failed msg" in {
+    val containerManager = TestProbe()
+    val dataManagementService = TestProbe()
+
+    val jobManager =
+      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+    jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
+
+    dataManagementService.expectMsg(UnregisterData(testKey))
+  }
+
+  it should "delete a creation job with timeout" in {
+    val containerManager = TestProbe()
+    val dataManagementService = TestProbe()
+
+    val jobManager =
+      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+    jobManager ! registerMessage
+
+    dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+    Thread.sleep(timeout.toMillis) // sleep for the configured retention timeout so the timeout handler gets executed
+    dataManagementService.expectMsg(UnregisterData(testKey))
+    containerManager.expectMsg(
+      FailedCreationJob(
+        registerMessage.msg.creationId,
+        registerMessage.msg.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision,
+        ContainerCreationError.TimeoutError,
+        s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout"))
+  }
+
+  it should "increase the timeout if an action is a blackbox action" in {
+    val containerManager = TestProbe()
+    val dataManagementService = TestProbe()
+
+    val jobManager =
+      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+
+    val execMetadata =
+      BlackBoxExecMetaData(ImageName("test image"), Some("main"), native = false)
+
+    val actionMetaData = WhiskActionMetaData(
+      execAction.namespace,
+      execAction.name,
+      execMetadata,
+      execAction.parameters,
+      execAction.limits,
+      execAction.version,
+      execAction.publish,
+      execAction.annotations)
+
+    val message =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        action,
+        revision,
+        actionMetaData,
+        sid,
+        schedulerHost,
+        rpcPort,
+        creationId = creationIdTest)
+    val creationMsg = RegisterCreationJob(message)
+
+    jobManager ! creationMsg
+
+    dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+    // no message within the base timeout
+    dataManagementService.expectNoMessage(timeout)
+    Thread.sleep(timeout.toMillis * 2) // the blackbox timeout is 3x the base timeout, so wait for the remaining 2x
+    dataManagementService.expectMsg(UnregisterData(testKey))
+    containerManager.expectMsg(
+      FailedCreationJob(
+        registerMessage.msg.creationId,
+        registerMessage.msg.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision,
+        ContainerCreationError.TimeoutError,
+        s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout"))
+  }
+
+  it should "delete a creation job with too many retry and send a FailedCreationJob to a queue" in {
+    val containerManager = TestProbe()
+    val dataManagementService = TestProbe()
+    val probe = TestProbe()
+    val jobManager =
+      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
+    QueuePool.put(
+      MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
+      MemoryQueueValue(probe.ref, true))
+
+    jobManager ! registerMessage
+
+    dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
+
+    jobManager ! tooManyFinish
+
+    dataManagementService.expectMsg(UnregisterData(testKey))
+    containerManager.expectMsg(
+      FailedCreationJob(
+        registerMessage.msg.creationId,
+        registerMessage.msg.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision,
+        ContainerCreationError.UnknownError,
+        "unknown reason"))
+    probe.expectMsg(
+      FailedCreationJob(
+        registerMessage.msg.creationId,
+        registerMessage.msg.invocationNamespace,
+        registerMessage.msg.action,
+        registerMessage.msg.revision,
+        ContainerCreationError.UnknownError,
+        "unknown reason"))
+  }
+}