[New Scheduler] Manage memory queues in scheduler (#5118)

* Add QueueManager

* Add test configuration

* Remove deprecated functions

* Fix memory queue test

* Change string to FiniteDuration
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 79112a9..33011c8 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -465,5 +465,9 @@
 
 scheduler:
   protocol: "{{ scheduler_protocol | default('http') }}"
+  maxPeek: "{{ scheduler_max_peek | default(128) }}"
+  queueManager:
+    maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
+    maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
   dataManagementService:
     retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index a710d60..463e3f3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -562,9 +562,17 @@
 
   // Time that is needed to produce message in kafka
   val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
+  val SCHEDULER_WAIT_TIME =
+    LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none)
 
   def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
+  def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
+  def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
+  def SCHEDULER_QUEUE_UPDATE(reason: String) =
+    LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
+  def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
+    LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
   /*
    * General markers
    */
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 00d876e..33fe9fd 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -296,6 +296,7 @@
   val azBlob = "whisk.azure-blob"
 
   val schedulerMaxPeek = "whisk.scheduler.max-peek"
+  val schedulerQueueManager = "whisk.scheduler.queue-manager"
   val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
 
   val whiskClusterName = "whisk.cluster.name"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index ba05c17..9123747 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -60,7 +60,7 @@
                              lockedArgs: Map[String, String] = Map.empty,
                              cause: Option[ActivationId] = None,
                              traceContext: Option[Map[String, String]] = None)
