Add fake clock for test code (#5304)

* Add fake clock for test code

* Add test code for state timeout

* Add test case for transaction _ => Flushing

* Add StateTimeout test for Flushing state
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala
new file mode 100644
index 0000000..8acc2e1
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common.time
+
+import java.time.Instant
+
+trait Clock {
+  def now(): Instant
+}
+
+object SystemClock extends Clock {
+  def now() = Instant.now()
+}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 312e9ec..436b53d 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -21,6 +21,7 @@
 import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
 import akka.util.Timeout
 import org.apache.openwhisk.common._
+import org.apache.openwhisk.common.time.{Clock, SystemClock}
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
@@ -121,13 +122,14 @@
                   ack: ActiveAck,
                   store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
                   getUserLimit: String => Future[Int],
-                  checkToDropStaleActivation: (Queue[TimeSeriesActivationEntry],
+                  checkToDropStaleActivation: (Clock,
+                                               Queue[TimeSeriesActivationEntry],
                                                Long,
                                                String,
                                                WhiskActionMetaData,
                                                MemoryQueueState,
                                                ActorRef) => Unit,
-                  queueConfig: QueueConfig)(implicit logging: Logging)
+                  queueConfig: QueueConfig)(implicit logging: Logging, clock: Clock)
     extends FSM[MemoryQueueState, MemoryQueueData]
     with Stash {
 
@@ -342,7 +344,7 @@
         msg.transid)
       val whiskError = isWhiskError(data.error)
       if (whiskError)
-        queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
+        queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg))
       else
         completeErrorActivation(msg, data.reason, whiskError)
       stay() using data.copy(activeDuringFlush = true)
@@ -351,8 +353,12 @@
     // Instead, StateTimeout message will be sent by a timer.
     case Event(StateTimeout | DropOld, data: FlushingData) =>
       logging.info(this, s"[$invocationNamespace:$action:$stateName] Received StateTimeout, drop stale messages.")
-      queue =
-        MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation)
+      queue = MemoryQueue.dropOld(
+        clock,
+        queue,
+        Duration.ofMillis(actionRetentionTimeout),
+        data.reason,
+        completeErrorActivation)
       if (data.activeDuringFlush || queue.nonEmpty)
         stay using data.copy(activeDuringFlush = false)
       else
@@ -540,7 +546,7 @@
 
     case Event(DropOld, _) =>
       if (queue.nonEmpty && Duration
-            .between(queue.head.timestamp, Instant.now)
+            .between(queue.head.timestamp, clock.now())
             .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
         logging.error(
           this,
@@ -550,6 +556,7 @@
           s"[$invocationNamespace:$action:$stateName] the head stale message: ${queue.head.msg.activationId}")
       }
       queue = MemoryQueue.dropOld(
+        clock,
         queue,
         Duration.ofMillis(actionRetentionTimeout),
         s"Activation processing is not initiated for $actionRetentionTimeout ms",
@@ -706,6 +713,7 @@
         NoData()
     }
   }
+
   private def cleanUpWatcher(): Unit = {
     watchedKeys.foreach { key =>
       watcherService ! UnwatchEndpoint(key, isPrefix = true, watcherName)
@@ -883,7 +891,14 @@
   // these schedulers will run forever and stop when the memory queue stops
   private def startMonitoring(): (ActorRef, ActorRef) = {
     val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
-      checkToDropStaleActivation(queue, actionRetentionTimeout, invocationNamespace, actionMetaData, stateName, self)
+      checkToDropStaleActivation(
+        clock,
+        queue,
+        actionRetentionTimeout,
+        invocationNamespace,
+        actionMetaData,
+        stateName,
+        self)
       Future.successful(())
     }
 
@@ -930,7 +945,7 @@
   @tailrec
   private def getStaleActivationNum(count: Int, queue: Queue[TimeSeriesActivationEntry]): Int = {
     if (queue.isEmpty || Duration
-          .between(queue.head.timestamp, Instant.now)
+          .between(queue.head.timestamp, clock.now())
           .compareTo(StaleDuration) < 0) count
     else
       getStaleActivationNum(count + 1, queue.tail)
@@ -988,7 +1003,7 @@
         stay
       }
       .getOrElse {
-        queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
+        queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg))
         in.decrementAndGet()
         tryEnableActionThrottling()
       }
