Resiliency Updates for MessageHub Tests (#253)
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 02183d2..139ed21 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -141,9 +141,7 @@
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
// It takes a moment for the consumer to fully initialize.
@@ -280,9 +278,7 @@
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
val readRunResult = wsk.action.invoke(actionName, parameters = Map(
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 1fe18a1..e280e39 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -17,18 +17,21 @@
package system.packages
import system.utils.KafkaUtils
-import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
+
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.Inside
import org.scalatest.junit.JUnitRunner
+import org.apache.kafka.clients.producer.ProducerRecord
+
import spray.json.DefaultJsonProtocol._
import spray.json._
+
import common.JsHelpers
import common.TestUtils
import common.TestHelpers
@@ -36,11 +39,14 @@
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
+
import ActionHelper._
+
import java.util.Base64
import java.nio.charset.StandardCharsets
import java.time.{Clock, Instant}
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class MessageHubFeedTests
@@ -166,28 +172,30 @@
_.response.success shouldBe true
}
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.length == 1)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.length == 1)
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
- } yield activation.right.get
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
+ assert(messages.length == 1)
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+ val message = messages.head
+ message.getFieldPath("topic") shouldBe Some(topic.toJson)
+ message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+ }, N = 3)
}
it should "not fire a single trigger with an oversized payload" in withAssetCleaner(wskprops) {
@@ -234,19 +242,21 @@
producer.send(secondMessage)
producer.close()
- // verify there are two trigger activations required to handle these messages
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
+ retry({
+ // verify there are two trigger activations required to handle these messages
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
- println("Verifying activation content")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
- activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
- } yield activation.right.get
+ println("Verifying activation content")
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
+ activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
+ } yield activation.right.get
- assert(matchingActivations.length == 2)
+ assert(matchingActivations.length == 2)
+ }, N = 3)
}
it should "not fire a trigger for a single oversized message" in withAssetCleaner(wskprops) {
@@ -289,18 +299,20 @@
producer.send(bigMessage)
producer.close()
- // verify there are no activations that match
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ retry({
+ // verify there are no activations that match
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- println("Verifying activation content")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
- } yield activation.right.get
+ println("Verifying activation content")
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
+ } yield activation.right.get
- assert(matchingActivations.length == 0)
+ assert(matchingActivations.length == 0)
+ }, N = 3)
}
it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) {
@@ -333,8 +345,7 @@
))
withActivation(wsk.activation, run) {
- activation =>
- activation.response.success shouldBe false
+ _.response.success shouldBe false
}
}
@@ -370,8 +381,7 @@
))
withActivation(wsk.activation, run) {
- activation =>
- activation.response.success shouldBe false
+ _.response.success shouldBe false
}
}
@@ -430,8 +440,7 @@
))
withActivation(wsk.activation, updateRunResult) {
- activation =>
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
println("Giving the consumer a moment to get ready")
@@ -469,29 +478,31 @@
}
def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
- assert(activations.length == 1)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+ assert(activations.length == 1)
- println("Validating content of activation(s)")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
- } yield activation.right.get
+ println("Validating content of activation(s)")
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
+ assert(messages.length == 1)
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(key.toJson)
+ val message = messages.head
+ message.getFieldPath("topic") shouldBe Some(topic.toJson)
+ message.getFieldPath("key") shouldBe Some(key.toJson)
+ }, N = 3)
}
def generateMessage(prefix: String, size: Int): String = {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 3c2aac8..4d2b38c 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -35,12 +35,15 @@
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
+
import spray.json.DefaultJsonProtocol._
import spray.json.pimpAny
import java.util.Base64
import java.nio.charset.StandardCharsets
+import whisk.utils.retry
+
@RunWith(classOf[JUnitRunner])
class MessageHubProduceTests
extends FlatSpec
@@ -150,7 +153,6 @@
}
it should "Post a message with a binary value" in withAssetCleaner(wskprops) {
- // create trigger
val currentTime = s"${System.currentTimeMillis}"
(wp, assetHelper) =>
@@ -169,53 +171,53 @@
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
-
- val defaultActionName = s"helloKafka-${currentTime}"
-
- assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
- action.create(name, defaultAction)
- }
- assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
- rule.create(name, trigger = triggerName, action = defaultActionName)
+ _.response.success shouldBe true
}
// It takes a moment for the consumer to fully initialize.
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
+ val defaultActionName = s"helloKafka-${currentTime}"
+
+ assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+ action.create(name, defaultAction)
+ }
+
+ assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
+ rule.create(name, trigger = triggerName, action = defaultActionName)
+ }
+
// produce message
val decodedMessage = "This will be base64 encoded"
val encodedMessage = Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
val base64ValueParams = validParameters + ("base64DecodeValue" -> true.toJson) + ("value" -> encodedMessage.toJson)
+ println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
- activation =>
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
- // verify trigger fired
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.length > 0)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.nonEmpty)
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
- } yield activation.right.get
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
+ assert(messages.length == 1)
+ }, N = 3)
}
it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
@@ -238,52 +240,52 @@
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
-
- val defaultActionName = s"helloKafka-${currentTime}"
-
- assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
- action.create(name, defaultAction)
- }
- assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
- rule.create(name, trigger = triggerName, action = defaultActionName)
+ _.response.success shouldBe true
}
// It takes a moment for the consumer to fully initialize.
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
+ val defaultActionName = s"helloKafka-${currentTime}"
+
+ assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+ action.create(name, defaultAction)
+ }
+
+ assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
+ rule.create(name, trigger = triggerName, action = defaultActionName)
+ }
+
// produce message
val decodedKey = "This will be base64 encoded"
val encodedKey = Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
val base64ValueParams = validParameters + ("base64DecodeKey" -> true.toJson) + ("key" -> encodedKey.toJson)
+ println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
- activation =>
- activation.response.success shouldBe true
+ _.response.success shouldBe true
}
- // verify trigger fired
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.length > 0)
+ retry({
+ println("Polling for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+ assert(activations.nonEmpty)
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
- } yield activation.right.get
+ val matchingActivations = for {
+ id <- activations
+ activation = wsk.activation.waitForActivation(id)
+ if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
+ } yield activation.right.get
- assert(matchingActivations.length == 1)
+ assert(matchingActivations.length > 0)
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+ val activation = matchingActivations.head
+ activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
- assert(messages.length == 1)
+ // assert that there exists a message in the activation which has the expected keys and values
+ val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
+ assert(messages.length == 1)
+ }, N = 3)
}
}