Add fake clock for test code (#5304)
* Add fake clock for test code
* Add test code for state timeout
* Add test case for transition _ => Flushing
* Add StateTimeout test for Flushing state
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala
new file mode 100644
index 0000000..8acc2e1
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common.time
+
+import java.time.Instant
+
+trait Clock { // source of "current time"; lets callers be handed a clock other than the system one
+  def now(): Instant // the current instant according to this clock
+}
+
+object SystemClock extends Clock { // Clock backed by the JVM system clock
+  def now(): Instant = Instant.now() // explicit return type: public members should not rely on inference
+}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 312e9ec..436b53d 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -21,6 +21,7 @@
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common._
+import org.apache.openwhisk.common.time.{Clock, SystemClock}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
@@ -121,13 +122,14 @@
ack: ActiveAck,
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
getUserLimit: String => Future[Int],
- checkToDropStaleActivation: (Queue[TimeSeriesActivationEntry],
+ checkToDropStaleActivation: (Clock,
+ Queue[TimeSeriesActivationEntry],
Long,
String,
WhiskActionMetaData,
MemoryQueueState,
ActorRef) => Unit,
- queueConfig: QueueConfig)(implicit logging: Logging)
+ queueConfig: QueueConfig)(implicit logging: Logging, clock: Clock)
extends FSM[MemoryQueueState, MemoryQueueData]
with Stash {
@@ -342,7 +344,7 @@
msg.transid)
val whiskError = isWhiskError(data.error)
if (whiskError)
- queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
+ queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg))
else
completeErrorActivation(msg, data.reason, whiskError)
stay() using data.copy(activeDuringFlush = true)
@@ -351,8 +353,12 @@
// Instead, StateTimeout message will be sent by a timer.
case Event(StateTimeout | DropOld, data: FlushingData) =>
logging.info(this, s"[$invocationNamespace:$action:$stateName] Received StateTimeout, drop stale messages.")
- queue =
- MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation)
+ queue = MemoryQueue.dropOld(
+ clock,
+ queue,
+ Duration.ofMillis(actionRetentionTimeout),
+ data.reason,
+ completeErrorActivation)
if (data.activeDuringFlush || queue.nonEmpty)
stay using data.copy(activeDuringFlush = false)
else
@@ -540,7 +546,7 @@
case Event(DropOld, _) =>
if (queue.nonEmpty && Duration
- .between(queue.head.timestamp, Instant.now)
+ .between(queue.head.timestamp, clock.now())
.compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
logging.error(
this,
@@ -550,6 +556,7 @@
s"[$invocationNamespace:$action:$stateName] the head stale message: ${queue.head.msg.activationId}")
}
queue = MemoryQueue.dropOld(
+ clock,
queue,
Duration.ofMillis(actionRetentionTimeout),
s"Activation processing is not initiated for $actionRetentionTimeout ms",
@@ -706,6 +713,7 @@
NoData()
}
}
+
private def cleanUpWatcher(): Unit = {
watchedKeys.foreach { key =>
watcherService ! UnwatchEndpoint(key, isPrefix = true, watcherName)
@@ -883,7 +891,14 @@
// these schedulers will run forever and stop when the memory queue stops
private def startMonitoring(): (ActorRef, ActorRef) = {
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
- checkToDropStaleActivation(queue, actionRetentionTimeout, invocationNamespace, actionMetaData, stateName, self)
+ checkToDropStaleActivation(
+ clock,
+ queue,
+ actionRetentionTimeout,
+ invocationNamespace,
+ actionMetaData,
+ stateName,
+ self)
Future.successful(())
}
@@ -930,7 +945,7 @@
@tailrec
private def getStaleActivationNum(count: Int, queue: Queue[TimeSeriesActivationEntry]): Int = {
if (queue.isEmpty || Duration
- .between(queue.head.timestamp, Instant.now)
+ .between(queue.head.timestamp, clock.now())
.compareTo(StaleDuration) < 0) count
else
getStaleActivationNum(count + 1, queue.tail)
@@ -988,7 +1003,7 @@
stay
}
.getOrElse {
- queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
+ queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg))
in.decrementAndGet()
tryEnableActionThrottling()
}
@@ -1051,7 +1066,7 @@
/** Generates an activation with zero runtime. Usually used for error cases */
private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = {
- val now = Instant.now
+ val now = clock.now()
val causedBy = if (msg.causedBySequence) {
Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
} else None
@@ -1101,6 +1116,7 @@
ack: ActiveAck,
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
getUserLimit: String => Future[Int])(implicit logging: Logging): Props = {
+ implicit val clock: Clock = SystemClock
Props(
new MemoryQueue(
etcdClient,
@@ -1126,19 +1142,21 @@
@tailrec
def dropOld(
+ clock: Clock, // time source for the staleness check; passed in so the caller decides what "now" is
queue: Queue[TimeSeriesActivationEntry],
retention: Duration,
reason: String,
completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any]): Queue[TimeSeriesActivationEntry] = {
- if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0)
+ if (queue.isEmpty || Duration.between(queue.head.timestamp, clock.now()).compareTo(retention) < 0) // stop: queue is empty or its head is younger than `retention`
queue
else {
completeErrorActivation(queue.head.msg, reason, true)
- dropOld(queue.tail, retention, reason, completeErrorActivation)
+ dropOld(clock, queue.tail, retention, reason, completeErrorActivation) // head was completed with `reason`; keep checking the remaining entries
}
}
- def checkToDropStaleActivation(queue: Queue[TimeSeriesActivationEntry],
+ def checkToDropStaleActivation(clock: Clock,
+ queue: Queue[TimeSeriesActivationEntry],
maxRetentionMs: Long,
invocationNamespace: String,
actionMetaData: WhiskActionMetaData,
@@ -1150,7 +1168,7 @@
s"[$invocationNamespace:$action:$stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.")
if (queue.nonEmpty && Duration
- .between(queue.head.timestamp, Instant.now)
+ .between(queue.head.timestamp, clock.now())
.compareTo(Duration.ofMillis(maxRetentionMs)) >= 0) {
logging.info(
this,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 5256832..e68de5b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -22,6 +22,7 @@
import akka.testkit.{TestActor, TestFSMRef, TestProbe}
import com.sksamuel.elastic4s.http.{search => _}
import org.apache.openwhisk.common.GracefulShutdown
+import org.apache.openwhisk.common.time.{Clock, SystemClock}
import org.apache.openwhisk.core.connector.ContainerCreationError.{NonExecutableActionError, WhiskError}
import org.apache.openwhisk.core.connector.ContainerCreationMessage
import org.apache.openwhisk.core.entity._
@@ -50,6 +51,17 @@
import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
import scala.language.postfixOps
+class FakeClock extends Clock { // test-only Clock: time stays frozen until a test moves it explicitly
+  @volatile var instant: Instant = Instant.now() // volatile so a change made by the test thread is visible to other threads calling now()
+  def now(): Instant = instant
+  def set(now: Instant): Unit = { // jump to an absolute instant
+    instant = now
+  }
+  def plusSeconds(secondsToAdd: Long): Unit = { // advance by whole seconds; read-modify-write is not atomic, so call from one thread only
+    instant = instant.plusSeconds(secondsToAdd)
+  }
+}
+
@RunWith(classOf[JUnitRunner])
class MemoryQueueFlowTests
extends MemoryQueueTestsFixture
@@ -75,6 +87,7 @@
behavior of "MemoryQueueFlow"
it should "normally be created and handle an activation and became idle an finally removed" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -151,11 +164,11 @@
container.send(fsm, getActivation(false))
container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
- Thread.sleep(idleGrace.toMillis)
+ fsm ! StateTimeout
probe.expectMsg(Transition(fsm, Running, Idle))
- Thread.sleep(stopGrace.toMillis)
+ fsm ! StateTimeout
expectDataCleanUp(watcher, dataMgmtService)
@@ -168,6 +181,7 @@
}
it should "became Idle and Running again if a message arrives" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -235,7 +249,7 @@
container.send(fsm, getActivation(false))
container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
- Thread.sleep(idleGrace.toMillis)
+ fsm ! StateTimeout
probe.expectMsg(Transition(fsm, Running, Idle))
@@ -243,7 +257,7 @@
probe.expectMsg(Transition(fsm, Idle, Running))
- Thread.sleep(idleGrace.toMillis)
+ fsm ! StateTimeout
// since there is one message, it should not be Idle
probe.expectNoMessage()
@@ -264,11 +278,11 @@
container.send(fsm, getActivation(false))
container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
- Thread.sleep(idleGrace.toMillis)
+ fsm ! StateTimeout
probe.expectMsg(Transition(fsm, Running, Idle))
- Thread.sleep(stopGrace.toMillis)
+ fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -281,6 +295,7 @@
}
it should "go to the NamespaceThrottled state dropping messages when it can't create an initial container" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -367,6 +382,7 @@
}
it should "go to the NamespaceThrottled state without dropping messages and get back to the Running container" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -471,12 +487,12 @@
container.send(fsm, getActivation(false, "testContainerId2"))
container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
- Thread.sleep(idleGrace.toMillis)
+ fsm ! StateTimeout
// all subsequent procedures are same with the Running case
probe.expectMsg(Transition(fsm, Running, Idle))
- Thread.sleep(stopGrace.toMillis)
+ fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -489,6 +505,7 @@
}
it should "go to the ActionThrottled state when there are too many stale activations including transition to NamespaceThrottling" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -623,10 +640,10 @@
probe.expectMsg(Transition(fsm, NamespaceThrottled, Running))
// normal termination process
- Thread.sleep(idleGrace.toMillis)
+ fsm ! StateTimeout
probe.expectMsg(Transition(fsm, Running, Idle))
- Thread.sleep(stopGrace.toMillis)
+ fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -637,6 +654,7 @@
}
it should "be Flushing when the limit is 0 and restarted back to Running state when the limit is increased" in {
+ implicit val clock = new FakeClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -689,10 +707,12 @@
expectInitialData(watcher, dataMgmtService)
probe.expectMsg(Transition(fsm, Uninitialized, Running))
+ clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds)
+
probe.expectMsg(Transition(fsm, Running, Flushing))
// activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
- Thread.sleep(flushGrace.toMillis)
fsm ! messages(1)
+ fsm ! StateTimeout
awaitAssert({
ackedMessageCount shouldBe 1
@@ -700,7 +720,7 @@
storedMessageCount shouldBe 1
lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString(namespaceLimitUnderZero)))
fsm.underlyingActor.queue.length shouldBe 1
- }, FiniteDuration(retentionTimeout, MILLISECONDS))
+ }, 5.seconds)
// limit is increased by an operator
limit = 10
@@ -730,11 +750,11 @@
fsm.underlyingActor.namespaceContainerCount.existingContainerNumByNamespace -= 1
// normal termination process
- Thread.sleep(idleGrace.toMillis)
+ fsm ! StateTimeout
probe.expectMsg(Transition(fsm, Running, Idle))
- Thread.sleep(stopGrace.toMillis)
+ fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -747,6 +767,7 @@
}
it should "be Flushing when the limit is 0 and be terminated without recovering" in {
+ implicit val clock = new FakeClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -801,7 +822,8 @@
fsm ! messages(1)
// activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
- Thread.sleep(flushGrace.toMillis)
+ clock.plusSeconds((queueConfig.maxRetentionMs) / 1000)
+ fsm ! DropOld
awaitAssert({
ackedMessageCount shouldBe 2
@@ -809,9 +831,10 @@
storedMessageCount shouldBe 2
lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString(namespaceLimitUnderZero)))
fsm.underlyingActor.queue.length shouldBe 0
- }, FiniteDuration(retentionTimeout, MILLISECONDS))
+ }, 5.seconds)
// In this case data clean up happens first.
+ fsm ! StateTimeout
expectDataCleanUp(watcher, dataMgmtService)
probe.expectMsg(Transition(fsm, Flushing, Removed))
@@ -822,6 +845,7 @@
}
it should "be the Flushing state when a whisk error happens" in {
+ implicit val clock = new FakeClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -897,7 +921,8 @@
fsm ! messages(1)
- Thread.sleep(flushGrace.toMillis)
+ clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds)
+ fsm ! StateTimeout
awaitAssert({
ackedMessageCount shouldBe 2
@@ -907,7 +932,8 @@
fsm.underlyingActor.queue.length shouldBe 0
}, FiniteDuration(retentionTimeout, MILLISECONDS))
- Thread.sleep(flushGrace.toMillis * 2)
+ clock.plusSeconds(flushGrace.toSeconds * 2)
+ fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Flushing, Removed))
@@ -920,6 +946,7 @@
}
it should "be the Flushing state when a whisk error happens and be recovered when a container is created" in {
+ implicit val clock = new FakeClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -975,12 +1002,12 @@
fsm ! FailedCreationJob(id, testInvocationNamespace, fqn, revision, WhiskError, "whisk error")
}
+ clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds)
probe.expectMsg(Transition(fsm, Running, Flushing))
- Thread.sleep(1000)
fsm ! messages(1)
// activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error
- Thread.sleep(flushGrace.toMillis)
+ fsm ! StateTimeout
awaitAssert({
ackedMessageCount shouldBe 1
@@ -1005,11 +1032,11 @@
container.send(fsm, getActivation(false))
container.expectMsg(ActivationResponse(Left(NoActivationMessage())))
- Thread.sleep(idleGrace.toMillis)
-
+ fsm.underlyingActor.creationIds = Set.empty[String]
+ fsm ! StateTimeout
probe.expectMsg(Transition(fsm, Running, Idle))
- Thread.sleep(stopGrace.toMillis)
+ fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, Idle, Removed))
@@ -1022,6 +1049,7 @@
}
it should "be the Flushing state when a developer error happens" in {
+ implicit val clock = new FakeClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -1113,7 +1141,10 @@
fsm.underlyingActor.queue.length shouldBe 0
}
- Thread.sleep(flushGrace.toMillis * 2)
+ // simulate timeout 2 times
+ clock.plusSeconds(flushGrace.toSeconds * 2)
+ fsm ! StateTimeout
+ fsm ! StateTimeout
expectDataCleanUp(watcher, dataMgmtService)
parent.expectMsg(queueRemovedMsg)
@@ -1125,6 +1156,7 @@
}
it should "be gracefully terminated when it receives a GracefulShutDown message" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -1211,12 +1243,12 @@
fsm ! QueueRemovedCompleted
// if there is a message, it should not terminate
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
container.send(fsm, getActivation())
container.expectMsg(ActivationResponse(Right(messages(2))))
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
// it doesn't need to check if all containers are timeout as same version of a new queue will be created in another scheduler.
@@ -1235,6 +1267,7 @@
val allStates = List(Running, Idle, Flushing, ActionThrottled, NamespaceThrottled, Removing, Removed)
allStates.foreach { state =>
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -1319,6 +1352,7 @@
val allStates = List(Running, Idle, Flushing, ActionThrottled, NamespaceThrottled, Removing, Removed)
allStates.foreach { state =>
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -1393,7 +1427,7 @@
probe.expectMsg(Transition(fsm, state, Removed))
// queue manager could not respond to the memory queue.
- Thread.sleep(stopGrace.toMillis)
+ fsm ! StateTimeout
// the queue is supposed to send queueRemovedMsg once again and stops itself.
parent.expectMsg(queueRemovedMsg)
@@ -1408,6 +1442,7 @@
List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed)
allStates.foreach { state =>
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -1509,7 +1544,7 @@
parent.expectMsg(message)
// queue should be terminated after gracefulShutdownTimeout
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
// clean up actors only because etcd data is being used by a new queue
watcher.expectMsgAllOf(
@@ -1548,14 +1583,15 @@
probe.expectMsg(Transition(fsm, state, Removed))
fsm ! QueueRemovedCompleted
-
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
watcher.expectMsgAllOf(
UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName),
UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName),
UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
+ probe.expectTerminated(fsm, 10.seconds)
+
case _ =>
parent.expectMsg(staleQueueRemovedMsg)
parent.expectMsg(message)
@@ -1584,6 +1620,7 @@
List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed)
allStates.foreach { state =>
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
@@ -1676,13 +1713,13 @@
// queue will be gracefully shutdown.
case Removing =>
// queue should not be terminated as there is an activation
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
container.send(fsm, getActivation())
container.expectMsg(ActivationResponse(Right(message)))
// queue should not be terminated as there is an activation
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
// clean up actors only because etcd data is being used by a new queue
watcher.expectMsgAllOf(
@@ -1721,12 +1758,12 @@
fsm ! QueueRemovedCompleted
// queue should not be terminated as there is an activation
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
container.send(fsm, getActivation())
container.expectMsg(ActivationResponse(Right(message)))
- Thread.sleep(gracefulShutdownTimeout.toMillis)
+ fsm ! StateTimeout
watcher.expectMsgAllOf(
UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName),
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index fd2ac73..e642628 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -33,6 +33,7 @@
import com.ibm.etcd.client.{EtcdClient => Client}
import com.sksamuel.elastic4s.http.ElasticClient
import common.StreamLogging
+import org.apache.openwhisk.common.time.SystemClock
import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector._
@@ -135,7 +136,131 @@
behavior of "MemoryQueue"
+ it should "send StateTimeout message when state timeout" in {
+ implicit val clock = SystemClock // the queue is given the real system clock in this test
+ val mockEtcdClient = mock[EtcdClient]
+ val prove = TestProbe()
+ val watcher = TestProbe()
+ val parent = TestProbe()
+
+ expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+ val queueConfigWithShortTimeout = queueConfig.copy( // 10 ms grace periods keep the real waits below short
+ idleGrace = 10.milliseconds,
+ stopGrace = 10.milliseconds,
+ gracefulShutdownTimeout = 10.milliseconds)
+
+ val fsm =
+ TestFSMRef(
+ new MemoryQueue(
+ mockEtcdClient,
+ durationChecker,
+ fqn,
+ mockMessaging(),
+ schedulingConfig,
+ testInvocationNamespace,
+ revision,
+ endpoints,
+ actionMetadata,
+ prove.ref,
+ watcher.ref,
+ TestProbe().ref,
+ TestProbe().ref,
+ schedulerId,
+ ack,
+ store,
+ getUserLimit,
+ checkToDropStaleActivation,
+ queueConfigWithShortTimeout),
+ parent.ref,
+ "MemoryQueue")
+
+ registerCallback(fsm)
+ fsm ! Start
+ expectMsg(Transition(fsm, Uninitialized, Running))
+
+ // Test stateTimeout for when(Running, stateTimeout = queueConfig.idleGrace): the state timer must be armed
+ fsm.isStateTimerActive shouldBe true
+ Thread.sleep(queueConfigWithShortTimeout.idleGrace.toMillis) // wait out idleGrace; no StateTimeout is sent by hand in this test
+
+ expectMsg(Transition(fsm, Running, Idle))
+
+ // Test stateTimeout for when(Idle, stateTimeout = queueConfig.stopGrace): the state timer must be armed again
+ fsm.isStateTimerActive shouldBe true
+ Thread.sleep(queueConfigWithShortTimeout.stopGrace.toMillis)
+ expectMsg(Transition(fsm, Idle, Removed))
+
+ // Test stateTimeout for when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout)
+ fsm.isStateTimerActive shouldBe true
+ Thread.sleep(queueConfigWithShortTimeout.gracefulShutdownTimeout.toMillis)
+ parent.expectMsg(queueRemovedMsg)
+ }
+
+ it should "start startTimerWithFixedDelay(name=StopQueue) on Transition _ => Flushing" in {
+ implicit val clock = SystemClock // the queue is given the real system clock in this test
+ val mockEtcdClient = mock[EtcdClient]
+ val prove = TestProbe()
+ val watcher = TestProbe()
+ val parent = TestProbe()
+
+ expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+ val queueConfigWithShortTimeout =
+ queueConfig.copy(
+ idleGrace = 10.seconds, // NOTE(review): presumably long so the Running idle timeout cannot fire first - confirm
+ stopGrace = 10.milliseconds,
+ gracefulShutdownTimeout = 10.milliseconds,
+ flushGrace = 10.milliseconds)
+
+ val fsm =
+ TestFSMRef(
+ new MemoryQueue(
+ mockEtcdClient,
+ durationChecker,
+ fqn,
+ mockMessaging(),
+ schedulingConfig,
+ testInvocationNamespace,
+ revision,
+ endpoints,
+ actionMetadata,
+ prove.ref,
+ watcher.ref,
+ TestProbe().ref,
+ TestProbe().ref,
+ schedulerId,
+ ack,
+ store,
+ getUserLimit,
+ checkToDropStaleActivation,
+ queueConfigWithShortTimeout),
+ parent.ref,
+ "MemoryQueue")
+
+ registerCallback(fsm)
+ fsm ! Start
+ expectMsg(Transition(fsm, Uninitialized, Running))
+
+ fsm ! FailedCreationJob( // this creation failure is what moves the queue to Flushing (asserted below)
+ testCreationId,
+ message.user.namespace.name.asString,
+ message.action,
+ message.revision,
+ ContainerCreationError.NoAvailableInvokersError,
+ "no available invokers")
+
+ // Test case _ -> Flushing => startTimerWithFixedDelay("StopQueue", StateTimeout, queueConfig.flushGrace)
+ // state Running -> Flushing; the named "StopQueue" timer must be active right after the transition
+ expectMsg(Transition(fsm, Running, Flushing))
+ fsm.isTimerActive("StopQueue") shouldBe true
+
+ // wait for flushGrace (10 ms here); the StopQueue timer should then send StateTimeout and the queue is removed
+ Thread.sleep(queueConfigWithShortTimeout.flushGrace.toMillis)
+ expectMsg(Transition(fsm, Flushing, Removed))
+ }
+
it should "register the endpoint when initializing" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val prove = TestProbe()
val watcher = TestProbe()
@@ -192,6 +317,7 @@
}
it should "go to Flushing state if any error happens when the queue is depreacted" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val prove = TestProbe()
val watcher = TestProbe()
@@ -252,6 +378,7 @@
}
it should "go to the Running state without storing any data if it receives VersionUpdated" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val prove = TestProbe()
val watcher = TestProbe()
@@ -308,6 +435,7 @@
}
it should "remove the queue when timeout occurs" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val dataManagementService = TestProbe()
@@ -380,6 +508,7 @@
}
it should "back to Running state when got new ActivationMessage when in Idle State" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val prove = TestProbe()
val watcher = TestProbe()
@@ -459,6 +588,7 @@
}
it should "back to Running state when got new ActivationMessage when in Removed State" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val prove = TestProbe()
val watcher = TestProbe()
@@ -530,6 +660,7 @@
}
it should "store the received ActivationMessage in the queue" in {
+ implicit val clock = SystemClock
val mockEtcdClient = new MockEtcdClient(client, isLeader = true)
val prove = TestProbe()
@@ -571,6 +702,7 @@
}
it should "send a ActivationMessage in response to GetActivation" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val tid = TransactionId(TransactionId.generateTid())
@@ -611,6 +743,7 @@
}
it should "send NoActivationMessage in case there is no message in the queue" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val tid = TransactionId(TransactionId.generateTid())
@@ -650,6 +783,7 @@
}
it should "poll for the ActivationMessage in case there is no message in the queue" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val tid = TransactionId(TransactionId.generateTid())
@@ -698,6 +832,7 @@
}
it should "not send msg to a deleted container" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val tid = TransactionId(TransactionId.generateTid())
@@ -752,6 +887,7 @@
}
it should "send response to request according to the order of container id and warmed flag" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val tid = TransactionId(TransactionId.generateTid())
@@ -819,6 +955,7 @@
}
it should "send a container creation request to ContainerManager at initialization time" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val containerManger = TestProbe()
val probe = TestProbe()
@@ -888,6 +1025,7 @@
}
it should "complete error activation while received FailedCreationJob and the error is not a whisk error(unrecoverable)" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val testProbe = TestProbe()
val parent = TestProbe()
@@ -968,6 +1106,7 @@
}
it should "complete error activation after timeout while received FailedCreationJob and the error is a whisk error(recoverable)" in {
+ implicit val clock = new FakeClock
val mockEtcdClient = mock[EtcdClient]
val testProbe = TestProbe()
val decisionMaker = TestProbe()
@@ -1024,11 +1163,15 @@
parent.expectNoMessage(5.seconds)
// Add 3 more messages.
+ clock.plusSeconds(5)
(1 to expectedCount).foreach(_ => fsm ! message)
parent.expectNoMessage(5.seconds)
// After 10 seconds(action retention timeout), the first 3 messages are timed out.
// It does not get removed as there are still 3 messages in the queue.
+ clock.plusSeconds(5)
+ fsm ! DropOld
+
awaitAssert({
ackedMessageCount shouldBe 3
lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString("no available invokers")))
@@ -1052,7 +1195,8 @@
parent.expectMsg(Transition(fsm, Running, Flushing))
// wait for the flush grace, and then all existing activations will be flushed
- Thread.sleep(queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis)
+ clock.plusSeconds((queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) / 1000)
+ fsm ! DropOld
// The error message is updated from the recent error message of the FailedCreationJob.
awaitAssert({
@@ -1072,6 +1216,7 @@
}
it should "send old version activation to queueManager when update action if doesn't exist old version container" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val queueManager = TestProbe()
@@ -1114,6 +1259,7 @@
}
it should "fetch old version activation by old container when update action" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val probe = TestProbe()
val queueManager = TestProbe()
@@ -1159,6 +1305,7 @@
}
it should "complete error activation after blackbox timeout when the action is a blackbox action and received FailedCreationJob with a whisk error(recoverable)" in {
+ implicit val clock = new FakeClock
val mockEtcdClient = mock[EtcdClient]
val testProbe = TestProbe()
val decisionMaker = TestProbe()
@@ -1246,7 +1393,8 @@
fsm ! message
// wait for the flush grace, and then some existing activations will be flushed
- Thread.sleep(queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis)
+ clock.plusSeconds((queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) / 1000)
+ fsm ! DropOld
(1 to expectedCount).foreach(_ => probe.expectMsg(ActivationResponse.whiskError("no available invokers")))
val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace
@@ -1262,6 +1410,7 @@
}
it should "stop scheduling if the namespace does not exist" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val getZeroLimit = (_: String) => { Future.failed(NoDocumentException("namespace does not exist")) }
val testProbe = TestProbe()
@@ -1302,10 +1451,11 @@
parent.expectMsg(10 seconds, CurrentState(fsm, Uninitialized))
parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running))
- Thread.sleep(idleGrace.toMillis)
-
- parent expectMsg QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), None)
- parent.expectMsg(10 seconds, Transition(fsm, Running, Removing))
+ fsm ! StopSchedulingAsOutdated
+ parent expectMsgAllOf (10 seconds, Transition(fsm, Running, Removing), QueueRemoved(
+ testInvocationNamespace,
+ fqn.toDocId.asDocInfo(revision),
+ None))
fsm ! QueueRemovedCompleted
parent.expectMsg(10 seconds, Transition(fsm, Removing, Removed))
@@ -1314,6 +1464,7 @@
}
it should "throttle the namespace when the limit is already reached" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = TestProbe()
val probe = TestProbe()
@@ -1364,6 +1515,7 @@
}
it should "disable namespace throttling when the capacity become available" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = TestProbe()
val probe = TestProbe()
@@ -1425,6 +1577,7 @@
}
it should "throttle the action when the number of messages reaches the limit" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = TestProbe()
val probe = TestProbe()
@@ -1473,6 +1626,7 @@
}
it should "disable action throttling when the number of messages is under throttling fraction" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = TestProbe()
val probe = TestProbe()
@@ -1531,6 +1685,7 @@
}
it should "update the number of containers based on Watch event" in {
+ implicit val clock = SystemClock
val mockEtcdClient = new MockEtcdClient(client, true)
val probe = TestProbe()
val watcher = system.actorOf(WatcherService.props(mockEtcdClient))
@@ -1574,7 +1729,7 @@
val newRevision = DocRevision("2-testRev")
memoryQueue.containers.size shouldBe 0
- memoryQueue.creationIds.size shouldBe 0
+ memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0
memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
@@ -1611,13 +1766,12 @@
Some(ContainerId("test-containerId2"))),
"test-value")
- Thread.sleep(1000)
- memoryQueue.containers.size shouldBe 1
- // the monit actor in memoryQueue may decide to create a container
- memoryQueue.creationIds.size should be >= 1
- memoryQueue.creationIds.size should be <= 2
- memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2
- memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2
+ awaitAssert({
+ memoryQueue.containers.size shouldBe 1 // ['test-containerId1']
+ memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 1 // ['testId1']
+ memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2
+ memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2
+ }, 5.seconds)
mockEtcdClient.publishEvents(
EventType.PUT,
@@ -1650,12 +1804,12 @@
Some(ContainerId("test-containerId4"))),
"test-value")
- Thread.sleep(1000)
- memoryQueue.containers.size shouldBe 2
- memoryQueue.creationIds.size should be >= 2
- memoryQueue.creationIds.size should be <= 3
- memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
- memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4
+ awaitAssert({
+ memoryQueue.containers.size shouldBe 2 // ['test-containerId1', 'test-containerId3']
+ memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 2 // ['testId1', 'testId3']
+ memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
+ memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4
+ }, 5.seconds)
mockEtcdClient.publishEvents(
EventType.DELETE,
@@ -1678,12 +1832,12 @@
inProgressContainer(testInvocationNamespace, newFqn, newRevision, schedulerId, CreationId("testId4")),
"test-value")
- Thread.sleep(1000)
- memoryQueue.containers.size shouldBe 2
- memoryQueue.creationIds.size should be >= 0
- memoryQueue.creationIds.size should be <= 1
- memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
- memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
+ awaitAssert({
+ memoryQueue.containers.size shouldBe 2 // ['test-containerId1', 'test-containerId3']
+ memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0
+ memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
+ memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4
+ }, 5.seconds)
mockEtcdClient.publishEvents(
EventType.DELETE,
@@ -1726,15 +1880,12 @@
Some(ContainerId("test-containerId4"))),
"test-value")
- memoryQueue.creationIds.size should be >= 0
- memoryQueue.creationIds.size should be <= 1
-
- Thread.sleep(1000)
- memoryQueue.containers.size shouldBe 0
- memoryQueue.creationIds.size should be >= 1 // if there is no container, the queue tries to create one container
- memoryQueue.creationIds.size should be <= 2
- memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
- memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
+ awaitAssert({
+ memoryQueue.containers.size shouldBe 0
+ memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0
+ memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0
+ memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0
+ }, 5.seconds)
}
private def getData(states: List[MemoryQueueState]) = {
@@ -1749,6 +1900,7 @@
(schedulingActors, droppingActors, data)
}
it should "clean up throttling data when it stops gracefully" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = TestProbe()
@@ -1815,7 +1967,8 @@
it should "drop the old activation from the queue" in {
var queue = Queue.empty[TimeSeriesActivationEntry]
- val now = Instant.now
+ val clock = new FakeClock
+ val now = clock.now()
val records = List(
TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message),
TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 2000), message),
@@ -1826,10 +1979,10 @@
)
records.foreach(record => queue = queue.enqueue(record))
-
- Thread.sleep(5000)
+ clock.plusSeconds(5)
queue = MemoryQueue.dropOld(
+ clock,
queue,
java.time.Duration.ofMillis(1000),
"activation processing is not initiated for 1000 ms",
@@ -1843,6 +1996,7 @@
noException should be thrownBy {
queue = MemoryQueue.dropOld(
+ SystemClock,
queue,
java.time.Duration.ofMillis(1000),
"activation processing is not initiated for 1000 ms",
@@ -1853,6 +2007,7 @@
behavior of "duration checker"
it should "check the duration once" in {
+ implicit val clock = SystemClock
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = TestProbe()