@@ -1051,7 +1066,7 @@
 
   /** Generates an activation with zero runtime. Usually used for error cases */
   private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = {
-    val now = Instant.now
+    val now = clock.now()
     val causedBy = if (msg.causedBySequence) {
       Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
     } else None
@@ -1101,6 +1116,7 @@
             ack: ActiveAck,
             store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
             getUserLimit: String => Future[Int])(implicit logging: Logging): Props = {
+    implicit val clock: Clock = SystemClock
     Props(
       new MemoryQueue(
         etcdClient,
@@ -1126,19 +1142,21 @@
 
   @tailrec
   def dropOld(
+    clock: Clock,
     queue: Queue[TimeSeriesActivationEntry],
     retention: Duration,
     reason: String,
     completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any]): Queue[TimeSeriesActivationEntry] = {
-    if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0)
+    if (queue.isEmpty || Duration.between(queue.head.timestamp, clock.now()).compareTo(retention) < 0)
       queue
     else {
       completeErrorActivation(queue.head.msg, reason, true)
-      dropOld(queue.tail, retention, reason, completeErrorActivation)
+      dropOld(clock, queue.tail, retention, reason, completeErrorActivation)
     }
   }
 
-  def checkToDropStaleActivation(queue: Queue[TimeSeriesActivationEntry],
+  def checkToDropStaleActivation(clock: Clock,
+                                 queue: Queue[TimeSeriesActivationEntry],
                                  maxRetentionMs: Long,
                                  invocationNamespace: String,
                                  actionMetaData: WhiskActionMetaData,
@@ -1150,7 +1168,7 @@
       s"[$invocationNamespace:$action:$stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.")
 
     if (queue.nonEmpty && Duration
-          .between(queue.head.timestamp, Instant.now)
+          .between(queue.head.timestamp, clock.now())
           .compareTo(Duration.ofMillis(maxRetentionMs)) >= 0) {
       logging.info(
         this,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 5256832..e68de5b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -22,6 +22,7 @@
 import akka.testkit.{TestActor, TestFSMRef, TestProbe}
 import com.sksamuel.elastic4s.http.{search => _}
 import org.apache.openwhisk.common.GracefulShutdown
+import org.apache.openwhisk.common.time.{Clock, SystemClock}
 import org.apache.openwhisk.core.connector.ContainerCreationError.{NonExecutableActionError, WhiskError}
 import org.apache.openwhisk.core.connector.ContainerCreationMessage
 import org.apache.openwhisk.core.entity._
@@ -50,6 +51,17 @@
 import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
 import scala.language.postfixOps
 
+class FakeClock extends Clock {
+  var instant: Instant = Instant.now()
+  def now() = instant
+  def set(now: Instant): Unit = {
+    instant = now
+  }
+  def plusSeconds(secondsToAdd: Long): Unit = {
+    instant = instant.plusSeconds(secondsToAdd)
+  }
+}
+
 @RunWith(classOf[JUnitRunner])
 class MemoryQueueFlowTests
     extends MemoryQueueTestsFixture
@@ -75,6 +87,7 @@
   behavior of "MemoryQueueFlow"
 
   it should "normally be created and handle an activation and became idle an finally removed" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -151,11 +164,11 @@
     container.send(fsm, getActivation(false))
     container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
 
-    Thread.sleep(idleGrace.toMillis)
+    fsm ! StateTimeout
 
     probe.expectMsg(Transition(fsm, Running, Idle))
 
-    Thread.sleep(stopGrace.toMillis)
+    fsm ! StateTimeout
 
     expectDataCleanUp(watcher, dataMgmtService)
 
@@ -168,6 +181,7 @@
   }
 
   it should "became Idle and Running again if a message arrives" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -235,7 +249,7 @@
     container.send(fsm, getActivation(false))
     container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
 
-    Thread.sleep(idleGrace.toMillis)
+    fsm ! StateTimeout
 
     probe.expectMsg(Transition(fsm, Running, Idle))
 
@@ -243,7 +257,7 @@
 
     probe.expectMsg(Transition(fsm, Idle, Running))
 
-    Thread.sleep(idleGrace.toMillis)
+    fsm ! StateTimeout
     // since there is one message, it should not be Idle
     probe.expectNoMessage()
 
@@ -264,11 +278,11 @@
     container.send(fsm, getActivation(false))
     container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
 
-    Thread.sleep(idleGrace.toMillis)
+    fsm ! StateTimeout
 
     probe.expectMsg(Transition(fsm, Running, Idle))
 
-    Thread.sleep(stopGrace.toMillis)
+    fsm ! StateTimeout
 
     parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -281,6 +295,7 @@
   }
 
   it should "go to the NamespaceThrottled state dropping messages when it can't create an initial container" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -367,6 +382,7 @@
   }
 
   it should "go to the NamespaceThrottled state without dropping messages and get back to the Running container" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -471,12 +487,12 @@
     container.send(fsm, getActivation(false, "testContainerId2"))
     container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
 
-    Thread.sleep(idleGrace.toMillis)
+    fsm ! StateTimeout
 
     // all subsequent procedures are same with the Running case
     probe.expectMsg(Transition(fsm, Running, Idle))
 
-    Thread.sleep(stopGrace.toMillis)
+    fsm ! StateTimeout
 
     parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -489,6 +505,7 @@
   }
 
   it should "go to the ActionThrottled state when there are too many stale activations including transition to NamespaceThrottling" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -623,10 +640,10 @@
     probe.expectMsg(Transition(fsm, NamespaceThrottled, Running))
 
     // normal termination process