-  extends Message {
+    extends Message {
 
   override def serialize = ActivationMessage.serdes.write(this).compactPrint
 
@@ -116,11 +116,11 @@
  * The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
  * Right when this message is created.
  */
-case class CombinedCompletionAndResultMessage private(override val transid: TransactionId,
-                                                      response: Either[ActivationId, WhiskActivation],
-                                                      override val isSystemError: Option[Boolean],
-                                                      instance: InstanceId)
-  extends AcknowledegmentMessage(transid) {
+case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
+                                                       response: Either[ActivationId, WhiskActivation],
+                                                       override val isSystemError: Option[Boolean],
+                                                       instance: InstanceId)
+    extends AcknowledegmentMessage(transid) {
   override def messageType = "combined"
 
   override def result = Some(response)
@@ -142,11 +142,11 @@
  * phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
  * `CompletionMessage`.
  */
-case class CompletionMessage private(override val transid: TransactionId,
-                                     override val activationId: ActivationId,
-                                     override val isSystemError: Option[Boolean],
-                                     instance: InstanceId)
-  extends AcknowledegmentMessage(transid) {
+case class CompletionMessage private (override val transid: TransactionId,
+                                      override val activationId: ActivationId,
+                                      override val isSystemError: Option[Boolean],
+                                      instance: InstanceId)
+    extends AcknowledegmentMessage(transid) {
   override def messageType = "completion"
 
   override def result = None
@@ -168,8 +168,8 @@
  * The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
  * Right when this message is created.
  */
-case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
-  extends AcknowledegmentMessage(transid) {
+case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
+    extends AcknowledegmentMessage(transid) {
   override def messageType = "result"
 
   override def result = Some(response)
@@ -253,7 +253,7 @@
         Left(value.convertTo[ActivationId])
 
       case _: JsObject => Right(value.convertTo[WhiskActivation])
-      case _ => deserializationError("could not read ResultMessage")
+      case _           => deserializationError("could not read ResultMessage")
     }
   }
 
@@ -296,7 +296,7 @@
 
   implicit val format = new JsonFormat[EventMessageBody] {
     def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
-      case m: Metric => m.toJson
+      case m: Metric     => m.toJson
       case a: Activation => a.toJson
     }
 
@@ -321,7 +321,7 @@
                       causedBy: Option[String],
                       size: Option[Int] = None,
                       userDefinedStatusCode: Option[Int] = None)
-  extends EventMessageBody {
+    extends EventMessageBody {
   val typeName = Activation.typeName
 
   override def serialize = toJson.compactPrint
@@ -349,12 +349,12 @@
   private implicit val durationFormat = new RootJsonFormat[Duration] {
     override def write(obj: Duration): JsValue = obj match {
       case o if o.isFinite => JsNumber(o.toMillis)
-      case _ => JsNumber.zero
+      case _               => JsNumber.zero
     }
 
     override def read(json: JsValue): Duration = json match {
       case JsNumber(n) if n <= 0 => Duration.Zero
-      case JsNumber(n) => toDuration(n.longValue)
+      case JsNumber(n)           => toDuration(n.longValue)
     }
   }
 
@@ -437,7 +437,7 @@
                         userId: UUID,
                         eventType: String,
                         timestamp: Long = System.currentTimeMillis())
-  extends Message {
+    extends Message {
   override def serialize = EventMessage.format.write(this).compactPrint
 }
 
@@ -460,7 +460,7 @@
                                   inProgressMemory: Long,
                                   tags: Seq[String],
                                   dedicatedNamespaces: Seq[String])
-  extends Message {
+    extends Message {
 
   /**
    * Serializes message to string. Must be idempotent.
@@ -502,7 +502,7 @@
 object StatusQuery
 
 case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
-  extends Message {
+    extends Message {
 
   override def serialize: String = StatusData.serdes.write(this).compactPrint
 
@@ -524,7 +524,7 @@
                                     rpcPort: Int,
                                     retryCount: Int = 0,
                                     creationId: CreationId = CreationId.generate())
-  extends ContainerMessage(transid) {
+    extends ContainerMessage(transid) {
 
   override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
 
@@ -556,7 +556,7 @@
                                     action: FullyQualifiedEntityName,
                                     revision: DocRevision,
                                     whiskActionMetaData: WhiskActionMetaData)
-  extends ContainerMessage(transid) {
+    extends ContainerMessage(transid) {
   override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
 
   override def serialize: String = toJson.compactPrint
@@ -640,17 +640,17 @@
       ZeroNamespaceLimit)
 
   private def parse(name: String) = name.toUpperCase match {
-    case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
+    case "NOAVAILABLEINVOKERSERROR"         => NoAvailableInvokersError
     case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
-    case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
-    case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
-    case "DBFETCHERROR" => DBFetchError
-    case "WHISKERROR" => WhiskError
-    case "BLACKBOXERROR" => BlackBoxError
-    case "TIMEOUTERROR" => TimeoutError
-    case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
-    case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
-    case "UNKNOWNERROR" => UnknownError
+    case "RESOURCENOTENOUGHERROR"           => ResourceNotEnoughError
+    case "NONEXECUTBLEACTIONERROR"          => NonExecutableActionError
+    case "DBFETCHERROR"                     => DBFetchError
+    case "WHISKERROR"                       => WhiskError
+    case "BLACKBOXERROR"                    => BlackBoxError
+    case "TIMEOUTERROR"                     => TimeoutError
+    case "ZERONAMESPACELIMIT"               => ZeroNamespaceLimit
+    case "TOOMANYCONCURRENTREQUESTS"        => TooManyConcurrentRequests
+    case "UNKNOWNERROR"                     => UnknownError
   }
 
   implicit val serds = new RootJsonFormat[ContainerCreationError] {
@@ -678,7 +678,7 @@
                                        retryCount: Int = 0,
                                        error: Option[ContainerCreationError] = None,
                                        reason: Option[String] = None)
-  extends Message {
+    extends Message {
 
   /**
    * Serializes message to string. Must be idempotent.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
index 258fc89..578ac0b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
@@ -123,15 +123,15 @@
 
   override def equals(that: Any): Boolean = that match {
     case t: ByteSize => compareTo(t) == 0
-    case _ => false
+    case _           => false
   }
 
   override def toString = {
     unit match {
       case SizeUnits.BYTE => s"$size B"
-      case SizeUnits.KB => s"$size KB"
-      case SizeUnits.MB => s"$size MB"
-      case SizeUnits.GB => s"$size GB"
+      case SizeUnits.KB   => s"$size KB"
+      case SizeUnits.MB   => s"$size MB"
+      case SizeUnits.GB   => s"$size GB"
     }
   }
 }
@@ -190,7 +190,7 @@
 
     def read(value: JsValue): ByteSize = value match {
       case JsString(s) => ByteSize.fromString(s)
-      case _ => deserializationError(formatError)
+      case _           => deserializationError(formatError)
     }
   }
 }
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index cc5fa49..d9c15ac 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -27,6 +27,11 @@
   }
 
   scheduler {
+    queue-manager {
+      max-scheduling-time = "20 seconds"
+      max-retries-to-get-queue = "13"
+    }
+    max-peek = "128"
     in-progress-job-retention = "20 seconds"
   }
 }
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
new file mode 100644
index 0000000..e6f3a6b
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue
+
+import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.entity.{DocInfo, FullyQualifiedEntityName}
+
+import scala.concurrent.Promise
+
+// Events sent by the actor
+case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String])
+case class QueueReactivated(invocationNamespace: String, action: FullyQualifiedEntityName, docInfo: DocInfo)
+case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]])
+case object QueueRemovedCompleted
+case object FlushPulse
+
+// Events received by the actor
+case object Start
+case object VersionUpdated
+case object StopSchedulingAsOutdated
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index 1b6b818..9a45f8a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -17,16 +17,580 @@
 
 package org.apache.openwhisk.core.scheduler.queue
 
-import akka.actor.ActorRef
+import java.nio.charset.StandardCharsets
+import java.time.Instant
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSelection, PoisonPill, Props}
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.WarmUp.isWarmUpAction
+import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector._
-import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.containerpool.Interval
+import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
+import org.apache.openwhisk.core.entity.{ActivationResponse => OriginActivationResponse, _}
+import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
+import org.apache.openwhisk.core.service._
+import pureconfig.loadConfigOrThrow
 import spray.json.{DefaultJsonProtocol, _}
+
+import scala.collection.JavaConverters._
 import scala.collection.concurrent.TrieMap
-import scala.util.Try
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Try}
+import pureconfig.generic.auto._
 
 object QueueSize
 case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
 case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
+case class UpdateMemoryQueue(oldAction: DocInfo,
+                             newAction: FullyQualifiedEntityName,
+                             activationMessage: ActivationMessage)
+case class CreateNewQueue(activationMessage: ActivationMessage,
+                          action: FullyQualifiedEntityName,
+                          actionMetadata: WhiskActionMetaData)
+
+case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
+
+class QueueManager(
+  entityStore: ArtifactStore[WhiskEntity],
+  getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
+  etcdClient: EtcdClient,
+  schedulerEndpoints: SchedulerEndpoints,
+  schedulerId: SchedulerInstanceId,
+  dataManagementService: ActorRef,
+  watcherService: ActorRef,
+  ack: ActiveAck,
+  store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
+  childFactory: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef,
+  schedulerConsumer: MessageConsumer,
+  queueManagerConfig: QueueManagerConfig = loadConfigOrThrow[QueueManagerConfig](ConfigKeys.schedulerQueueManager))(
+  implicit logging: Logging)
+    extends Actor {
+
+  val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek)
+
+  /** key: leader-key, value:DocRevision */
+  private val initRevisionMap = TrieMap[String, DocRevision]()
+
+  private val actorSelectionMap = TrieMap[String, ActorSelection]()
+
+  private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
+
+  private implicit val askTimeout = Timeout(5.seconds)
+  private implicit val ec = context.dispatcher
+  private implicit val system = context.system
+
+  private val watcherName = "queue-manager"
+  // watch leaders and register them into actorSelectionMap
+  watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
+
+  override def receive: Receive = {
+    case request: CreateQueue if isWarmUpAction(request.fqn) =>
+      logging.info(
+        this,
+        s"The ${request.fqn} action is an action used to connect a network level connection. So drop the message without creating a queue.")
+      sender ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)
+
+    // note: action sent from the pool balancer already includes version
+    case request: CreateQueue =>
+      val receiver = sender
+      QueuePool.get(MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision))) match {
+        case Some(_) =>
+          logging.info(this, s"Queue already exist for ${request.invocationNamespace}/${request.fqn}")
+          receiver ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)
+
+        case None =>
+          logging.info(this, s"Trying to create queue for ${request.invocationNamespace}/${request.fqn}")
+          electLeaderAndCreateQueue(request, Some(receiver))
+      }
+
+    case msg: ElectionResult =>
+      msg.leadership match {
+        case Right(EtcdLeader(key, value, lease)) =>
+          leaderElectionCallbacks.remove(key).foreach { callback =>
+            callback(Right(EtcdLeader(key, value, lease)))
+          }
+
+        case Left(EtcdFollower(key, value)) =>
+          leaderElectionCallbacks.remove(key).foreach { callback =>
+            callback(Left(EtcdFollower(key, value)))
+          }
+      }
+
+    case msg: ActivationMessage =>
+      logging.info(
+        this,
+        s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
+        msg.transid)
+
+      handleActivationMessage(msg)
+
+    case UpdateMemoryQueue(oldAction, newAction, msg) =>
+      logging.info(
+        this,
+        s"[${msg.activationId}] Update the memory queue for ${newAction.namespace}/${newAction.name}, old rev: ${oldAction.rev} new rev: ${msg.revision}, activationId: ${msg.activationId.asString}")
+      implicit val transid = msg.transid
+      QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, oldAction)) match {
+        case Some(memoryQueueValue) =>
+          QueuePool.put(
+            MemoryQueueKey(msg.user.namespace.name.asString, oldAction),
+            MemoryQueueValue(memoryQueueValue.queue, false))
+          memoryQueueValue.queue ! StopSchedulingAsOutdated
+
+        case _ =>
+        // do nothing because we will anyway create a new one
+      }
+      createNewQueue(newAction, msg)
+
+    case CreateNewQueue(msg, action, actionMetaData) =>
+      val memoryQueueKey = MemoryQueueKey(msg.user.namespace.name.asString, action.toDocId.asDocInfo(msg.revision))
+      QueuePool.get(memoryQueueKey) match {
+        case Some(queue) if queue.isLeader =>
+          queue.queue ! msg
+          logging.info(this, s"Queue for action $action is already updated, skip")(msg.transid)
+        case _ =>
+          val queue =
+            childFactory(context, msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
+          queue ! VersionUpdated
+          QueuePool.put(
+            MemoryQueueKey(msg.user.namespace.name.asString, action.toDocId.asDocInfo(msg.revision)),
+            MemoryQueueValue(queue, true))
+          updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
+          queue ! msg
+          msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
+      }
+
+    // leaderKey is now optional, it becomes None when the stale queue is removed
+    case QueueRemoved(invocationNamespace, action, leaderKey) =>
+      (QueuePool.remove(MemoryQueueKey(invocationNamespace, action)), leaderKey) match {
+        case (Some(_), Some(key)) =>
+          logging.info(this, s"Remove init revision map cause queue is removed, key: ${key}")
+          initRevisionMap.remove(key)
+        case _ => // do nothing
+      }
+      sender ! QueueRemovedCompleted // notify queue that it can stop safely
+
+    // a Removed queue backed to Running
+    case QueueReactivated(invocationNamespace, action, docInfo) =>
+      QueuePool.put(MemoryQueueKey(invocationNamespace, docInfo), MemoryQueueValue(sender(), true))
+      updateInitRevisionMap(getLeaderKey(invocationNamespace, action), docInfo.rev)
+
+    // only handle prefix watcher
+    case WatchEndpointInserted(_, key, value, true) =>
+      if (key.contains("leader") && value.contains("host")) {
+        SchedulerEndpoints
+          .parse(value)
+          .map { endpoints =>
+            logging.info(this, s"Endpoint inserted, key: $key, endpoints: $endpoints")
+            actorSelectionMap.update(key, endpoints.getRemoteRef(QueueManager.actorName))
+          }
+          .recover {
+            case t =>
+              logging.error(this, s"Unexpected error $t when put leaderKey: ${key}")
+          }
+      }
+
+    // only handle prefix watcher
+    case WatchEndpointRemoved(_, key, _, true) =>
+      if (key.contains("leader")) {
+        if (actorSelectionMap.contains(key)) {
+          logging.info(this, s"Endpoint removed for key: $key")
+          actorSelectionMap.remove(key)
+        } else {
+          logging.info(this, s"Endpoint removed for key: $key but not in this scheduler")
+        }
+      }
+
+    case GracefulShutdown =>
+      logging.info(this, s"Gracefully shutdown the queue manager")
+
+      watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
+      logScheduler.cancel()
+      healthReporter ! PoisonPill
+      dataManagementService ! UnregisterData(SchedulerKeys.scheduler(schedulerId))
+
+      QueuePool.values.foreach { queueInfo =>
+        //send GracefulShutdown as the queue is not outdated
+        queueInfo.queue ! GracefulShutdown
+      }
+
+      // this is for graceful shutdown of the feed as well.
+      // When the scheduler endpoint is removed, there can be some unprocessed data in Kafka
+      // So we would wait for some time to consume all messages in Kafka
+      akka.pattern.after(5.seconds, system.scheduler) {
+        feed ! GracefulShutdown
+        Future.successful({})
+      }
+
+    case QueueSize =>
+      sender ! QueuePool.size
+
+    case StatusQuery =>
+      val poolStatus = Future.sequence {
+        QueuePool.values.map(_.queue.ask(StatusQuery).mapTo[StatusData])
+      }
+      sender ! poolStatus
+
+    case msg =>
+      logging.error(this, s"failed to elect a leader for ${msg}")
+
+  }
+
+  private def handler(bytes: Array[Byte]): Future[Unit] = {
+    Future(
+      ActivationMessage
+        .parse(new String(bytes, StandardCharsets.UTF_8)))
+      .flatMap(Future.fromTry)
+      .flatMap { msg =>
+        if (isWarmUpAction(msg.action)) {
+          logging.info(
+            this,
+            s"The ${msg.action} action is an action used to connect a network level connection. So drop the message without executing activation")
+        } else {
+          logging.info(
+            this,
+            s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from kafka.")(
+            msg.transid)
+          handleActivationMessage(msg)
+        }
+        feed ! MessageFeed.Processed
+        Future.successful({})
+      }
+      .recover {
+        case t: DeserializationException =>
+          feed ! MessageFeed.Processed
+          logging.warn(this, s"Failed to parse message to ActivationMessage, ${t.getMessage}")
+      }
+  }
+
+  private val feed = system.actorOf(Props {
+    new MessageFeed("activation", logging, schedulerConsumer, maxPeek, 1.second, handler)
+  })
+
+  private def updateInitRevisionMap(key: String, revision: DocRevision): Unit = {
+    logging.info(this, s"Update init revision map, key: ${key}, rev: ${revision.rev}")
+    initRevisionMap.update(key, revision)
+  }
+
+  private def createNewQueue(newAction: FullyQualifiedEntityName, msg: ActivationMessage)(
+    implicit transid: TransactionId): Future[Any] = {
+    val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch"))
+
+    logging.info(this, s"Create a new queue for ${newAction}, rev: ${msg.revision}")
+
+    getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, msg.revision != DocRevision.empty)
+      .map { actionMetaData: WhiskActionMetaData =>
+        actionMetaData.toExecutableWhiskAction match {
+          // Always use revision got from Database, there can be 2 cases for the actionMetaData.rev
+          // 1. msg.revision == actionMetaData.rev => OK
+          // 2. msg.revision != actionMetaData.rev => the msg.revision must be empty, else an mismatch error will be
+          //                                          threw, we can use the revision got from Database
+          case Some(_) =>
+            self ! CreateNewQueue(
+              msg.copy(revision = actionMetaData.rev, action = msg.action.copy(version = Some(actionMetaData.version))),
+              newAction.copy(version = Some(actionMetaData.version)),
+              actionMetaData)
+            transid.finished(this, start, s"action is updated to ${newAction.toDocId.asDocInfo(actionMetaData.rev)}")
+
+          case None =>
+            val message = s"non-executable action: ${newAction} with rev: ${msg.revision} reached queueManager"
+            transid.failed(this, start, message)
+            completeErrorActivation(msg, message)
+        }
+      }
+      .recoverWith {
+        case DocumentRevisionMismatchException(_) =>
+          logging.warn(this, s"Document revision is mismatched for ${newAction}, rev: ${msg.revision}")
+          createNewQueue(newAction, msg.copy(revision = DocRevision.empty))
+        case t =>
+          transid.failed(
+            this,
+            start,
+            s"failed to fetch action $newAction with rev: ${msg.revision}, error ${t.getMessage}")
+          completeErrorActivation(msg, t.getMessage)
+      }
+  }
+
+  private def handleActivationMessage(msg: ActivationMessage): Any = {
+    implicit val transid = msg.transid
+
+    // Drop the message that has not been scheduled for a long time
+    val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime.toMillis)
+
+    if (schedulingWaitTime > queueManagerConfig.maxSchedulingTime) {
+      logging.warn(
+        this,
+        s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTime.toSeconds} sec")
+      completeErrorActivation(msg, "The activation has not been processed")
+    } else {
+      QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, msg.action.toDocId.asDocInfo(msg.revision))) match {
+        case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
+          memoryQueueValue.queue ! msg
+        case _ =>
+          val key = QueueKeys.queue(msg.user.namespace.name.asString, msg.action.copy(version = None), true)
+
+          initRevisionMap.get(key) match {
+            case Some(revision) =>
+              if (msg.revision > revision) {
+                logging.warn(
+                  this,
+                  s"[${msg.activationId}] the action version is not matched for ${msg.action.path}/${msg.action.name}, current: ${revision}, received: ${msg.revision}")
+                MetricEmitter.emitCounterMetric(LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch"))
+                val newAction = msg.action.copy(binding = None)
+
+                self ! UpdateMemoryQueue(msg.action.toDocId.asDocInfo(revision), newAction, msg)
+              } else if (msg.revision < revision) {
+                // if revision is mismatched, the action may have been updated,
+                // so try again with the latest code
+                logging.warn(
+                  this,
+                  s"[${msg.activationId}] activation message with an old revision arrived, it will be replaced with the latest revision and invoked, current: ${revision}, received: ${msg.revision}")
+                sendActivationByLeaderKey(key, msg.copy(revision = revision))
+              } else {
+                // The code will not run here under normal cases. it's for insurance
+                logging.warn(
+                  this,
+                  s"[${msg.activationId}] The code will not run here under normal cases, rev: ${msg.revision}")
+                sendActivationByLeaderKey(key, msg)
+              }
+            case None =>
+              logging.info(
+                this,
+                s"[${msg.activationId}] the key ${key} is not in the initRevisionMap. revision: ${msg.revision}")
+              sendActivationByLeaderKey(key, msg)
+          }
+      }
+    }
+  }
+
+  private def sendActivationByLeaderKey(key: String, msg: ActivationMessage)(implicit transid: TransactionId) = {
+    actorSelectionMap.get(key) match {
+      case Some(actorSelect) =>
+        actorSelect ! msg
+      case None =>
+        sendActivationToRemoteQueue(key, msg)
+    }
+  }
+
+  private def sendActivationToRemoteQueue(key: String, msg: ActivationMessage)(
+    implicit transid: TransactionId): Future[Any] = {
+    logging.info(this, s"[${msg.activationId}] send activation to remote queue, key: ${key} revision: ${msg.revision}")
+
+    getQueueEndpoint(key) map { endPoint =>
+      Future(
+        SchedulerEndpoints
+          .parse(endPoint))
+        .flatMap(Future.fromTry)
+        .map(endPoint => {
+          val actorSelection = endPoint.getRemoteRef(QueueManager.actorName)
+          logging.info(this, s"add a new actor selection to a map with key: $key")
+          actorSelectionMap.update(key, actorSelection)
+          actorSelection ! msg
+        })
+        .recoverWith {
+          case t =>
+            logging.warn(this, s"[${msg.activationId}] failed to parse endpoints (${t.getMessage})")
+            completeErrorActivation(msg, "The activation has not been processed")
+        }
+
+    } recoverWith {
+      case t =>
+        logging.warn(this, s"[${msg.activationId}] activation has been dropped (${t.getMessage})")
+        completeErrorActivation(msg, "The activation has not been processed")
+    }
+  }
+
+  private def getQueueEndpoint(key: String) = {
+    retryFuture(maxRetries = queueManagerConfig.maxRetriesToGetQueue) {
+      etcdClient.get(key).map { res =>
+        res.getKvsList.asScala.headOption match {
+          case Some(kv) => kv.getValue.toStringUtf8
+          case None     => throw new Exception(s"Failed to get endpoint ($key)")
+        }
+      }
+    }
+  }
+
+  private def retryFuture[T](maxRetries: Int = 13,
+                             retries: Int = 1,
+                             factor: Float = 2.0f,
+                             initWait: Int = 1,
+                             curWait: Int = 0)(fn: => Future[T]): Future[T] = {
+    fn recoverWith {
+      case e if retries <= maxRetries =>
+        val wait =
+          if (curWait == 0) initWait
+          else Math.ceil(curWait * factor).toInt
+        akka.pattern.after(wait.milliseconds, system.scheduler) {
+          val message = s"${e.getMessage} retrying after ${wait}ms ($retries/$maxRetries)"
+          if (retries == maxRetries) {
+            // if number of retries reaches maxRetries, print warning level log
+            logging.warn(this, message)
+          } else {
+            logging.info(this, message)
+          }
+          retryFuture(maxRetries, retries + 1, factor, initWait, wait)(fn)
+        }
+    }
+  }
+
+  private def electLeaderAndCreateQueue(request: CreateQueue, receiver: Option[ActorRef] = None) = {
+    request.whiskActionMetaData.toExecutableWhiskAction match {
+      case Some(_) =>
+        val leaderKey = getLeaderKey(request)
+
+        // callback will be executed after leader election
+        leaderElectionCallbacks.get(leaderKey) match {
+          case None =>
+            dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
+            leaderElectionCallbacks.put(
+              leaderKey, {
+                case Right(EtcdLeader(_, _, _)) =>
+                  val queue = childFactory(
+                    context,
+                    request.invocationNamespace,
+                    request.fqn,
+                    request.revision,
+                    request.whiskActionMetaData)
+                  queue ! Start
+                  QueuePool.put(
+                    MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
+                    MemoryQueueValue(queue, true))
+                  updateInitRevisionMap(leaderKey, request.revision)
+                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+
+                // in case of follower, do nothing
+                case Left(EtcdFollower(_, _)) =>
+                  receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+              })
+
+          // there is already a leader election for leaderKey, so skip it
+          case Some(_) =>
+            receiver foreach (_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+        }
+
+      case None =>
+        logging.error(this, s"non-executable action: ${request.fqn} with rev: ${request.revision} reached QueueManager")
+        receiver match {
+          case Some(recipient) =>
+            recipient ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = false)
+          case None =>
+          // do nothing
+        }
+
+    }
+  }
+
+  private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
+  })
+
+  private val healthReporter = Scheduler.scheduleWaitAtLeast(1.seconds, 1.seconds) { () =>
+    val leaderCount = QueuePool.countLeader()
+    dataManagementService ! UpdateDataOnChange(
+      SchedulerKeys.scheduler(schedulerId),
+      SchedulerStates(schedulerId, leaderCount, schedulerEndpoints).serialize)
+    Future.successful({})
+  }
+
+  private def completeErrorActivation(activation: ActivationMessage, message: String): Future[Any] = {
+    val activationResponse =
+      generateFallbackActivation(activation, OriginActivationResponse.whiskError(message))
+
+    val ackMsg = if (activation.blocking) {
+      CombinedCompletionAndResultMessage(activation.transid, activationResponse, schedulerId)
+    } else {
+      CompletionMessage(activation.transid, activationResponse, schedulerId)
+    }
+
+    ack(
+      activation.transid,
+      activationResponse,
+      activation.blocking,
+      activation.rootControllerIndex,
+      activation.user.namespace.uuid,
+      ackMsg)
+      .andThen {
+        case Failure(t) =>
+          logging.error(this, s"failed to send ack due to ${t}")
+      }
+    store(activation.transid, activationResponse, UserContext(activation.user))
+  }
+
+  /** Generates an activation with zero runtime. Usually used for error cases */
+  private def generateFallbackActivation(msg: ActivationMessage,
+                                         response: OriginActivationResponse): WhiskActivation = {
+    val now = Instant.now
+    val causedBy = if (msg.causedBySequence) {
+      Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
+    } else None
+
+    val binding =
+      msg.action.binding.map(f => Parameters(WhiskActivation.bindingAnnotation, JsString(f.asString)))
+
+    WhiskActivation(
+      activationId = msg.activationId,
+      namespace = msg.user.namespace.name.toPath,
+      subject = msg.user.subject,
+      cause = msg.cause,
+      name = msg.action.name,
+      version = msg.action.version.getOrElse(SemVer()),
+      start = now,
+      end = now,
+      duration = Some(0),
+      response = response,
+      annotations = {
+        Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.copy(version = None).asString)) ++
+          Parameters(WhiskActivation.kindAnnotation, JsString(Exec.UNKNOWN)) ++ causedBy ++ binding
+      })
+  }
+
+  private def getLeaderKey(request: CreateQueue) = {
+    QueueKeys.queue(request.invocationNamespace, request.fqn.copy(version = None), leader = true)
+  }
+
+  private def getLeaderKey(invocationNamespace: String, fqn: FullyQualifiedEntityName) = {
+    QueueKeys.queue(invocationNamespace, fqn.copy(version = None), leader = true)
+  }
+}
+
+object QueueManager {
+  val actorName = "QueueManager"
+
+  def props(
+    entityStore: ArtifactStore[WhiskEntity],
+    getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
+    etcdClient: EtcdClient,
+    schedulerEndpoints: SchedulerEndpoints,
+    schedulerId: SchedulerInstanceId,
+    dataManagementService: ActorRef,
+    watcherService: ActorRef,
+    ack: ActiveAck,
+    store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
+    childFactory: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef,
+    schedulerConsumer: MessageConsumer)(implicit logging: Logging): Props = {
+    Props(
+      new QueueManager(
+        entityStore,
+        getWhiskActionMetaData,
+        etcdClient,
+        schedulerEndpoints,
+        schedulerId,
+        dataManagementService,
+        watcherService,
+        ack,
+        store,
+        childFactory,
+        schedulerConsumer))
+  }
+}
 
 sealed trait MemoryQueueError extends Product {
   val causedBy: String
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 067a736..6b69172 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -140,6 +140,11 @@
 
     scheduler {
         protocol = "{{ scheduler.protocol }}"
+        queue-manager {
+            max-scheduling-time     = "{{ scheduler.queueManager.maxSchedulingTime }}"
+            max-retries-to-get-queue    = "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
+        }
+        max-peek = "{{ scheduler.maxPeek }}"
     }
 
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
new file mode 100644
index 0000000..399ae82
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -0,0 +1,943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue.test
+
+import java.time.{Clock, Instant}
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{Actor, ActorIdentity, ActorRef, ActorRefFactory, ActorSystem, Identify, Props}
+import akka.pattern.ask
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.etcd.api.RangeResponse
+import common.{LoggedFunction, StreamLogging}
+import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
+import org.apache.openwhisk.core.WarmUp.warmUpAction
+import org.apache.openwhisk.core.ack.ActiveAck
+import org.apache.openwhisk.core.connector.test.TestConnector
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
+import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader}
+import org.apache.openwhisk.core.scheduler.grpc.test.CommonVariable
+import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
+import org.apache.openwhisk.core.service._
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+@RunWith(classOf[JUnitRunner])
+class QueueManagerTests
+    extends TestKit(ActorSystem("QueueManager"))
+    with CommonVariable
+    with ImplicitSender
+    with FlatSpecLike
+    with ScalaFutures
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with StreamLogging {
+
+  override def afterAll: Unit = {
+    QueuePool.clear()
+    TestKit.shutdownActorSystem(system)
+  }
+  override def beforeEach = QueuePool.clear()
+  implicit val askTimeout = Timeout(5 seconds)
+  implicit val ec = system.dispatcher
+
+  val entityStore = WhiskEntityStore.datastore()
+
+  val schedulerId = SchedulerInstanceId("0")
+  val testQueueCreationMessage =
+    CreateQueue(testInvocationNamespace, testFQN, testDocRevision, testActionMetaData)
+
+  val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 2552, 8080)
+  val mockConsumer = new TestConnector(s"scheduler${schedulerId.asString}", 4, true)
+
+  val messageTransId = TransactionId(TransactionId.testing.meta.id)
+  val uuid = UUID()
+
+  val action = ExecutableWhiskAction(testEntityPath, testEntityName, testExec)
+  val testLeaderKey = QueueKeys.queue(testInvocationNamespace, action.fullyQualifiedName(false), true)
+
+  val activationMessage = ActivationMessage(
+    messageTransId,
+    action.fullyQualifiedName(true),
+    testDocRevision,
+    Identity(
+      Subject(),
+      Namespace(EntityName(testInvocationNamespace), uuid),
+      BasicAuthenticationAuthKey(uuid, Secret()),
+      Set.empty),
+    ActivationId.generate(),
+    ControllerInstanceId("0"),
+    blocking = false,
+    content = None)
+
+  val activationResponse = ActivationResponse(Right(activationMessage))
+
+  val ack = new ActiveAck {
+    override def apply(tid: TransactionId,
+                       activationResult: WhiskActivation,
+                       blockingInvoke: Boolean,
+                       controllerInstance: ControllerInstanceId,
+                       userId: UUID,
+                       acknowledegment: AcknowledegmentMessage): Future[Any] = {
+      Future.successful({})
+
+    }
+  }
+
+  val store: (TransactionId, WhiskActivation, UserContext) => Future[Any] =
+    (tid: TransactionId, activationResult: WhiskActivation, contest: UserContext) => Future.successful(())
+
+  val testLeaseId = 60
+
+  val childFactory =
+    (system: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) =>
+      system.actorOf(Props(new Actor() {
+        override def receive: Receive = {
+          case GetActivation(_, _, _, _, _, _) =>
+            sender ! ActivationResponse(Right(activationMessage))
+        }
+      }))
+
+  def convertToMetaData(action: WhiskAction): WhiskActionMetaData = {
+    val exec = CodeExecMetaDataAsString(RuntimeManifest(action.exec.kind, ImageName("test")), entryPoint = Some("test"))
+    WhiskActionMetaData(
+      action.namespace,
+      action.name,
+      exec,
+      action.parameters,
+      action.limits,
+      action.version,
+      action.publish,
+      action.annotations)
+      .revision[WhiskActionMetaData](action.rev)
+  }
+
+  /**get WhiskActionMetaData*/
+  def getWhiskActionMetaData(meta: Future[WhiskActionMetaData]) = LoggedFunction {
+    (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
+      meta
+  }
+
+  val get = getWhiskActionMetaData(Future(convertToMetaData(action.toWhiskAction.revision(testDocRevision))))
+  val failedGet = getWhiskActionMetaData(Future.failed(new Exception("error")))
+
+  val watchEndpoint =
+    WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, "queue-manager", Set(PutEvent, DeleteEvent))
+
+  behavior of "QueueManager"
+
+  it should "get the remote actor ref and send the message" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val testQueueManagerActorName = "QueueManagerActorSelectionTest"
+    val watcher = TestProbe()
+
+    system.actorOf(
+      QueueManager
+        .props(
+          entityStore,
+          get,
+          mockEtcdClient,
+          schedulerEndpoint,
+          schedulerId,
+          dataManagementService.ref,
+          watcher.ref,
+          ack,
+          store,
+          childFactory,
+          mockConsumer),
+      testQueueManagerActorName)
+
+    watcher.expectMsg(watchEndpoint)
+    val testQueueManagerPath = s"akka://QueueManager/user/${testQueueManagerActorName}"
+
+    val selected = system.actorSelection(testQueueManagerPath)
+
+    val ActorIdentity(_, Some(ref)) = (selected ? Identify(testQueueManagerPath)).mapTo[ActorIdentity].futureValue
+
+    (ref ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+  }
+
+  it should "create a queue in response to a queue creation request" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+  }
+
+  it should "not create a queue if there is already a queue for the given fqn" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+    dataManagementService.expectMsg(ElectLeader(testLeaderKey, schedulerEndpoint.serialize, queueManager))
+
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+  }
+
+  it should "only do leader election for one time if there are more than one create queue requests incoming" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    dataManagementService.ignoreMsg {
+      case _: UpdateDataOnChange => true
+    }
+    val watcher = TestProbe()
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+
+    val probe = TestProbe()
+    queueManager.tell(testQueueCreationMessage, probe.ref)
+    queueManager.tell(testQueueCreationMessage, probe.ref)
+    queueManager.tell(testQueueCreationMessage, probe.ref)
+    queueManager.tell(testQueueCreationMessage, probe.ref)
+
+    // dataManagementService should only do one election
+    dataManagementService.expectMsg(ElectLeader(testLeaderKey, schedulerEndpoint.serialize, queueManager))
+    dataManagementService.expectNoMessage()
+
+    // all four requests should get responses
+    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+    probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+  }
+
+  private def getTestDataManagementService() = {
+    val dataManagementService = TestProbe()
+    dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) =>
+      msg match {
+        case ElectLeader(key, value, _, _) =>
+          sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+          TestActor.KeepRunning
+
+        case _ =>
+          TestActor.KeepRunning
+    })
+    dataManagementService
+  }
+
+  it should "create a new MemoryQueue when the revision matches with the one in a datastore" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+
+    val childFactory =
+      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+    val newRevision = DocRevision("2-test-revision")
+    val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
+    val newGet = getWhiskActionMetaData(
+      Future(convertToMetaData(action.copy(version = SemVer(0, 0, 2)).toWhiskAction.revision(newRevision))))
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            newGet,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    //current queue's revision is `1-test-revision`
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    probe.expectMsg(Start)
+
+    //the activationMessage's revision(2-test-revision) is newer than current queue's revision(1-test-revision)
+    val activationMessage = ActivationMessage(
+      messageTransId,
+      newFqn,
+      newRevision,
+      Identity(
+        Subject(),
+        Namespace(EntityName(testInvocationNamespace), uuid),
+        BasicAuthenticationAuthKey(uuid, Secret()),
+        Set.empty),
+      ActivationId.generate(),
+      ControllerInstanceId("0"),
+      blocking = false,
+      content = None)
+
+    queueManager ! activationMessage
+    queueManager ! activationMessage.copy(activationId = ActivationId.generate()) // even send two requests, we should only create one queue
+    probe.expectMsg(StopSchedulingAsOutdated)
+    probe.expectMsg(VersionUpdated)
+    probe.expectMsg(activationMessage)
+  }
+
+  it should "create a new MemoryQueue correctly when the action is updated again during updating the queue" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+    val queueWatcher = TestProbe()
+
+    val childFactory =
+      (_: ActorRefFactory,
+       _: String,
+       fqn: FullyQualifiedEntityName,
+       revision: DocRevision,
+       metadata: WhiskActionMetaData) => {
+        queueWatcher.ref ! (fqn, revision)
+        probe.ref
+      }
+
+    val newRevision = DocRevision("2-test-revision")
+    val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
+    val finalFqn = newFqn.copy(version = Some(SemVer(0, 0, 3)))
+    val finalRevision = DocRevision("3-test-revision")
+    // simulate the case that action is updated again while fetch it from database
+    def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: DocRevision, fromCache: Boolean) = {
+      if (docRevision == DocRevision.empty) {
+        Future(convertToMetaData(action.copy(version = SemVer(0, 0, 3)).toWhiskAction.revision(finalRevision)))
+      } else
+        Future.failed(DocumentRevisionMismatchException("mismatch"))
+    }
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            newGet,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    //current queue's revision is `1-test-revision`
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    queueWatcher.expectMsg((testFQN, testDocRevision))
+    probe.expectMsg(Start)
+
+    //the activationMessage's revision(2-test-revision) is newer than current queue's revision(1-test-revision)
+    val activationMessage = ActivationMessage(
+      messageTransId,
+      newFqn,
+      newRevision,
+      Identity(
+        Subject(),
+        Namespace(EntityName(testInvocationNamespace), uuid),
+        BasicAuthenticationAuthKey(uuid, Secret()),
+        Set.empty),
+      ActivationId.generate(),
+      ControllerInstanceId("0"),
+      blocking = false,
+      content = None)
+
+    queueManager ! activationMessage
+    probe.expectMsg(StopSchedulingAsOutdated)
+    queueWatcher.expectMsg((finalFqn, finalRevision))
+    probe.expectMsg(VersionUpdated)
+    probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
+  }
+
+  it should "not skip outdated activation when the revision is older than the one in a datastore" in {
+    stream.reset()
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+
+    val childFactory =
+      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+    val newRevision = DocRevision("2-test-revision")
+    val get = getWhiskActionMetaData(Future(convertToMetaData(action.toWhiskAction.revision(newRevision))))
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    //current queue's revision is `2-test-revision`
+    val testQueueCreationMessage =
+      CreateQueue(testInvocationNamespace, testFQN, revision = newRevision, testActionMetaData)
+
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    //the activationMessage's revision(1-test-revision) is older than current queue's revision(2-test-revision)
+    queueManager ! activationMessage
+
+    stream.toString should include(s"it will be replaced with the latest revision and invoked")
+  }
+
+  it should "retry to fetch queue data if etcd does not respond" in {
+    val mockEtcdClient = stub[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    dataManagementService.ignoreMsg {
+      case _: UpdateDataOnChange => true
+    }
+    val watcher = TestProbe()
+
+    (mockEtcdClient.get _) when (*) returns (Future.failed(new Exception("failed to get for some reason")))
+
+    val queueManager =
+      TestActorRef(
+        new QueueManager(
+          entityStore,
+          get,
+          mockEtcdClient,
+          schedulerEndpoint,
+          schedulerId,
+          dataManagementService.ref,
+          watcher.ref,
+          ack,
+          store,
+          childFactory,
+          mockConsumer,
+          QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+    queueManager ! activationMessage
+    Thread.sleep(100)
+    (mockEtcdClient.get _) verify (*) repeated (3)
+  }
+
+  it should "retry to fetch queue data if there is no data in the response" in {
+    val mockEtcdClient = stub[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    dataManagementService.ignoreMsg {
+      case _: UpdateDataOnChange => true
+    }
+    val watcher = TestProbe()
+
+    val emptyResult = Future.successful(RangeResponse.newBuilder().build())
+    (mockEtcdClient.get _) when (*) returns (emptyResult)
+
+    val queueManager =
+      TestActorRef(
+        new QueueManager(
+          entityStore,
+          get,
+          mockEtcdClient,
+          schedulerEndpoint,
+          schedulerId,
+          dataManagementService.ref,
+          watcher.ref,
+          ack,
+          store,
+          childFactory,
+          mockConsumer,
+          QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+    queueManager ! activationMessage
+    Thread.sleep(100)
+    (mockEtcdClient.get _) verify (*) repeated (3)
+  }
+
+  it should "drop the activation message that has not been scheduled for a long time" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val watcher = TestProbe()
+    val probe = TestProbe()
+    val dataManagementService = getTestDataManagementService()
+
+    val ack = new ActiveAck {
+      override def apply(tid: TransactionId,
+                         activationResult: WhiskActivation,
+                         blockingInvoke: Boolean,
+                         controllerInstance: ControllerInstanceId,
+                         userId: UUID,
+                         acknowledegment: AcknowledegmentMessage): Future[Any] = {
+        Future.successful(probe.ref ! acknowledegment.isSystemError)
+      }
+    }
+
+    val oldNow = Instant.now(Clock.systemUTC()).minusMillis(11000)
+    val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+
+    val queueManager =
+      TestActorRef(
+        new QueueManager(
+          entityStore,
+          failedGet,
+          mockEtcdClient,
+          schedulerEndpoint,
+          schedulerId,
+          dataManagementService.ref,
+          watcher.ref,
+          ack,
+          store,
+          childFactory,
+          mockConsumer,
+          QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+    // send old activation message
+    queueManager ! oldActivationMessage
+
+    // response should be whisk internal error
+    probe.expectMsg(Some(true))
+
+    stream.toString should include(s"[${activationMessage.activationId}] the activation message has not been scheduled")
+  }
+
+  it should "not drop the unscheduled activation message that has been processed within the scheduling time limit." in {
+    val mockEtcdClient = mock[EtcdClient]
+    val watcher = TestProbe()
+    val probe = TestProbe()
+    val dataManagementService = getTestDataManagementService()
+
+    val ack = new ActiveAck {
+      override def apply(tid: TransactionId,
+                         activationResult: WhiskActivation,
+                         blockingInvoke: Boolean,
+                         controllerInstance: ControllerInstanceId,
+                         userId: UUID,
+                         acknowledegment: AcknowledegmentMessage): Future[Any] = {
+        Future.successful(probe.ref ! activationResult.activationId)
+      }
+    }
+
+    val oldNow = Instant.now(Clock.systemUTC()).minusMillis(9000)
+    val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+
+    val queueManager =
+      TestActorRef(
+        new QueueManager(
+          entityStore,
+          failedGet,
+          mockEtcdClient,
+          schedulerEndpoint,
+          schedulerId,
+          dataManagementService.ref,
+          watcher.ref,
+          ack,
+          store,
+          childFactory,
+          mockConsumer,
+          QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+    // send old activation message
+    queueManager ! oldActivationMessage
+
+    // ack is no expected
+    probe.expectNoMessage(500.milliseconds)
+  }
+
+  it should "complete the error activation when the version of action is changed but fetch is failed" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+    val consumer = TestProbe()
+    val dataManagementService = getTestDataManagementService()
+    val ack = new ActiveAck {
+      override def apply(tid: TransactionId,
+                         activationResult: WhiskActivation,
+                         blockingInvoke: Boolean,
+                         controllerInstance: ControllerInstanceId,
+                         userId: UUID,
+                         acknowledegment: AcknowledegmentMessage): Future[Any] = {
+        Future.successful(probe.ref ! activationResult.activationId)
+      }
+    }
+    val store: (TransactionId, WhiskActivation, UserContext) => Future[Any] =
+      (_: TransactionId, activation: WhiskActivation, _: UserContext) =>
+        Future.successful(probe.ref ! activation.activationId)
+
+    val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            failedGet,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    queueManager.tell(
+      UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, activationMessage),
+      consumer.ref)
+
+    probe.expectMsg(activationMessage.activationId)
+    probe.expectMsg(activationMessage.activationId)
+  }
+
+  it should "remove the queue and consumer if it receives a QueueRemoved message" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val watcher = TestProbe()
+    val dataManagementService = getTestDataManagementService()
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    queueManager ! QueueRemoved(
+      testInvocationNamespace,
+      testFQN.toDocId.asDocInfo(testDocRevision),
+      Some(testLeaderKey))
+
+    QueuePool.size shouldBe 0
+  }
+
+  it should "put the queue back to pool if it receives a QueueReactive message" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val watcher = TestProbe()
+    val dataManagementService = getTestDataManagementService()
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    QueuePool.size shouldBe 1
+
+    queueManager ! QueueRemoved(
+      testInvocationNamespace,
+      testFQN.toDocId.asDocInfo(testDocRevision),
+      Some(testLeaderKey))
+
+    QueuePool.size shouldBe 0
+
+    queueManager ! QueueReactivated(testInvocationNamespace, testFQN, testFQN.toDocId.asDocInfo(testDocRevision))
+
+    QueuePool.size shouldBe 1
+  }
+
+  it should "put pool information to data management service" in {
+    val mockEtcdClient = mock[EtcdClient]
+
+    val watcher = TestProbe()
+    val dataManagementService = TestProbe()
+    val counter1 = new AtomicInteger(0)
+    val counter2 = new AtomicInteger(0)
+    val counter3 = new AtomicInteger(0)
+
+    dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) =>
+      msg match {
+        case ElectLeader(key, value, _, _) =>
+          sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+          TestActor.KeepRunning
+
+        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 1, schedulerEndpoint).serialize =>
+          counter1.getAndIncrement()
+          TestActor.KeepRunning
+
+        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 2, schedulerEndpoint).serialize =>
+          counter2.getAndIncrement()
+          TestActor.KeepRunning
+
+        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 3, schedulerEndpoint).serialize =>
+          counter3.getAndIncrement()
+          TestActor.KeepRunning
+
+        case _ =>
+          TestActor.KeepRunning
+    })
+
+    val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
+    val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    Thread.sleep(2000)
+
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn2))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn2, success = true)
+
+    Thread.sleep(2000)
+
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn3))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn3, success = true)
+
+    Thread.sleep(2000)
+
+    counter1.get() should be > 0
+    counter2.get() should be > 0
+    counter3.get() should be > 0
+  }
+
+  it should "not create a queue if it is a warm-up action" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val warmUpActionMetaData =
+      WhiskActionMetaData(warmUpAction.namespace.toPath, warmUpAction.name, testExecMetadata, version = semVer)
+
+    val warmUpQueueCreationMessage =
+      CreateQueue(warmUpAction.namespace.toString, warmUpAction, testDocRevision, warmUpActionMetaData)
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+
+    (queueManager ? warmUpQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      warmUpAction.namespace.toString,
+      warmUpAction,
+      true)
+  }
+
+  behavior of "zero downtime deployment"
+
+  it should "stop all memory queues and corresponding consumers when it receives graceful shutdown message" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val watcher = TestProbe()
+    val dataManagementService = getTestDataManagementService()
+    val probe = TestProbe()
+    val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
+    val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
+
+    // probe will watch all actors which are created by these factories
+    val childFactory =
+      (system: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => {
+        system.actorOf(Props(new Actor() {
+          override def receive: Receive = {
+            case GetActivation(_, _, _, _, _, _) =>
+              sender ! ActivationResponse(Right(activationMessage))
+
+            case GracefulShutdown =>
+              probe.ref ! GracefulShutdown
+          }
+        }))
+      }
+
+    val queueManager =
+      TestActorRef(
+        QueueManager.props(
+          entityStore,
+          get,
+          mockEtcdClient,
+          schedulerEndpoint,
+          schedulerId,
+          dataManagementService.ref,
+          watcher.ref,
+          ack,
+          store,
+          childFactory,
+          mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn2))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn2, success = true)
+
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn3))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn3, success = true)
+
+    queueManager ! GracefulShutdown
+
+    probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
+  }
+}