[New Scheduler] Manage memory queues in scheduler (#5118)
* Add QueueManager
* Add test configuration
* Remove deprecated functions
* Fix memory queue test
* Change string to FiniteDuration
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 79112a9..33011c8 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -465,5 +465,9 @@
scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
+ maxPeek: "{{ scheduler_max_peek | default(128) }}"
+ queueManager:
+ maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
+ maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index a710d60..463e3f3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -562,9 +562,17 @@
// Time that is needed to produce message in kafka
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
+ val SCHEDULER_WAIT_TIME =
+ LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none)
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
+ // Counter-type marker named "queue" in the scheduler component, without a measurement unit.
+ def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
+ // Start-type marker named "queueCreate", measured in milliseconds.
+ def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
+ // Counter-type marker named "queueUpdate"; the given reason is attached as the "reason" tag.
+ def SCHEDULER_QUEUE_UPDATE(reason: String) =
+ LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
+ // Counter-type marker named "queueActivation"; the action name is passed both as the optional
+ // fourth argument and as the "action" tag.
+ def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
+ LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
/*
* General markers
*/
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 00d876e..33fe9fd 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -296,6 +296,7 @@
val azBlob = "whisk.azure-blob"
val schedulerMaxPeek = "whisk.scheduler.max-peek"
+ val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
val whiskClusterName = "whisk.cluster.name"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index ba05c17..9123747 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -60,7 +60,7 @@
lockedArgs: Map[String, String] = Map.empty,
cause: Option[ActivationId] = None,
traceContext: Option[Map[String, String]] = None)
- extends Message {
+ extends Message {
override def serialize = ActivationMessage.serdes.write(this).compactPrint
@@ -116,11 +116,11 @@
* The constructor is private so that callers must use the more restrictive constructors which ensure the response is always
* Right when this message is created.
*/
-case class CombinedCompletionAndResultMessage private(override val transid: TransactionId,
- response: Either[ActivationId, WhiskActivation],
- override val isSystemError: Option[Boolean],
- instance: InstanceId)
- extends AcknowledegmentMessage(transid) {
+case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
+ response: Either[ActivationId, WhiskActivation],
+ override val isSystemError: Option[Boolean],
+ instance: InstanceId)
+ extends AcknowledegmentMessage(transid) {
override def messageType = "combined"
override def result = Some(response)
@@ -142,11 +142,11 @@
* phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
* `CompletionMessage`.
*/
-case class CompletionMessage private(override val transid: TransactionId,
- override val activationId: ActivationId,
- override val isSystemError: Option[Boolean],
- instance: InstanceId)
- extends AcknowledegmentMessage(transid) {
+case class CompletionMessage private (override val transid: TransactionId,
+ override val activationId: ActivationId,
+ override val isSystemError: Option[Boolean],
+ instance: InstanceId)
+ extends AcknowledegmentMessage(transid) {
override def messageType = "completion"
override def result = None
@@ -168,8 +168,8 @@
* The constructor is private so that callers must use the more restrictive constructors which ensure the response is always
* Right when this message is created.
*/
-case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
- extends AcknowledegmentMessage(transid) {
+case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
+ extends AcknowledegmentMessage(transid) {
override def messageType = "result"
override def result = Some(response)
@@ -253,7 +253,7 @@
Left(value.convertTo[ActivationId])
case _: JsObject => Right(value.convertTo[WhiskActivation])
- case _ => deserializationError("could not read ResultMessage")
+ case _ => deserializationError("could not read ResultMessage")
}
}
@@ -296,7 +296,7 @@
implicit val format = new JsonFormat[EventMessageBody] {
def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
- case m: Metric => m.toJson
+ case m: Metric => m.toJson
case a: Activation => a.toJson
}
@@ -321,7 +321,7 @@
causedBy: Option[String],
size: Option[Int] = None,
userDefinedStatusCode: Option[Int] = None)
- extends EventMessageBody {
+ extends EventMessageBody {
val typeName = Activation.typeName
override def serialize = toJson.compactPrint
@@ -349,12 +349,12 @@
private implicit val durationFormat = new RootJsonFormat[Duration] {
override def write(obj: Duration): JsValue = obj match {
case o if o.isFinite => JsNumber(o.toMillis)
- case _ => JsNumber.zero
+ case _ => JsNumber.zero
}
override def read(json: JsValue): Duration = json match {
case JsNumber(n) if n <= 0 => Duration.Zero
- case JsNumber(n) => toDuration(n.longValue)
+ case JsNumber(n) => toDuration(n.longValue)
}
}
@@ -437,7 +437,7 @@
userId: UUID,
eventType: String,
timestamp: Long = System.currentTimeMillis())
- extends Message {
+ extends Message {
override def serialize = EventMessage.format.write(this).compactPrint
}
@@ -460,7 +460,7 @@
inProgressMemory: Long,
tags: Seq[String],
dedicatedNamespaces: Seq[String])
- extends Message {
+ extends Message {
/**
* Serializes message to string. Must be idempotent.
@@ -502,7 +502,7 @@
object StatusQuery
case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
- extends Message {
+ extends Message {
override def serialize: String = StatusData.serdes.write(this).compactPrint
@@ -524,7 +524,7 @@
rpcPort: Int,
retryCount: Int = 0,
creationId: CreationId = CreationId.generate())
- extends ContainerMessage(transid) {
+ extends ContainerMessage(transid) {
override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
@@ -556,7 +556,7 @@
action: FullyQualifiedEntityName,
revision: DocRevision,
whiskActionMetaData: WhiskActionMetaData)
- extends ContainerMessage(transid) {
+ extends ContainerMessage(transid) {
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
override def serialize: String = toJson.compactPrint
@@ -640,17 +640,17 @@
ZeroNamespaceLimit)
private def parse(name: String) = name.toUpperCase match {
- case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
+ case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
- case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
- case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
- case "DBFETCHERROR" => DBFetchError
- case "WHISKERROR" => WhiskError
- case "BLACKBOXERROR" => BlackBoxError
- case "TIMEOUTERROR" => TimeoutError
- case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
- case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
- case "UNKNOWNERROR" => UnknownError
+ case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
+ case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
+ case "DBFETCHERROR" => DBFetchError
+ case "WHISKERROR" => WhiskError
+ case "BLACKBOXERROR" => BlackBoxError
+ case "TIMEOUTERROR" => TimeoutError
+ case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
+ case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
+ case "UNKNOWNERROR" => UnknownError
}
implicit val serds = new RootJsonFormat[ContainerCreationError] {
@@ -678,7 +678,7 @@
retryCount: Int = 0,
error: Option[ContainerCreationError] = None,
reason: Option[String] = None)
- extends Message {
+ extends Message {
/**
* Serializes message to string. Must be idempotent.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
index 258fc89..578ac0b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala
@@ -123,15 +123,15 @@
override def equals(that: Any): Boolean = that match {
case t: ByteSize => compareTo(t) == 0
- case _ => false
+ case _ => false
}
override def toString = {
unit match {
case SizeUnits.BYTE => s"$size B"
- case SizeUnits.KB => s"$size KB"
- case SizeUnits.MB => s"$size MB"
- case SizeUnits.GB => s"$size GB"
+ case SizeUnits.KB => s"$size KB"
+ case SizeUnits.MB => s"$size MB"
+ case SizeUnits.GB => s"$size GB"
}
}
}
@@ -190,7 +190,7 @@
def read(value: JsValue): ByteSize = value match {
case JsString(s) => ByteSize.fromString(s)
- case _ => deserializationError(formatError)
+ case _ => deserializationError(formatError)
}
}
}
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index cc5fa49..d9c15ac 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -27,6 +27,11 @@
}
scheduler {
+ queue-manager {
+ max-scheduling-time = "20 seconds"
+ max-retries-to-get-queue = "13"
+ }
+ max-peek = "128"
in-progress-job-retention = "20 seconds"
}
}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
new file mode 100644
index 0000000..e6f3a6b
--- /dev/null
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue
+
+import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.entity.{DocInfo, FullyQualifiedEntityName}
+
+import scala.concurrent.Promise
+
+// Events sent by the actor
+case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String])
+case class QueueReactivated(invocationNamespace: String, action: FullyQualifiedEntityName, docInfo: DocInfo)
+case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]])
+case object QueueRemovedCompleted
+case object FlushPulse
+
+// Events received by the actor
+case object Start
+case object VersionUpdated
+case object StopSchedulingAsOutdated
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index 1b6b818..9a45f8a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -17,16 +17,580 @@
package org.apache.openwhisk.core.scheduler.queue
-import akka.actor.ActorRef
+import java.nio.charset.StandardCharsets
+import java.time.Instant
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSelection, PoisonPill, Props}
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.WarmUp.isWarmUpAction
+import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector._
-import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.containerpool.Interval
+import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
+import org.apache.openwhisk.core.entity.{ActivationResponse => OriginActivationResponse, _}
+import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
+import org.apache.openwhisk.core.service._
+import pureconfig.loadConfigOrThrow
import spray.json.{DefaultJsonProtocol, _}
+
+import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
-import scala.util.Try
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Try}
+import pureconfig.generic.auto._
object QueueSize
case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
+/**
+ * Message the QueueManager sends to itself when an activation arrives with a newer revision than the one
+ * recorded for its leader key. On receipt the manager marks the queue registered for `oldAction` as
+ * non-leader, tells it to stop scheduling as outdated, and starts creating a queue for `newAction`.
+ *
+ * @param oldAction the document info (id and revision) of the queue being replaced
+ * @param newAction the action for which a new queue is created
+ * @param activationMessage the activation that triggered the update; it is handed to the new queue
+ */
+case class UpdateMemoryQueue(oldAction: DocInfo,
+ newAction: FullyQualifiedEntityName,
+ activationMessage: ActivationMessage)
+/**
+ * Message the QueueManager sends to itself once the metadata of an updated action has been fetched.
+ * On receipt the manager either forwards the activation to an already registered leader queue or
+ * creates a new queue through the child factory and registers it.
+ *
+ * @param activationMessage the activation to deliver to the queue
+ * @param action the action the queue is created for
+ * @param actionMetadata the metadata passed to the child factory when a queue is created
+ */
+case class CreateNewQueue(activationMessage: ActivationMessage,
+ action: FullyQualifiedEntityName,
+ actionMetadata: WhiskActionMetaData)
+
+/**
+ * Configuration of the QueueManager, loaded by default from `ConfigKeys.schedulerQueueManager`.
+ *
+ * @param maxRetriesToGetQueue number of retries allowed when looking up a queue endpoint in etcd
+ * @param maxSchedulingTime activations that have already waited longer than this are completed with an error
+ */
+case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
+
+class QueueManager(
+ entityStore: ArtifactStore[WhiskEntity],
+ getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
+ etcdClient: EtcdClient,
+ schedulerEndpoints: SchedulerEndpoints,
+ schedulerId: SchedulerInstanceId,
+ dataManagementService: ActorRef,
+ watcherService: ActorRef,
+ ack: ActiveAck,
+ store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
+ childFactory: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef,
+ schedulerConsumer: MessageConsumer,
+ queueManagerConfig: QueueManagerConfig = loadConfigOrThrow[QueueManagerConfig](ConfigKeys.schedulerQueueManager))(
+ implicit logging: Logging)
+ extends Actor {
+
+ // loaded from configuration and handed to the MessageFeed created below
+ val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek)
+
+ /** key: leader-key, value:DocRevision */
+ private val initRevisionMap = TrieMap[String, DocRevision]()
+
+ /** key: leader-key, value: selection of the remote QueueManager actor registered for that key */
+ private val actorSelectionMap = TrieMap[String, ActorSelection]()
+
+ /** key: leader-key, value: callback that is removed and invoked once when the ElectionResult for the key arrives */
+ private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
+
+ // timeout used for the ask pattern (e.g. when querying queues for their status)
+ private implicit val askTimeout = Timeout(5.seconds)
+ private implicit val ec = context.dispatcher
+ private implicit val system = context.system
+
+ private val watcherName = "queue-manager"
+ // watch leaders and register them into actorSelectionMap
+ watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
+
+ /**
+ * Message handling of the queue manager. It covers queue creation requests, leader election results,
+ * activations forwarded by other queue managers, queue replacement on action updates, queue
+ * removal/reactivation notifications, endpoint watch events, graceful shutdown and status queries.
+ */
+ override def receive: Receive = {
+ case request: CreateQueue if isWarmUpAction(request.fqn) =>
+ logging.info(
+ this,
+ s"The ${request.fqn} action is an action used to connect a network level connection. So drop the message without creating a queue.")
+ sender ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)
+
+ // note: action sent from the pool balancer already includes version
+ case request: CreateQueue =>
+ // capture the sender now, the reply may be sent later from the leader election callback
+ val receiver = sender
+ QueuePool.get(MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision))) match {
+ case Some(_) =>
+ logging.info(this, s"Queue already exist for ${request.invocationNamespace}/${request.fqn}")
+ receiver ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)
+
+ case None =>
+ logging.info(this, s"Trying to create queue for ${request.invocationNamespace}/${request.fqn}")
+ electLeaderAndCreateQueue(request, Some(receiver))
+ }
+
+ // the callback registered for the key is removed and invoked at most once
+ case msg: ElectionResult =>
+ msg.leadership match {
+ case Right(EtcdLeader(key, value, lease)) =>
+ leaderElectionCallbacks.remove(key).foreach { callback =>
+ callback(Right(EtcdLeader(key, value, lease)))
+ }
+
+ case Left(EtcdFollower(key, value)) =>
+ leaderElectionCallbacks.remove(key).foreach { callback =>
+ callback(Left(EtcdFollower(key, value)))
+ }
+ }
+
+ case msg: ActivationMessage =>
+ logging.info(
+ this,
+ s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
+ msg.transid)
+
+ handleActivationMessage(msg)
+
+ case UpdateMemoryQueue(oldAction, newAction, msg) =>
+ logging.info(
+ this,
+ s"[${msg.activationId}] Update the memory queue for ${newAction.namespace}/${newAction.name}, old rev: ${oldAction.rev} new rev: ${msg.revision}, activationId: ${msg.activationId.asString}")
+ implicit val transid = msg.transid
+ QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, oldAction)) match {
+ case Some(memoryQueueValue) =>
+ // keep the old queue registered, but no longer as a leader, and ask it to stop scheduling
+ QueuePool.put(
+ MemoryQueueKey(msg.user.namespace.name.asString, oldAction),
+ MemoryQueueValue(memoryQueueValue.queue, false))
+ memoryQueueValue.queue ! StopSchedulingAsOutdated
+
+ case _ =>
+ // do nothing because we will anyway create a new one
+ }
+ createNewQueue(newAction, msg)
+
+ case CreateNewQueue(msg, action, actionMetaData) =>
+ val memoryQueueKey = MemoryQueueKey(msg.user.namespace.name.asString, action.toDocId.asDocInfo(msg.revision))
+ QueuePool.get(memoryQueueKey) match {
+ case Some(queue) if queue.isLeader =>
+ queue.queue ! msg
+ logging.info(this, s"Queue for action $action is already updated, skip")(msg.transid)
+ case _ =>
+ val queue =
+ childFactory(context, msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
+ queue ! VersionUpdated
+ QueuePool.put(
+ MemoryQueueKey(msg.user.namespace.name.asString, action.toDocId.asDocInfo(msg.revision)),
+ MemoryQueueValue(queue, true))
+ updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
+ queue ! msg
+ msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
+ }
+
+ // leaderKey is now optional, it becomes None when the stale queue is removed
+ case QueueRemoved(invocationNamespace, action, leaderKey) =>
+ (QueuePool.remove(MemoryQueueKey(invocationNamespace, action)), leaderKey) match {
+ case (Some(_), Some(key)) =>
+ logging.info(this, s"Remove init revision map cause queue is removed, key: ${key}")
+ initRevisionMap.remove(key)
+ case _ => // do nothing
+ }
+ sender ! QueueRemovedCompleted // notify queue that it can stop safely
+
+ // a queue that was in the Removed state went back to Running, so register it as a leader queue again
+ case QueueReactivated(invocationNamespace, action, docInfo) =>
+ QueuePool.put(MemoryQueueKey(invocationNamespace, docInfo), MemoryQueueValue(sender(), true))
+ updateInitRevisionMap(getLeaderKey(invocationNamespace, action), docInfo.rev)
+
+ // only handle prefix watcher
+ case WatchEndpointInserted(_, key, value, true) =>
+ if (key.contains("leader") && value.contains("host")) {
+ SchedulerEndpoints
+ .parse(value)
+ .map { endpoints =>
+ logging.info(this, s"Endpoint inserted, key: $key, endpoints: $endpoints")
+ actorSelectionMap.update(key, endpoints.getRemoteRef(QueueManager.actorName))
+ }
+ .recover {
+ case t =>
+ logging.error(this, s"Unexpected error $t when put leaderKey: ${key}")
+ }
+ }
+
+ // only handle prefix watcher
+ case WatchEndpointRemoved(_, key, _, true) =>
+ if (key.contains("leader")) {
+ if (actorSelectionMap.contains(key)) {
+ logging.info(this, s"Endpoint removed for key: $key")
+ actorSelectionMap.remove(key)
+ } else {
+ logging.info(this, s"Endpoint removed for key: $key but not in this scheduler")
+ }
+ }
+
+ case GracefulShutdown =>
+ logging.info(this, s"Gracefully shutdown the queue manager")
+
+ // stop watching endpoints, stop the periodic reporters and unregister this scheduler's data
+ watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
+ logScheduler.cancel()
+ healthReporter ! PoisonPill
+ dataManagementService ! UnregisterData(SchedulerKeys.scheduler(schedulerId))
+
+ QueuePool.values.foreach { queueInfo =>
+ //send GracefulShutdown as the queue is not outdated
+ queueInfo.queue ! GracefulShutdown
+ }
+
+ // this is for graceful shutdown of the feed as well.
+ // When the scheduler endpoint is removed, there can be some unprocessed data in Kafka
+ // So we would wait for some time to consume all messages in Kafka
+ akka.pattern.after(5.seconds, system.scheduler) {
+ feed ! GracefulShutdown
+ Future.successful({})
+ }
+
+ case QueueSize =>
+ sender ! QueuePool.size
+
+ case StatusQuery =>
+ val poolStatus = Future.sequence {
+ QueuePool.values.map(_.queue.ask(StatusQuery).mapTo[StatusData])
+ }
+ // NOTE(review): the reply is the Future itself, not its completed value, so the requester has to
+ // unwrap it. Confirm that every caller expects a Future here.
+ sender ! poolStatus
+
+ // NOTE(review): this branch catches every message not matched above, although the log text only
+ // mentions leader election. Confirm whether the wording is intended.
+ case msg =>
+ logging.error(this, s"failed to elect a leader for ${msg}")
+
+ }
+
+ /**
+  * Handles one raw record delivered by the activation feed.
+  *
+  * The bytes are decoded as UTF-8 and parsed into an [[ActivationMessage]]. Warm-up actions are only
+  * logged and dropped, all other activations go through `handleActivationMessage`. The feed is told
+  * `MessageFeed.Processed` exactly once per record, on success as well as on every failure path.
+  * Previously only `DeserializationException` was recovered, so any other failure (for example a
+  * different parse error or an exception raised while handling the message) left the feed without
+  * its acknowledgement for that record.
+  *
+  * @param bytes the serialized activation message
+  * @return a future that completes once the record has been acknowledged to the feed
+  */
+ private def handler(bytes: Array[Byte]): Future[Unit] = {
+   Future(
+     ActivationMessage
+       .parse(new String(bytes, StandardCharsets.UTF_8)))
+     .flatMap(Future.fromTry)
+     .flatMap { msg =>
+       if (isWarmUpAction(msg.action)) {
+         logging.info(
+           this,
+           s"The ${msg.action} action is an action used to connect a network level connection. So drop the message without executing activation")
+       } else {
+         logging.info(
+           this,
+           s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from kafka.")(
+           msg.transid)
+         handleActivationMessage(msg)
+       }
+       feed ! MessageFeed.Processed
+       Future.successful({})
+     }
+     .recover {
+       case t: DeserializationException =>
+         feed ! MessageFeed.Processed
+         logging.warn(this, s"Failed to parse message to ActivationMessage, ${t.getMessage}")
+       case t =>
+         // Reaching this point means the success path did not get as far as its own acknowledgement,
+         // so acknowledge here; the record is never acknowledged twice.
+         feed ! MessageFeed.Processed
+         logging.error(this, s"Failed to process the activation message from kafka, $t")
+     }
+ }
+
+ // Feed actor that reads from schedulerConsumer and passes every record to `handler`.
+ // It is created on the actor system, not as a child of this actor.
+ // NOTE(review): maxPeek and 1.second are presumably the feed's buffer limit and poll duration -
+ // confirm against MessageFeed.
+ private val feed = system.actorOf(Props {
+ new MessageFeed("activation", logging, schedulerConsumer, maxPeek, 1.second, handler)
+ })
+
+ /**
+  * Records (or overwrites) the revision known for the given leader key and logs the change.
+  *
+  * @param key the leader key of the queue
+  * @param revision the revision to remember for that key
+  */
+ private def updateInitRevisionMap(key: String, revision: DocRevision): Unit = {
+ logging.info(this, s"Update init revision map, key: ${key}, rev: ${revision.rev}")
+ initRevisionMap.update(key, revision)
+ }
+
+ /**
+  * Fetches the metadata of `newAction` and, if the action is executable, asks this actor (via
+  * `CreateNewQueue`) to create a queue for it. The activation is completed with an error when the
+  * action is not executable or when the metadata cannot be fetched. On a revision mismatch the fetch
+  * is retried with an empty revision.
+  *
+  * @param newAction the action to create the queue for
+  * @param msg the activation that triggered the creation
+  * @return a future of the fetch and its follow-up handling
+  */
+ private def createNewQueue(newAction: FullyQualifiedEntityName, msg: ActivationMessage)(
+ implicit transid: TransactionId): Future[Any] = {
+ val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch"))
+
+ logging.info(this, s"Create a new queue for ${newAction}, rev: ${msg.revision}")
+
+ // NOTE(review): the last argument is true only when the message carries a non-empty revision;
+ // its exact meaning is defined by the injected getWhiskActionMetaData function - confirm there.
+ getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, msg.revision != DocRevision.empty)
+ .map { actionMetaData: WhiskActionMetaData =>
+ actionMetaData.toExecutableWhiskAction match {
+ // Always use the revision read from the database. There are 2 cases for actionMetaData.rev:
+ // 1. msg.revision == actionMetaData.rev => OK
+ // 2. msg.revision != actionMetaData.rev => msg.revision must be empty, otherwise a mismatch error
+ // would have been thrown, so the revision read from the database can be used
+ case Some(_) =>
+ self ! CreateNewQueue(
+ msg.copy(revision = actionMetaData.rev, action = msg.action.copy(version = Some(actionMetaData.version))),
+ newAction.copy(version = Some(actionMetaData.version)),
+ actionMetaData)
+ transid.finished(this, start, s"action is updated to ${newAction.toDocId.asDocInfo(actionMetaData.rev)}")
+
+ case None =>
+ val message = s"non-executable action: ${newAction} with rev: ${msg.revision} reached queueManager"
+ transid.failed(this, start, message)
+ completeErrorActivation(msg, message)
+ }
+ }
+ .recoverWith {
+ case DocumentRevisionMismatchException(_) =>
+ logging.warn(this, s"Document revision is mismatched for ${newAction}, rev: ${msg.revision}")
+ // retry without a revision
+ createNewQueue(newAction, msg.copy(revision = DocRevision.empty))
+ case t =>
+ transid.failed(
+ this,
+ start,
+ s"failed to fetch action $newAction with rev: ${msg.revision}, error ${t.getMessage}")
+ completeErrorActivation(msg, t.getMessage)
+ }
+ }
+
+ /**
+  * Routes an activation message.
+  *
+  * Messages that have waited longer than `maxSchedulingTime` are completed with an error. Otherwise
+  * the message goes to the local leader queue registered for its action and revision. If there is no
+  * such queue, the revision recorded for the leader key decides: a newer message revision triggers a
+  * queue update, an older one is rewritten to the recorded revision and sent by leader key, and an
+  * unknown key or equal revision is sent by leader key unchanged.
+  *
+  * @param msg the activation message to route
+  */
+ private def handleActivationMessage(msg: ActivationMessage): Any = {
+ implicit val transid = msg.transid
+
+ // Drop the message that has not been scheduled for a long time
+ val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime.toMillis)
+
+ if (schedulingWaitTime > queueManagerConfig.maxSchedulingTime) {
+ logging.warn(
+ this,
+ s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTime.toSeconds} sec")
+ completeErrorActivation(msg, "The activation has not been processed")
+ } else {
+ QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, msg.action.toDocId.asDocInfo(msg.revision))) match {
+ case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
+ memoryQueueValue.queue ! msg
+ case _ =>
+ // no local leader queue for this exact revision; look up the leader key without the version
+ val key = QueueKeys.queue(msg.user.namespace.name.asString, msg.action.copy(version = None), true)
+
+ initRevisionMap.get(key) match {
+ case Some(revision) =>
+ if (msg.revision > revision) {
+ logging.warn(
+ this,
+ s"[${msg.activationId}] the action version is not matched for ${msg.action.path}/${msg.action.name}, current: ${revision}, received: ${msg.revision}")
+ MetricEmitter.emitCounterMetric(LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch"))
+ val newAction = msg.action.copy(binding = None)
+
+ self ! UpdateMemoryQueue(msg.action.toDocId.asDocInfo(revision), newAction, msg)
+ } else if (msg.revision < revision) {
+ // if revision is mismatched, the action may have been updated,
+ // so try again with the latest code
+ logging.warn(
+ this,
+ s"[${msg.activationId}] activation message with an old revision arrived, it will be replaced with the latest revision and invoked, current: ${revision}, received: ${msg.revision}")
+ sendActivationByLeaderKey(key, msg.copy(revision = revision))
+ } else {
+ // The code will not run here under normal cases. it's for insurance
+ logging.warn(
+ this,
+ s"[${msg.activationId}] The code will not run here under normal cases, rev: ${msg.revision}")
+ sendActivationByLeaderKey(key, msg)
+ }
+ case None =>
+ logging.info(
+ this,
+ s"[${msg.activationId}] the key ${key} is not in the initRevisionMap. revision: ${msg.revision}")
+ sendActivationByLeaderKey(key, msg)
+ }
+ }
+ }
+ }
+
+ /**
+  * Sends the activation to the queue manager registered for the leader key. When no actor selection
+  * is cached for the key, the endpoint is looked up remotely first.
+  */
+ private def sendActivationByLeaderKey(key: String, msg: ActivationMessage)(implicit transid: TransactionId) = {
+   actorSelectionMap
+     .get(key)
+     .fold[Any](sendActivationToRemoteQueue(key, msg))(_ ! msg)
+ }
+
+ /**
+  * Looks up the endpoint stored under the leader key, caches an actor selection for it and forwards
+  * the activation to that remote queue manager.
+  *
+  * The activation is completed with an error when the endpoint cannot be fetched or cannot be
+  * parsed. The two stages are combined with `transformWith`, so the returned future only completes
+  * after the message has been forwarded or the error completion has finished. The previous
+  * `map`-based version produced a nested future whose outer part completed right after the lookup.
+  *
+  * @param key the leader key of the target queue
+  * @param msg the activation to forward
+  * @return a future that completes when forwarding or error completion is done
+  */
+ private def sendActivationToRemoteQueue(key: String, msg: ActivationMessage)(
+   implicit transid: TransactionId): Future[Any] = {
+   logging.info(this, s"[${msg.activationId}] send activation to remote queue, key: ${key} revision: ${msg.revision}")
+
+   getQueueEndpoint(key).transformWith {
+     case scala.util.Success(endPoint) =>
+       Future(
+         SchedulerEndpoints
+           .parse(endPoint))
+         .flatMap(Future.fromTry)
+         .map { endpoints =>
+           val actorSelection = endpoints.getRemoteRef(QueueManager.actorName)
+           logging.info(this, s"add a new actor selection to a map with key: $key")
+           actorSelectionMap.update(key, actorSelection)
+           actorSelection ! msg
+         }
+         .recoverWith {
+           case t =>
+             logging.warn(this, s"[${msg.activationId}] failed to parse endpoints (${t.getMessage})")
+             completeErrorActivation(msg, "The activation has not been processed")
+         }
+
+     // the lookup itself failed, even after the retries done by getQueueEndpoint
+     case Failure(t) =>
+       logging.warn(this, s"[${msg.activationId}] activation has been dropped (${t.getMessage})")
+       completeErrorActivation(msg, "The activation has not been processed")
+   }
+ }
+
+ /**
+  * Reads the value stored in etcd under the given key, retrying up to the configured number of
+  * times. An attempt fails when the response contains no key-value pair.
+  */
+ private def getQueueEndpoint(key: String) = {
+   retryFuture(maxRetries = queueManagerConfig.maxRetriesToGetQueue) {
+     etcdClient.get(key).map { response =>
+       response.getKvsList.asScala.headOption
+         .map(_.getValue.toStringUtf8)
+         .getOrElse(throw new Exception(s"Failed to get endpoint ($key)"))
+     }
+   }
+ }
+
+ /**
+  * Runs `fn` and retries it on failure with a growing delay.
+  *
+  * @param maxRetries maximum number of retries
+  * @param retries number of the upcoming retry, starting at 1
+  * @param factor multiplier applied to the previous delay
+  * @param initWait delay in milliseconds before the first retry
+  * @param curWait delay in milliseconds used for the previous retry, 0 if there was none
+  * @param fn the operation to run; it is evaluated again for every retry
+  * @return the result of the first successful attempt, or the last failure once retries are used up
+  */
+ private def retryFuture[T](maxRetries: Int = 13,
+                            retries: Int = 1,
+                            factor: Float = 2.0f,
+                            initWait: Int = 1,
+                            curWait: Int = 0)(fn: => Future[T]): Future[T] = {
+   fn.recoverWith {
+     case e if retries <= maxRetries =>
+       val delay = if (curWait == 0) initWait else Math.ceil(curWait * factor).toInt
+       akka.pattern.after(delay.milliseconds, system.scheduler) {
+         val message = s"${e.getMessage} retrying after ${delay}ms ($retries/$maxRetries)"
+         // the last allowed retry is logged at warning level, all earlier ones at info level
+         if (retries < maxRetries) logging.info(this, message)
+         else logging.warn(this, message)
+         retryFuture(maxRetries, retries + 1, factor, initWait, delay)(fn)
+       }
+   }
+ }
+
+ /**
+  * Starts a leader election for the requested queue unless one is already pending for the same
+  * leader key. If this scheduler is elected, the queue is created, started and registered. The
+  * optional receiver gets a `CreateQueueResponse`, which is unsuccessful only for non-executable
+  * actions.
+  */
+ private def electLeaderAndCreateQueue(request: CreateQueue, receiver: Option[ActorRef] = None) = {
+   // answers the requester, if there is one
+   def reply(success: Boolean): Unit =
+     receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = success))
+
+   if (request.whiskActionMetaData.toExecutableWhiskAction.isEmpty) {
+     logging.error(this, s"non-executable action: ${request.fqn} with rev: ${request.revision} reached QueueManager")
+     reply(success = false)
+   } else {
+     val leaderKey = getLeaderKey(request)
+
+     if (leaderElectionCallbacks.contains(leaderKey)) {
+       // there is already a leader election for leaderKey, so skip it
+       reply(success = true)
+     } else {
+       dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
+       // callback will be executed after leader election
+       leaderElectionCallbacks.put(
+         leaderKey, {
+           case Right(EtcdLeader(_, _, _)) =>
+             val queue = childFactory(
+               context,
+               request.invocationNamespace,
+               request.fqn,
+               request.revision,
+               request.whiskActionMetaData)
+             queue ! Start
+             QueuePool.put(
+               MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
+               MemoryQueueValue(queue, true))
+             updateInitRevisionMap(leaderKey, request.revision)
+             reply(success = true)
+
+           // in case of follower, do nothing
+           case Left(EtcdFollower(_, _)) =>
+             reply(success = true)
+         })
+     }
+   }
+ }
+
+ // Emits the number of leader queues as a metric every second, starting immediately.
+ // The returned handle is cancelled on GracefulShutdown.
+ private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
+ })
+
+ // Periodically reports this scheduler's state (id, number of leader queues, endpoints) to the data
+ // management service under the scheduler key. It is stopped with a PoisonPill on GracefulShutdown.
+ // NOTE(review): both 1.seconds arguments are presumably the interval and the initial delay -
+ // confirm against Scheduler.scheduleWaitAtLeast.
+ private val healthReporter = Scheduler.scheduleWaitAtLeast(1.seconds, 1.seconds) { () =>
+ val leaderCount = QueuePool.countLeader()
+ dataManagementService ! UpdateDataOnChange(
+ SchedulerKeys.scheduler(schedulerId),
+ SchedulerStates(schedulerId, leaderCount, schedulerEndpoints).serialize)
+ Future.successful({})
+ }
+
+ /**
+  * Completes an activation that could not be scheduled: a fallback activation carrying a whisk
+  * error is acknowledged and then stored.
+  *
+  * @param activation the activation message that failed
+  * @param message the error text placed into the activation response
+  * @return the future returned by the store function; ack failures are only logged
+  */
+ private def completeErrorActivation(activation: ActivationMessage, message: String): Future[Any] = {
+   val fallback = generateFallbackActivation(activation, OriginActivationResponse.whiskError(message))
+
+   // blocking activations get the result together with the completion
+   val ackMsg =
+     if (activation.blocking) CombinedCompletionAndResultMessage(activation.transid, fallback, schedulerId)
+     else CompletionMessage(activation.transid, fallback, schedulerId)
+
+   val acked = ack(
+     activation.transid,
+     fallback,
+     activation.blocking,
+     activation.rootControllerIndex,
+     activation.user.namespace.uuid,
+     ackMsg)
+   acked.andThen {
+     case Failure(t) =>
+       logging.error(this, s"failed to send ack due to ${t}")
+   }
+
+   store(activation.transid, fallback, UserContext(activation.user))
+ }
+
+ /**
+  * Builds an activation record whose start and end are the same instant, so its duration is zero.
+  * Usually used for error cases.
+  *
+  * @param msg the activation message the record is created for
+  * @param response the response stored in the record
+  */
+ private def generateFallbackActivation(msg: ActivationMessage,
+                                        response: OriginActivationResponse): WhiskActivation = {
+   val timestamp = Instant.now
+
+   // optional annotations: present only for sequence-caused activations and for bound actions
+   val causedByParam =
+     if (msg.causedBySequence) Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
+     else None
+   val bindingParam =
+     msg.action.binding.map(b => Parameters(WhiskActivation.bindingAnnotation, JsString(b.asString)))
+
+   val pathParam = Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.copy(version = None).asString))
+   val kindParam = Parameters(WhiskActivation.kindAnnotation, JsString(Exec.UNKNOWN))
+
+   WhiskActivation(
+     activationId = msg.activationId,
+     namespace = msg.user.namespace.name.toPath,
+     subject = msg.user.subject,
+     cause = msg.cause,
+     name = msg.action.name,
+     version = msg.action.version.getOrElse(SemVer()),
+     start = timestamp,
+     end = timestamp,
+     duration = Some(0),
+     response = response,
+     annotations = pathParam ++ kindParam ++ causedByParam ++ bindingParam)
+ }
+
+ /** Leader key of the queue addressed by a creation request; the action version is ignored. */
+ private def getLeaderKey(request: CreateQueue): String =
+   getLeaderKey(request.invocationNamespace, request.fqn)
+
+ /** Leader key of the queue for the given namespace and action; the action version is ignored. */
+ private def getLeaderKey(invocationNamespace: String, fqn: FullyQualifiedEntityName): String =
+   QueueKeys.queue(invocationNamespace, fqn.copy(version = None), leader = true)
+}
+
+/** Companion of the QueueManager actor: holds its actor name and a Props factory. */
+object QueueManager {
+ // name used to build the selection of a remote queue manager from scheduler endpoints
+ val actorName = "QueueManager"
+
+ /**
+  * Creates the Props for a QueueManager. All arguments are passed through to the constructor
+  * unchanged; the queue manager configuration is left at its default, which is loaded from the
+  * configuration.
+  */
+ def props(
+ entityStore: ArtifactStore[WhiskEntity],
+ getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => Future[WhiskActionMetaData],
+ etcdClient: EtcdClient,
+ schedulerEndpoints: SchedulerEndpoints,
+ schedulerId: SchedulerInstanceId,
+ dataManagementService: ActorRef,
+ watcherService: ActorRef,
+ ack: ActiveAck,
+ store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
+ childFactory: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef,
+ schedulerConsumer: MessageConsumer)(implicit logging: Logging): Props = {
+ Props(
+ new QueueManager(
+ entityStore,
+ getWhiskActionMetaData,
+ etcdClient,
+ schedulerEndpoints,
+ schedulerId,
+ dataManagementService,
+ watcherService,
+ ack,
+ store,
+ childFactory,
+ schedulerConsumer))
+ }
+}
sealed trait MemoryQueueError extends Product {
val causedBy: String
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 067a736..6b69172 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -140,6 +140,11 @@
scheduler {
protocol = "{{ scheduler.protocol }}"
+ queue-manager {
+ max-scheduling-time = "{{ scheduler.queueManager.maxSchedulingTime }}"
+ max-retries-to-get-queue = "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
+ }
+ max-peek = "{{ scheduler.maxPeek }}"
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
new file mode 100644
index 0000000..399ae82
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -0,0 +1,943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue.test
+
+import java.time.{Clock, Instant}
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{Actor, ActorIdentity, ActorRef, ActorRefFactory, ActorSystem, Identify, Props}
+import akka.pattern.ask
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.etcd.api.RangeResponse
+import common.{LoggedFunction, StreamLogging}
+import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
+import org.apache.openwhisk.core.WarmUp.warmUpAction
+import org.apache.openwhisk.core.ack.ActiveAck
+import org.apache.openwhisk.core.connector.test.TestConnector
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
+import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader}
+import org.apache.openwhisk.core.scheduler.grpc.test.CommonVariable
+import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation}
+import org.apache.openwhisk.core.scheduler.queue._
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
+import org.apache.openwhisk.core.service._
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+@RunWith(classOf[JUnitRunner])
+class QueueManagerTests
+ extends TestKit(ActorSystem("QueueManager"))
+ with CommonVariable
+ with ImplicitSender
+ with FlatSpecLike
+ with ScalaFutures
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with StreamLogging {
+
+  /**
+   * Runs once after the last test: empties the global QueuePool and terminates the test actor system.
+   * The shutdown sits in a `finally` so the actor system is stopped even if clearing the pool throws.
+   */
+  override def afterAll(): Unit = {
+    try QueuePool.clear()
+    finally TestKit.shutdownActorSystem(system)
+  }
+  /** Empties the global QueuePool before every test so entries left by one test are not seen by the next. */
+  override def beforeEach(): Unit = QueuePool.clear()
+ // Timeout applied to every `?` (ask) issued by the tests below.
+ implicit val askTimeout = Timeout(5 seconds)
+ // Futures created in this suite run on the dispatcher of the test actor system.
+ implicit val ec = system.dispatcher
+
+ val entityStore = WhiskEntityStore.datastore()
+
+ val schedulerId = SchedulerInstanceId("0")
+ // Default queue creation request; the test* values are not defined in this class
+ // (presumably inherited from CommonVariable -- confirm).
+ val testQueueCreationMessage =
+ CreateQueue(testInvocationNamespace, testFQN, testDocRevision, testActionMetaData)
+
+ val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 2552, 8080)
+ val mockConsumer = new TestConnector(s"scheduler${schedulerId.asString}", 4, true)
+
+ val messageTransId = TransactionId(TransactionId.testing.meta.id)
+ val uuid = UUID()
+
+ val action = ExecutableWhiskAction(testEntityPath, testEntityName, testExec)
+ // Leader key (third argument `true`) of the queue for `action` in the test invocation namespace.
+ val testLeaderKey = QueueKeys.queue(testInvocationNamespace, action.fullyQualifiedName(false), true)
+
+ // Non-blocking activation of `action` at `testDocRevision`; individual tests derive variants via `copy`.
+ val activationMessage = ActivationMessage(
+ messageTransId,
+ action.fullyQualifiedName(true),
+ testDocRevision,
+ Identity(
+ Subject(),
+ Namespace(EntityName(testInvocationNamespace), uuid),
+ BasicAuthenticationAuthKey(uuid, Secret()),
+ Set.empty),
+ ActivationId.generate(),
+ ControllerInstanceId("0"),
+ blocking = false,
+ content = None)
+
+ // NOTE(review): not referenced again inside this class; the stub queues build their own response.
+ val activationResponse = ActivationResponse(Right(activationMessage))
+
+  // No-op ActiveAck: ignores every argument and immediately reports success.
+  val ack = new ActiveAck {
+    override def apply(tid: TransactionId,
+                       activationResult: WhiskActivation,
+                       blockingInvoke: Boolean,
+                       controllerInstance: ControllerInstanceId,
+                       userId: UUID,
+                       acknowledegment: AcknowledegmentMessage): Future[Any] =
+      Future.successful(())
+  }
+
+  // No-op activation store: ignores its three arguments and immediately reports success.
+  val store: (TransactionId, WhiskActivation, UserContext) => Future[Any] =
+    (_, _, _) => Future.successful(())
+
+ // NOTE(review): not referenced anywhere else in this class; the data management stubs reply with the literal 10.
+ val testLeaseId = 60
+
+  // Default queue factory: spawns a stub actor that answers every GetActivation with `activationMessage`.
+  // Only the ActorRefFactory argument is used; name, fqn, revision and metadata are ignored.
+  val childFactory =
+    (system: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => {
+      val stubQueueProps = Props(new Actor() {
+        override def receive: Receive = {
+          case GetActivation(_, _, _, _, _, _) =>
+            sender ! ActivationResponse(Right(activationMessage))
+        }
+      })
+      system.actorOf(stubQueueProps)
+    }
+
+  /**
+   * Builds a WhiskActionMetaData from a WhiskAction.
+   *
+   * Namespace, name, parameters, limits, version, publish flag, annotations and revision are copied
+   * from `action`; the exec metadata keeps the action's kind but uses the fixed image name "test"
+   * and entry point "test".
+   */
+  def convertToMetaData(action: WhiskAction): WhiskActionMetaData = {
+    val runtime = RuntimeManifest(action.exec.kind, ImageName("test"))
+    val execMetaData = CodeExecMetaDataAsString(runtime, entryPoint = Some("test"))
+    val metaData = WhiskActionMetaData(
+      action.namespace,
+      action.name,
+      execMetaData,
+      action.parameters,
+      action.limits,
+      action.version,
+      action.publish,
+      action.annotations)
+    metaData.revision[WhiskActionMetaData](action.rev)
+  }
+
+  /**
+   * Creates a stand-in for the action metadata lookup handed to the QueueManager.
+   *
+   * The returned LoggedFunction ignores all four lookup arguments and always yields `meta`.
+   */
+  def getWhiskActionMetaData(meta: Future[WhiskActionMetaData]) =
+    LoggedFunction((_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) => meta)
+
+ // Lookup that succeeds with the metadata of `action` at `testDocRevision`.
+ val get = getWhiskActionMetaData(Future(convertToMetaData(action.toWhiskAction.revision(testDocRevision))))
+ // Lookup that always fails with a generic exception.
+ val failedGet = getWhiskActionMetaData(Future.failed(new Exception("error")))
+
+ // The message each test expects the watcher probe to receive once a QueueManager has been created.
+ val watchEndpoint =
+ WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, "queue-manager", Set(PutEvent, DeleteEvent))
+
+ behavior of "QueueManager"
+
+ // Registers a QueueManager under a fixed name, resolves it through an actor selection and checks
+ // that the resolved ActorRef answers a CreateQueue request with a successful CreateQueueResponse.
+ it should "get the remote actor ref and send the message" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ val testQueueManagerActorName = "QueueManagerActorSelectionTest"
+ val watcher = TestProbe()
+
+ system.actorOf(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer),
+ testQueueManagerActorName)
+
+ watcher.expectMsg(watchEndpoint)
+ // "QueueManager" in the path is the name of the ActorSystem this suite was created with
+ val testQueueManagerPath = s"akka://QueueManager/user/${testQueueManagerActorName}"
+
+ val selected = system.actorSelection(testQueueManagerPath)
+
+ // Identify makes the selection reply with the concrete ActorRef behind the path
+ val ActorIdentity(_, Some(ref)) = (selected ? Identify(testQueueManagerPath)).mapTo[ActorIdentity].futureValue
+
+ (ref ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+ }
+
+ // Sends a CreateQueue request to a fresh QueueManager and expects a successful CreateQueueResponse.
+ it should "create a queue in response to a queue creation request" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ val watcher = TestProbe()
+
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+ }
+
+ // Sends the same CreateQueue request twice. Both requests must be answered with success, and the
+ // data management probe must have received the ElectLeader for the queue's leader key after the first one.
+ it should "not create a queue if there is already a queue for the given fqn" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ val watcher = TestProbe()
+
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+ dataManagementService.expectMsg(ElectLeader(testLeaderKey, schedulerEndpoint.serialize, queueManager))
+
+ // second request for the same action: still acknowledged as success
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ }
+
+ // Fires four identical CreateQueue requests and checks that exactly one ElectLeader reaches the data
+ // management probe while each of the four requests still receives a successful response.
+ it should "only do leader election for one time if there are more than one create queue requests incoming" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ // UpdateDataOnChange messages are filtered out so expectNoMessage below only guards against further elections
+ dataManagementService.ignoreMsg {
+ case _: UpdateDataOnChange => true
+ }
+ val watcher = TestProbe()
+
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+
+ val probe = TestProbe()
+ queueManager.tell(testQueueCreationMessage, probe.ref)
+ queueManager.tell(testQueueCreationMessage, probe.ref)
+ queueManager.tell(testQueueCreationMessage, probe.ref)
+ queueManager.tell(testQueueCreationMessage, probe.ref)
+
+ // dataManagementService should only do one election
+ dataManagementService.expectMsg(ElectLeader(testLeaderKey, schedulerEndpoint.serialize, queueManager))
+ dataManagementService.expectNoMessage()
+
+ // all four requests should get responses
+ probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+ probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+ probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+ probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
+ }
+
+  /**
+   * Creates a TestProbe that stands in for the data management service.
+   *
+   * Its auto-pilot answers every ElectLeader with a successful ElectionResult that echoes the
+   * requested key and value (third EtcdLeader argument fixed to 10); any other message is accepted
+   * without a reply. The auto-pilot stays installed after every message.
+   */
+  private def getTestDataManagementService() = {
+    val dataManagementService = TestProbe()
+    dataManagementService.setAutoPilot(new TestActor.AutoPilot {
+      override def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = {
+        msg match {
+          case ElectLeader(key, value, _, _) =>
+            sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+          case _ =>
+        }
+        TestActor.KeepRunning
+      }
+    })
+    dataManagementService
+  }
+
+ // Creates a queue at 1-test-revision, then sends two activations carrying 2-test-revision while the
+ // metadata lookup also returns 2-test-revision. Every queue produced by the factory is the same probe,
+ // which must see Start, StopSchedulingAsOutdated, VersionUpdated and then the first activation.
+ it should "create a new MemoryQueue when the revision matches with the one in a datastore" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ val watcher = TestProbe()
+
+ val probe = TestProbe()
+
+ // every "queue" created by the manager is this probe, so queue-bound messages can be asserted on it
+ val childFactory =
+ (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+ val newRevision = DocRevision("2-test-revision")
+ val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
+ val newGet = getWhiskActionMetaData(
+ Future(convertToMetaData(action.copy(version = SemVer(0, 0, 2)).toWhiskAction.revision(newRevision))))
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ newGet,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ //current queue's revision is `1-test-revision`
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ probe.expectMsg(Start)
+
+ //the activationMessage's revision(2-test-revision) is newer than current queue's revision(1-test-revision)
+ val activationMessage = ActivationMessage(
+ messageTransId,
+ newFqn,
+ newRevision,
+ Identity(
+ Subject(),
+ Namespace(EntityName(testInvocationNamespace), uuid),
+ BasicAuthenticationAuthKey(uuid, Secret()),
+ Set.empty),
+ ActivationId.generate(),
+ ControllerInstanceId("0"),
+ blocking = false,
+ content = None)
+
+ queueManager ! activationMessage
+ queueManager ! activationMessage.copy(activationId = ActivationId.generate()) // even if two requests are sent, only one queue should be created
+ probe.expectMsg(StopSchedulingAsOutdated)
+ probe.expectMsg(VersionUpdated)
+ probe.expectMsg(activationMessage)
+ }
+
+ // Creates a queue at 1-test-revision and sends an activation for 2-test-revision while the metadata
+ // lookup fails for any explicit revision and returns version 0.0.3 / 3-test-revision for an empty one.
+ // The replacement queue must be created for the final fqn/revision and receive the activation
+ // rewritten to that fqn/revision.
+ it should "create a new MemoryQueue correctly when the action is updated again during updating the queue" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ val watcher = TestProbe()
+
+ val probe = TestProbe()
+ val queueWatcher = TestProbe()
+
+ // reports the (fqn, revision) of every queue the manager asks for, then hands back the shared probe
+ val childFactory =
+ (_: ActorRefFactory,
+ _: String,
+ fqn: FullyQualifiedEntityName,
+ revision: DocRevision,
+ metadata: WhiskActionMetaData) => {
+ queueWatcher.ref ! (fqn, revision)
+ probe.ref
+ }
+
+ val newRevision = DocRevision("2-test-revision")
+ val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
+ val finalFqn = newFqn.copy(version = Some(SemVer(0, 0, 3)))
+ val finalRevision = DocRevision("3-test-revision")
+ // simulate the case where the action is updated again while it is being fetched from the database:
+ // a lookup with an explicit revision fails with a mismatch, a lookup with an empty revision returns the final one
+ def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: DocRevision, fromCache: Boolean) = {
+ if (docRevision == DocRevision.empty) {
+ Future(convertToMetaData(action.copy(version = SemVer(0, 0, 3)).toWhiskAction.revision(finalRevision)))
+ } else
+ Future.failed(DocumentRevisionMismatchException("mismatch"))
+ }
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ newGet,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ //current queue's revision is `1-test-revision`
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ queueWatcher.expectMsg((testFQN, testDocRevision))
+ probe.expectMsg(Start)
+
+ //the activationMessage's revision(2-test-revision) is newer than current queue's revision(1-test-revision)
+ val activationMessage = ActivationMessage(
+ messageTransId,
+ newFqn,
+ newRevision,
+ Identity(
+ Subject(),
+ Namespace(EntityName(testInvocationNamespace), uuid),
+ BasicAuthenticationAuthKey(uuid, Secret()),
+ Set.empty),
+ ActivationId.generate(),
+ ControllerInstanceId("0"),
+ blocking = false,
+ content = None)
+
+ queueManager ! activationMessage
+ probe.expectMsg(StopSchedulingAsOutdated)
+ // the replacement queue is requested with the final (0.0.3 / 3-test-revision) identity
+ queueWatcher.expectMsg((finalFqn, finalRevision))
+ probe.expectMsg(VersionUpdated)
+ probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
+ }
+
+  // Creates a queue at 2-test-revision, sends an activation that still carries 1-test-revision and
+  // expects the captured log to state that the activation is replaced with the latest revision and invoked.
+  it should "not skip outdated activation when the revision is older than the one in a datastore" in {
+    // start from an empty log buffer so output of earlier tests cannot satisfy the assertion below
+    stream.reset()
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+
+    // every queue created by the manager is the probe
+    val childFactory =
+      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+    val newRevision = DocRevision("2-test-revision")
+    val get = getWhiskActionMetaData(Future(convertToMetaData(action.toWhiskAction.revision(newRevision))))
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    //current queue's revision is `2-test-revision`
+    val testQueueCreationMessage =
+      CreateQueue(testInvocationNamespace, testFQN, revision = newRevision, testActionMetaData)
+
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    //the activationMessage's revision(1-test-revision) is older than current queue's revision(2-test-revision)
+    queueManager ! activationMessage
+
+    // The send above is fire-and-forget, so the log line may be written after `!` returns.
+    // Poll the captured log (bounded) instead of reading it exactly once.
+    awaitAssert(
+      stream.toString should include(s"it will be replaced with the latest revision and invoked"),
+      max = 5.seconds)
+  }
+
+ // With an etcd stub whose `get` always fails and maxRetriesToGetQueue = 2, a single activation is
+ // expected to result in three `get` calls (presumably the initial attempt plus two retries -- confirm).
+ it should "retry to fetch queue data if etcd does not respond" in {
+ val mockEtcdClient = stub[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ dataManagementService.ignoreMsg {
+ case _: UpdateDataOnChange => true
+ }
+ val watcher = TestProbe()
+
+ (mockEtcdClient.get _) when (*) returns (Future.failed(new Exception("failed to get for some reason")))
+
+ // constructed directly (not via props) so a custom QueueManagerConfig can be supplied
+ val queueManager =
+ TestActorRef(
+ new QueueManager(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer,
+ QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+ queueManager ! activationMessage
+ // NOTE(review): fixed pause that lets the retries happen before verification; timing-sensitive
+ Thread.sleep(100)
+ (mockEtcdClient.get _) verify (*) repeated (3)
+ }
+
+ // Same scenario as the failing-etcd test, but `get` succeeds with an empty RangeResponse; with
+ // maxRetriesToGetQueue = 2 three `get` calls are expected.
+ it should "retry to fetch queue data if there is no data in the response" in {
+ val mockEtcdClient = stub[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ dataManagementService.ignoreMsg {
+ case _: UpdateDataOnChange => true
+ }
+ val watcher = TestProbe()
+
+ // a response built without adding any key-value entries
+ val emptyResult = Future.successful(RangeResponse.newBuilder().build())
+ (mockEtcdClient.get _) when (*) returns (emptyResult)
+
+ val queueManager =
+ TestActorRef(
+ new QueueManager(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer,
+ QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+ queueManager ! activationMessage
+ // NOTE(review): fixed pause that lets the retries happen before verification; timing-sensitive
+ Thread.sleep(100)
+ (mockEtcdClient.get _) verify (*) repeated (3)
+ }
+
+ // Sends an activation whose transaction started 11 seconds ago to a manager configured with
+ // maxSchedulingTime = 10 seconds. The ack must be a system error and the log must mention that the
+ // activation has not been scheduled.
+ it should "drop the activation message that has not been scheduled for a long time" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val watcher = TestProbe()
+ val probe = TestProbe()
+ val dataManagementService = getTestDataManagementService()
+
+ // forwards the isSystemError flag of every acknowledgement to the probe
+ val ack = new ActiveAck {
+ override def apply(tid: TransactionId,
+ activationResult: WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any] = {
+ Future.successful(probe.ref ! acknowledegment.isSystemError)
+ }
+ }
+
+ // 11000 ms in the past: beyond the 10 second scheduling limit configured below
+ val oldNow = Instant.now(Clock.systemUTC()).minusMillis(11000)
+ val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+
+ val queueManager =
+ TestActorRef(
+ new QueueManager(
+ entityStore,
+ failedGet,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer,
+ QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+ // send old activation message
+ queueManager ! oldActivationMessage
+
+ // response should be whisk internal error
+ probe.expectMsg(Some(true))
+
+ stream.toString should include(s"[${activationMessage.activationId}] the activation message has not been scheduled")
+ }
+
+ // Counterpart of the stale-activation test: the transaction started 9 seconds ago, which is inside
+ // the 10 second maxSchedulingTime, so no acknowledgement may be produced.
+ it should "not drop the unscheduled activation message that has been processed within the scheduling time limit." in {
+ val mockEtcdClient = mock[EtcdClient]
+ val watcher = TestProbe()
+ val probe = TestProbe()
+ val dataManagementService = getTestDataManagementService()
+
+ // forwards the activation id of every acknowledged activation to the probe
+ val ack = new ActiveAck {
+ override def apply(tid: TransactionId,
+ activationResult: WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any] = {
+ Future.successful(probe.ref ! activationResult.activationId)
+ }
+ }
+
+ // 9000 ms in the past: still within the 10 second scheduling limit configured below
+ val oldNow = Instant.now(Clock.systemUTC()).minusMillis(9000)
+ val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+
+ val queueManager =
+ TestActorRef(
+ new QueueManager(
+ entityStore,
+ failedGet,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer,
+ QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+ // send old activation message
+ queueManager ! oldActivationMessage
+
+ // no ack is expected
+ probe.expectNoMessage(500.milliseconds)
+ }
+
+ // After a queue exists, sends UpdateMemoryQueue for a newer action version while the metadata lookup
+ // always fails. Both the ack and the activation store report the activation id to the probe, so the
+ // id is expected twice.
+ it should "complete the error activation when the version of action is changed but fetch is failed" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val watcher = TestProbe()
+
+ val probe = TestProbe()
+ val consumer = TestProbe()
+ val dataManagementService = getTestDataManagementService()
+ val ack = new ActiveAck {
+ override def apply(tid: TransactionId,
+ activationResult: WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any] = {
+ Future.successful(probe.ref ! activationResult.activationId)
+ }
+ }
+ val store: (TransactionId, WhiskActivation, UserContext) => Future[Any] =
+ (_: TransactionId, activation: WhiskActivation, _: UserContext) =>
+ Future.successful(probe.ref ! activation.activationId)
+
+ val newFqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 2)))
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ failedGet,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ // the consumer probe only serves as the sender of the update request
+ queueManager.tell(
+ UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, activationMessage),
+ consumer.ref)
+
+ // one message from the ack, one from the store (both send the same activation id)
+ probe.expectMsg(activationMessage.activationId)
+ probe.expectMsg(activationMessage.activationId)
+ }
+
+ // Creates one queue, sends QueueRemoved for it and expects the QueuePool to be empty afterwards.
+ it should "remove the queue and consumer if it receives a QueueRemoved message" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val watcher = TestProbe()
+ val dataManagementService = getTestDataManagementService()
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ queueManager ! QueueRemoved(
+ testInvocationNamespace,
+ testFQN.toDocId.asDocInfo(testDocRevision),
+ Some(testLeaderKey))
+
+ // asserted right after the send without waiting
+ QueuePool.size shouldBe 0
+ }
+
+ // Creates one queue (pool size 1), removes it via QueueRemoved (pool size 0) and then sends
+ // QueueReactivated for the same action, after which the pool size must be 1 again.
+ it should "put the queue back to pool if it receives a QueueReactive message" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val watcher = TestProbe()
+ val dataManagementService = getTestDataManagementService()
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ QueuePool.size shouldBe 1
+
+ queueManager ! QueueRemoved(
+ testInvocationNamespace,
+ testFQN.toDocId.asDocInfo(testDocRevision),
+ Some(testLeaderKey))
+
+ QueuePool.size shouldBe 0
+
+ queueManager ! QueueReactivated(testInvocationNamespace, testFQN, testFQN.toDocId.asDocInfo(testDocRevision))
+
+ QueuePool.size shouldBe 1
+ }
+
+  // Creates three queues one after another and checks that, after each creation, the data management
+  // service receives an UpdateDataOnChange whose value is the serialized SchedulerStates for a queue
+  // count of 1, 2 and 3 respectively.
+  it should "put pool information to data management service" in {
+    val mockEtcdClient = mock[EtcdClient]
+
+    val watcher = TestProbe()
+    val dataManagementService = TestProbe()
+    // one counter per reported queue count (1, 2, 3); incremented from the probe's auto-pilot
+    val counter1 = new AtomicInteger(0)
+    val counter2 = new AtomicInteger(0)
+    val counter3 = new AtomicInteger(0)
+
+    dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) =>
+      msg match {
+        case ElectLeader(key, value, _, _) =>
+          sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+          TestActor.KeepRunning
+
+        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 1, schedulerEndpoint).serialize =>
+          counter1.getAndIncrement()
+          TestActor.KeepRunning
+
+        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 2, schedulerEndpoint).serialize =>
+          counter2.getAndIncrement()
+          TestActor.KeepRunning
+
+        case UpdateDataOnChange(_, value) if value == SchedulerStates(schedulerId, 3, schedulerEndpoint).serialize =>
+          counter3.getAndIncrement()
+          TestActor.KeepRunning
+
+        case _ =>
+          TestActor.KeepRunning
+    })
+
+    val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
+    val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    // Wait (bounded) until the state for one queue has been reported before adding the next queue.
+    // Polling replaces a fixed 2 second sleep: it returns as soon as the report arrives and tolerates
+    // a slow machine for up to 10 seconds.
+    awaitAssert(counter1.get() should be > 0, 10.seconds)
+
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn2))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn2, success = true)
+
+    awaitAssert(counter2.get() should be > 0, 10.seconds)
+
+    (queueManager ? testQueueCreationMessage.copy(fqn = fqn3))
+      .mapTo[CreateQueueResponse]
+      .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn3, success = true)
+
+    awaitAssert(counter3.get() should be > 0, 10.seconds)
+
+    // all three states must have been observed at least once
+    counter1.get() should be > 0
+    counter2.get() should be > 0
+    counter3.get() should be > 0
+  }
+
+ // Sends a CreateQueue request for the warm-up action and expects a successful CreateQueueResponse.
+ // NOTE(review): only the response is asserted; that no queue is actually created is not checked here.
+ it should "not create a queue if it is a warm-up action" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ val watcher = TestProbe()
+
+ val warmUpActionMetaData =
+ WhiskActionMetaData(warmUpAction.namespace.toPath, warmUpAction.name, testExecMetadata, version = semVer)
+
+ val warmUpQueueCreationMessage =
+ CreateQueue(warmUpAction.namespace.toString, warmUpAction, testDocRevision, warmUpActionMetaData)
+
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+
+ (queueManager ? warmUpQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ warmUpAction.namespace.toString,
+ warmUpAction,
+ true)
+ }
+
+ behavior of "zero downtime deployment"
+
+ // Creates three queues and then sends GracefulShutdown to the manager; each of the three stub queue
+ // actors must receive GracefulShutdown, which they report to the probe.
+ it should "stop all memory queues and corresponding consumers when it receives graceful shutdown message" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val watcher = TestProbe()
+ val dataManagementService = getTestDataManagementService()
+ val probe = TestProbe()
+ val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
+ val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
+
+ // every actor created by this factory reports a received GracefulShutdown to the probe
+ val childFactory =
+ (system: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => {
+ system.actorOf(Props(new Actor() {
+ override def receive: Receive = {
+ case GetActivation(_, _, _, _, _, _) =>
+ sender ! ActivationResponse(Right(activationMessage))
+
+ case GracefulShutdown =>
+ probe.ref ! GracefulShutdown
+ }
+ }))
+ }
+
+ val queueManager =
+ TestActorRef(
+ QueueManager.props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ (queueManager ? testQueueCreationMessage.copy(fqn = fqn2))
+ .mapTo[CreateQueueResponse]
+ .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn2, success = true)
+
+ (queueManager ? testQueueCreationMessage.copy(fqn = fqn3))
+ .mapTo[CreateQueueResponse]
+ .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn3, success = true)
+
+ queueManager ! GracefulShutdown
+
+ // one GracefulShutdown per queue created above
+ probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
+ }
+}