-    Thread.sleep(idleGrace.toMillis)
+    fsm ! StateTimeout
     probe.expectMsg(Transition(fsm, Running, Idle))
 
-    Thread.sleep(stopGrace.toMillis)
+    fsm ! StateTimeout
     parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
 
@@ -637,6 +654,7 @@
   }
 
   it should "be Flushing when the limit is 0 and restarted back to Running state when the limit is increased" in {
+    implicit val clock = new FakeClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -689,10 +707,12 @@
     expectInitialData(watcher, dataMgmtService)
     probe.expectMsg(Transition(fsm, Uninitialized, Running))
 
+    clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds)
+
     probe.expectMsg(Transition(fsm, Running, Flushing))
     // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
-    Thread.sleep(flushGrace.toMillis)
     fsm ! messages(1)
+    fsm ! StateTimeout
 
     awaitAssert({
       ackedMessageCount shouldBe 1
@@ -700,7 +720,7 @@
       storedMessageCount shouldBe 1
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString(namespaceLimitUnderZero)))
       fsm.underlyingActor.queue.length shouldBe 1
-    }, FiniteDuration(retentionTimeout, MILLISECONDS))
+    }, 5.seconds)
     // limit is increased by an operator
     limit = 10
 
@@ -730,11 +750,11 @@
     fsm.underlyingActor.namespaceContainerCount.existingContainerNumByNamespace -= 1
 
     // normal termination process
-    Thread.sleep(idleGrace.toMillis)
+    fsm ! StateTimeout
 
     probe.expectMsg(Transition(fsm, Running, Idle))
 
-    Thread.sleep(stopGrace.toMillis)
+    fsm ! StateTimeout
 
     parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -747,6 +767,7 @@
   }
 
   it should "be Flushing when the limit is 0 and be terminated without recovering" in {
+    implicit val clock = new FakeClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -801,7 +822,8 @@
     fsm ! messages(1)
 
     // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
-    Thread.sleep(flushGrace.toMillis)
+    clock.plusSeconds((queueConfig.maxRetentionMs) / 1000)
+    fsm ! DropOld
 
     awaitAssert({
       ackedMessageCount shouldBe 2
@@ -809,9 +831,10 @@
       storedMessageCount shouldBe 2
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString(namespaceLimitUnderZero)))
       fsm.underlyingActor.queue.length shouldBe 0
-    }, FiniteDuration(retentionTimeout, MILLISECONDS))
+    }, 5.seconds)
 
     // In this case data clean up happens first.
+    fsm ! StateTimeout
     expectDataCleanUp(watcher, dataMgmtService)
     probe.expectMsg(Transition(fsm, Flushing, Removed))
 
@@ -822,6 +845,7 @@
   }
 
   it should "be the Flushing state when a whisk error happens" in {
+    implicit val clock = new FakeClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -897,7 +921,8 @@
 
     fsm ! messages(1)
 
-    Thread.sleep(flushGrace.toMillis)
+    clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds)
+    fsm ! StateTimeout
 
     awaitAssert({
       ackedMessageCount shouldBe 2
@@ -907,7 +932,8 @@
       fsm.underlyingActor.queue.length shouldBe 0
     }, FiniteDuration(retentionTimeout, MILLISECONDS))
 
-    Thread.sleep(flushGrace.toMillis * 2)
+    clock.plusSeconds(flushGrace.toSeconds * 2)
+    fsm ! StateTimeout
 
     parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Flushing, Removed))
@@ -920,6 +946,7 @@
   }
 
   it should "be the Flushing state when a whisk error happens and be recovered when a container is created" in {
+    implicit val clock = new FakeClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -975,12 +1002,12 @@
         fsm ! FailedCreationJob(id, testInvocationNamespace, fqn, revision, WhiskError, "whisk error")
     }
 
+    clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds)
     probe.expectMsg(Transition(fsm, Running, Flushing))
-    Thread.sleep(1000)
     fsm ! messages(1)
 
     // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
-    Thread.sleep(flushGrace.toMillis)
+    fsm ! StateTimeout
 
     awaitAssert({
       ackedMessageCount shouldBe 1
@@ -1005,11 +1032,11 @@
     container.send(fsm, getActivation(false))
     container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
 
-    Thread.sleep(idleGrace.toMillis)
-
+    fsm.underlyingActor.creationIds = Set.empty[String]
+    fsm ! StateTimeout
     probe.expectMsg(Transition(fsm, Running, Idle))
 
-    Thread.sleep(stopGrace.toMillis)
+    fsm ! StateTimeout
 
     parent.expectMsg(queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -1022,6 +1049,7 @@
   }
 
   it should "be the Flushing state when a developer error happens" in {
+    implicit val clock = new FakeClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -1113,7 +1141,10 @@
       fsm.underlyingActor.queue.length shouldBe 0
     }
 
-    Thread.sleep(flushGrace.toMillis * 2)
+    // simulate timeout 2 times
+    clock.plusSeconds(flushGrace.toSeconds * 2)
+    fsm ! StateTimeout
+    fsm ! StateTimeout
 
     expectDataCleanUp(watcher, dataMgmtService)
     parent.expectMsg(queueRemovedMsg)
@@ -1125,6 +1156,7 @@
   }
 
   it should "be gracefully terminated when it receives a GracefulShutDown message" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -1211,12 +1243,12 @@
     fsm ! QueueRemovedCompleted
 
     // if there is a message, it should not terminate
-    Thread.sleep(gracefulShutdownTimeout.toMillis)
+    fsm ! StateTimeout
 
     container.send(fsm, getActivation())
     container.expectMsg(ActivationResponse(Right(messages(2))))
 
-    Thread.sleep(gracefulShutdownTimeout.toMillis)
+    fsm ! StateTimeout
 
     // it doesn't need to check if all containers are timeout as same version of a new queue will be created in another scheduler.
 
@@ -1235,6 +1267,7 @@
     val allStates = List(Running, Idle, Flushing, ActionThrottled, NamespaceThrottled, Removing, Removed)
 
     allStates.foreach { state =>
+      implicit val clock = SystemClock
       val mockEtcdClient = mock[EtcdClient]
       val parent = TestProbe()
       val watcher = TestProbe()
@@ -1319,6 +1352,7 @@
     val allStates = List(Running, Idle, Flushing, ActionThrottled, NamespaceThrottled, Removing, Removed)
 
     allStates.foreach { state =>
+      implicit val clock = SystemClock
       val mockEtcdClient = mock[EtcdClient]
       val parent = TestProbe()
       val watcher = TestProbe()
@@ -1393,7 +1427,7 @@
       probe.expectMsg(Transition(fsm, state, Removed))
 
       // queue manager could not respond to the memory queue.
-      Thread.sleep(stopGrace.toMillis)
+      fsm ! StateTimeout
 
       // the queue is supposed to send queueRemovedMsg once again and stops itself.
       parent.expectMsg(queueRemovedMsg)
@@ -1408,6 +1442,7 @@
       List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed)
 
     allStates.foreach { state =>
+      implicit val clock = SystemClock
       val mockEtcdClient = mock[EtcdClient]
       val parent = TestProbe()
       val watcher = TestProbe()
@@ -1509,7 +1544,7 @@
           parent.expectMsg(message)
 
           // queue should be terminated after gracefulShutdownTimeout
-          Thread.sleep(gracefulShutdownTimeout.toMillis)
+          fsm ! StateTimeout
 
           // clean up actors only because etcd data is being used by a new queue
           watcher.expectMsgAllOf(
@@ -1548,14 +1583,15 @@
           probe.expectMsg(Transition(fsm, state, Removed))
 
           fsm ! QueueRemovedCompleted
-
-          Thread.sleep(gracefulShutdownTimeout.toMillis)
+          fsm ! StateTimeout
 
           watcher.expectMsgAllOf(
             UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName),
             UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName),
             UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
 
+          probe.expectTerminated(fsm, 10.seconds)
+
         case _ =>
           parent.expectMsg(staleQueueRemovedMsg)
           parent.expectMsg(message)
@@ -1584,6 +1620,7 @@
       List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed)
 
     allStates.foreach { state =>
+      implicit val clock = SystemClock
       val mockEtcdClient = mock[EtcdClient]
       val parent = TestProbe()
       val watcher = TestProbe()
@@ -1676,13 +1713,13 @@
         // queue will be gracefully shutdown.
         case Removing =>
           // queue should not be terminated as there is an activation
-          Thread.sleep(gracefulShutdownTimeout.toMillis)
+          fsm ! StateTimeout
 
           container.send(fsm, getActivation())
           container.expectMsg(ActivationResponse(Right(message)))
 
           // queue should not be terminated as there is an activation
-          Thread.sleep(gracefulShutdownTimeout.toMillis)
+          fsm ! StateTimeout
 
           // clean up actors only because etcd data is being used by a new queue
           watcher.expectMsgAllOf(
@@ -1721,12 +1758,12 @@
           fsm ! QueueRemovedCompleted
 
           // queue should not be terminated as there is an activation
-          Thread.sleep(gracefulShutdownTimeout.toMillis)
+          fsm ! StateTimeout
 
           container.send(fsm, getActivation())
           container.expectMsg(ActivationResponse(Right(message)))
 
-          Thread.sleep(gracefulShutdownTimeout.toMillis)
+          fsm ! StateTimeout
 
           watcher.expectMsgAllOf(
             UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName),
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index fd2ac73..e642628 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -33,6 +33,7 @@
 import com.ibm.etcd.client.{EtcdClient => Client}
 import com.sksamuel.elastic4s.http.ElasticClient
 import common.StreamLogging
+import org.apache.openwhisk.common.time.SystemClock
 import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
 import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector._
@@ -135,7 +136,131 @@
 
   behavior of "MemoryQueue"
 
+  it should "send StateTimeout message when state timeout" in {
+    implicit val clock = SystemClock
+    val mockEtcdClient = mock[EtcdClient]
+    val prove = TestProbe()
+    val watcher = TestProbe()
+    val parent = TestProbe()
+
+    expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+    val queueConfigWithShortTimeout = queueConfig.copy(
+      idleGrace = 10.milliseconds,
+      stopGrace = 10.milliseconds,
+      gracefulShutdownTimeout = 10.milliseconds)
+
+    val fsm =
+      TestFSMRef(
+        new MemoryQueue(
+          mockEtcdClient,
+          durationChecker,
+          fqn,
+          mockMessaging(),
+          schedulingConfig,
+          testInvocationNamespace,
+          revision,
+          endpoints,
+          actionMetadata,
+          prove.ref,
+          watcher.ref,
+          TestProbe().ref,
+          TestProbe().ref,
+          schedulerId,
+          ack,
+          store,
+          getUserLimit,
+          checkToDropStaleActivation,
+          queueConfigWithShortTimeout),
+        parent.ref,
+        "MemoryQueue")
+
+    registerCallback(fsm)
+    fsm ! Start
+    expectMsg(Transition(fsm, Uninitialized, Running))
+
+    // Test stateTimeout for when(Running, stateTimeout = queueConfig.idleGrace)
+    fsm.isStateTimerActive shouldBe true
+    Thread.sleep(queueConfigWithShortTimeout.idleGrace.toMillis)
+
+    expectMsg(Transition(fsm, Running, Idle))
+
+    // Test stateTimeout for when(Idle, stateTimeout = queueConfig.stopGrace)
+    fsm.isStateTimerActive shouldBe true
+    Thread.sleep(queueConfigWithShortTimeout.stopGrace.toMillis)
+    expectMsg(Transition(fsm, Idle, Removed))
+
+    // Test stateTimeout for when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout)
+    fsm.isStateTimerActive shouldBe true
+    Thread.sleep(queueConfigWithShortTimeout.gracefulShutdownTimeout.toMillis)
+    parent.expectMsg(queueRemovedMsg)
+  }
+
+  it should "start startTimerWithFixedDelay(name=StopQueue) on Transition _ => Flushing" in {
+    implicit val clock = SystemClock
+    val mockEtcdClient = mock[EtcdClient]
+    val prove = TestProbe()
+    val watcher = TestProbe()
+    val parent = TestProbe()
+
+    expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+    val queueConfigWithShortTimeout =
+      queueConfig.copy(
+        idleGrace = 10.seconds,
+        stopGrace = 10.milliseconds,
+        gracefulShutdownTimeout = 10.milliseconds,
+        flushGrace = 10.milliseconds)
+
+    val fsm =
+      TestFSMRef(
+        new MemoryQueue(
+          mockEtcdClient,
+          durationChecker,
+          fqn,
+          mockMessaging(),
+          schedulingConfig,
+          testInvocationNamespace,
+          revision,
+          endpoints,
+          actionMetadata,
+          prove.ref,
+          watcher.ref,
+          TestProbe().ref,
+          TestProbe().ref,
+          schedulerId,
+          ack,
+          store,
+          getUserLimit,
+          checkToDropStaleActivation,
+          queueConfigWithShortTimeout),
+        parent.ref,
+        "MemoryQueue")
+
+    registerCallback(fsm)
+    fsm ! Start
+    expectMsg(Transition(fsm, Uninitialized, Running))
+
+    fsm ! FailedCreationJob(
+      testCreationId,
+      message.user.namespace.name.asString,
+      message.action,
+      message.revision,
+      ContainerCreationError.NoAvailableInvokersError,
+      "no available invokers")
+
+    // Test case _ -> Flushing => startTimerWithFixedDelay("StopQueue", StateTimeout, queueConfig.flushGrace)
+    // state Running -> Flushing
+    expectMsg(Transition(fsm, Running, Flushing))
+    fsm.isTimerActive("StopQueue") shouldBe true
+
+    // wait for flushGrace time, StopQueue timer should send StateTimeout
+    Thread.sleep(queueConfigWithShortTimeout.flushGrace.toMillis)
+    expectMsg(Transition(fsm, Flushing, Removed))
+  }
+
   it should "register the endpoint when initializing" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val prove = TestProbe()
     val watcher = TestProbe()
@@ -192,6 +317,7 @@
   }
 
   it should "go to Flushing state if any error happens when the queue is depreacted" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val prove = TestProbe()
     val watcher = TestProbe()
@@ -252,6 +378,7 @@
   }
 
   it should "go to the Running state without storing any data if it receives VersionUpdated" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val prove = TestProbe()
     val watcher = TestProbe()
@@ -308,6 +435,7 @@
   }
 
   it should "remove the queue when timeout occurs" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val dataManagementService = TestProbe()
@@ -380,6 +508,7 @@
   }
 
   it should "back to Running state when got new ActivationMessage when in Idle State" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val prove = TestProbe()
     val watcher = TestProbe()
@@ -459,6 +588,7 @@
   }
 
   it should "back to Running state when got new ActivationMessage when in Removed State" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val prove = TestProbe()
     val watcher = TestProbe()
@@ -530,6 +660,7 @@
   }
 
   it should "store the received ActivationMessage in the queue" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = new MockEtcdClient(client, isLeader = true)
     val prove = TestProbe()
 
@@ -571,6 +702,7 @@
   }
 
   it should "send a ActivationMessage in response to GetActivation" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()
     val tid = TransactionId(TransactionId.generateTid())
@@ -611,6 +743,7 @@
   }
 
   it should "send NoActivationMessage in case there is no message in the queue" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()
     val tid = TransactionId(TransactionId.generateTid())
@@ -650,6 +783,7 @@
   }
 
   it should "poll for the ActivationMessage in case there is no message in the queue" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()
     val tid = TransactionId(TransactionId.generateTid())
@@ -698,6 +832,7 @@
   }
 
   it should "not send msg to a deleted container" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()
     val tid = TransactionId(TransactionId.generateTid())
@@ -752,6 +887,7 @@
   }
 
   it should "send response to request according to the order of container id and warmed flag" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()
     val tid = TransactionId(TransactionId.generateTid())
@@ -819,6 +955,7 @@
   }
 
   it should "send a container creation request to ContainerManager at initialization time" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val containerManger = TestProbe()
     val probe = TestProbe()
@@ -888,6 +1025,7 @@
   }
 
   it should "complete error activation while received FailedCreationJob and the error is not a whisk error(unrecoverable)" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val testProbe = TestProbe()
     val parent = TestProbe()
@@ -968,6 +1106,7 @@
   }
 
   it should "complete error activation after timeout while received FailedCreationJob and the error is a whisk error(recoverable)" in {
+    implicit val clock = new FakeClock
     val mockEtcdClient = mock[EtcdClient]
     val testProbe = TestProbe()
     val decisionMaker = TestProbe()
@@ -1024,11 +1163,15 @@
     parent.expectNoMessage(5.seconds)
 
     // Add 3 more messages.
+    clock.plusSeconds(5)
     (1 to expectedCount).foreach(_ => fsm ! message)
     parent.expectNoMessage(5.seconds)
 
     // After 10 seconds(action retention timeout), the first 3 messages are timed out.
     // It does not get removed as there are still 3 messages in the queue.
+    clock.plusSeconds(5)
+    fsm ! DropOld
+
     awaitAssert({
       ackedMessageCount shouldBe 3
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString("no available invokers")))
@@ -1052,7 +1195,8 @@
     parent.expectMsg(Transition(fsm, Running, Flushing))
 
     // wait for the flush grace, and then all existing activations will be flushed
-    Thread.sleep(queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis)
+    clock.plusSeconds((queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) / 1000)
+    fsm ! DropOld
 
     // The error message is updated from the recent error message of the FailedCreationJob.
     awaitAssert({
@@ -1072,6 +1216,7 @@
   }
 
   it should "send old version activation to queueManager when update action if doesn't exist old version container" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()
     val queueManager = TestProbe()
@@ -1114,6 +1259,7 @@
   }
 
   it should "fetch old version activation by old container when update action" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val probe = TestProbe()
     val queueManager = TestProbe()
@@ -1159,6 +1305,7 @@
   }
 
   it should "complete error activation after blackbox timeout when the action is a blackbox action and received FailedCreationJob with a whisk error(recoverable)" in {
+    implicit val clock = new FakeClock
     val mockEtcdClient = mock[EtcdClient]
     val testProbe = TestProbe()
     val decisionMaker = TestProbe()
@@ -1246,7 +1393,8 @@
     fsm ! message
 
     // wait for the flush grace, and then some existing activations will be flushed
-    Thread.sleep(queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis)
+    clock.plusSeconds((queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) / 1000)
+    fsm ! DropOld
     (1 to expectedCount).foreach(_ => probe.expectMsg(ActivationResponse.whiskError("no available invokers")))
 
     val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace
@@ -1262,6 +1410,7 @@
   }
 
   it should "stop scheduling if the namespace does not exist" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val getZeroLimit = (_: String) => { Future.failed(NoDocumentException("namespace does not exist")) }
     val testProbe = TestProbe()
@@ -1302,10 +1451,11 @@
     parent.expectMsg(10 seconds, CurrentState(fsm, Uninitialized))
     parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running))
 
-    Thread.sleep(idleGrace.toMillis)
-
-    parent expectMsg QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), None)
-    parent.expectMsg(10 seconds, Transition(fsm, Running, Removing))
+    fsm ! StopSchedulingAsOutdated
+    parent expectMsgAllOf (10 seconds, Transition(fsm, Running, Removing), QueueRemoved(
+      testInvocationNamespace,
+      fqn.toDocId.asDocInfo(revision),
+      None))
 
     fsm ! QueueRemovedCompleted
     parent.expectMsg(10 seconds, Transition(fsm, Removing, Removed))
@@ -1314,6 +1464,7 @@
   }
 
   it should "throttle the namespace when the limit is already reached" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val dataManagementService = TestProbe()
     val probe = TestProbe()
@@ -1364,6 +1515,7 @@
   }
 
   it should "disable namespace throttling when the capacity become available" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val dataManagementService = TestProbe()
     val probe = TestProbe()
@@ -1425,6 +1577,7 @@
   }
 
   it should "throttle the action when the number of messages reaches the limit" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val dataManagementService = TestProbe()
     val probe = TestProbe()
@@ -1473,6 +1626,7 @@
   }
 
   it should "disable action throttling when the number of messages is under throttling fraction" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
     val dataManagementService = TestProbe()
     val probe = TestProbe()
@@ -1531,6 +1685,7 @@
   }
 
   it should "update the number of containers based on Watch event" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = new MockEtcdClient(client, true)
     val probe = TestProbe()
     val watcher = system.actorOf(WatcherService.props(mockEtcdClient))
@@ -1574,7 +1729,7 @@
     val newRevision = DocRevision("2-testRev")
 
     memoryQueue.containers.size shouldBe 0
-    memoryQueue.creationIds.size shouldBe 0
+    memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0
     memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
     memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
 
@@ -1611,13 +1766,12 @@
         Some(ContainerId("test-containerId2"))),
       "test-value")
 
-    Thread.sleep(1000)
-    memoryQueue.containers.size shouldBe 1
-    // the monit actor in memoryQueue may decide to create a container
-    memoryQueue.creationIds.size should be >= 1
-    memoryQueue.creationIds.size should be <= 2
-    memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2
-    memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2
+    awaitAssert({
+      memoryQueue.containers.size shouldBe 1 // ['test-containerId1']
+      memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 1 // ['testId1']
+      memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2
+      memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2
+    }, 5.seconds)
 
     mockEtcdClient.publishEvents(
       EventType.PUT,
@@ -1650,12 +1804,12 @@
         Some(ContainerId("test-containerId4"))),
       "test-value")
 
-    Thread.sleep(1000)
-    memoryQueue.containers.size shouldBe 2
-    memoryQueue.creationIds.size should be >= 2
-    memoryQueue.creationIds.size should be <= 3
-    memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
-    memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4
+    awaitAssert({
+      memoryQueue.containers.size shouldBe 2 // ['test-containerId1', 'test-containerId3']
+      memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 2 // ['testId1', 'testId3']
+      memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
+      memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4
+    }, 5.seconds)
 
     mockEtcdClient.publishEvents(
       EventType.DELETE,
@@ -1678,12 +1832,12 @@
       inProgressContainer(testInvocationNamespace, newFqn, newRevision, schedulerId, CreationId("testId4")),
       "test-value")
 
-    Thread.sleep(1000)
-    memoryQueue.containers.size shouldBe 2
-    memoryQueue.creationIds.size should be >= 0
-    memoryQueue.creationIds.size should be <= 1
-    memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
-    memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
+    awaitAssert({
+      memoryQueue.containers.size shouldBe 2 // ['test-containerId1', 'test-containerId3']
+      memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0
+      memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
+      memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
+    }, 5.seconds)
 
     mockEtcdClient.publishEvents(
       EventType.DELETE,
@@ -1726,15 +1880,12 @@
         Some(ContainerId("test-containerId4"))),
       "test-value")
 
-    memoryQueue.creationIds.size should be >= 0
-    memoryQueue.creationIds.size should be <= 1
-
-    Thread.sleep(1000)
-    memoryQueue.containers.size shouldBe 0
-    memoryQueue.creationIds.size should be >= 1 // if there is no container, the queue tries to create one container
-    memoryQueue.creationIds.size should be <= 2
-    memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
-    memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
+    awaitAssert({
+      memoryQueue.containers.size shouldBe 0
+      memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0
+      memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
+      memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
+    }, 5.seconds)
   }
 
   private def getData(states: List[MemoryQueueState]) = {
@@ -1749,6 +1900,7 @@
     (schedulingActors, droppingActors, data)
   }
   it should "clean up throttling data when it stops gracefully" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
 
     val dataManagementService = TestProbe()
@@ -1815,7 +1967,8 @@
   it should "drop the old activation from the queue" in {
     var queue = Queue.empty[TimeSeriesActivationEntry]
 
-    val now = Instant.now
+    val clock = new FakeClock
+    val now = clock.now()
     val records = List(
       TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message),
       TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 2000), message),
@@ -1826,10 +1979,10 @@
     )
 
     records.foreach(record => queue = queue.enqueue(record))
-
-    Thread.sleep(5000)
+    clock.plusSeconds(5)
 
     queue = MemoryQueue.dropOld(
+      clock,
       queue,
       java.time.Duration.ofMillis(1000),
       "activation processing is not initiated for 1000 ms",
@@ -1843,6 +1996,7 @@
 
     noException should be thrownBy {
       queue = MemoryQueue.dropOld(
+        SystemClock,
         queue,
         java.time.Duration.ofMillis(1000),
         "activation processing is not initiated for 1000 ms",
@@ -1853,6 +2007,7 @@
   behavior of "duration checker"
 
   it should "check the duration once" in {
+    implicit val clock = SystemClock
     val mockEtcdClient = mock[EtcdClient]
 
     val dataManagementService = TestProbe()