Update MessageHubProduceTests to not rely on activation polling (#287)
diff --git a/tests/dat/createTriggerActionsFromKey.js b/tests/dat/createTriggerActionsFromKey.js
new file mode 100644
index 0000000..d9bf7dd
--- /dev/null
+++ b/tests/dat/createTriggerActionsFromKey.js
@@ -0,0 +1,11 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+function main(params) {
+ console.log(JSON.stringify(params));
+ var name = params.messages[0].key;
+ var ow = openwhisk({ignore_certs: true});
+ return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index 386ead9..4170ab2 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -150,7 +150,7 @@
})
}
- ignore should "balance the load accross workers when a worker is added" in withAssetCleaner(wskprops) {
+ ignore should "balance the load across workers when a worker is added" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 080fe2b..6f46186 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -30,7 +30,7 @@
import common.JsHelpers
import common.TestHelpers
-import common.TestUtils
+import common.TestUtils.NOT_FOUND
import common.Wsk
import common.WskActorSystem
import common.WskProps
@@ -70,8 +70,6 @@
val maxRetries = System.getProperty("max.retries", "60").toInt
- val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
-
// these parameter values are 100% valid and should work as-is
val validParameters = Map(
"user" -> kafkaUtils.getAsJson("user"),
@@ -168,6 +166,7 @@
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
+ val defaultAction = Some("dat/createTriggerActions.js")
val defaultActionName = s"helloKafka-${currentTime}"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -178,9 +177,14 @@
rule.create(name, trigger = triggerName, action = defaultActionName)
}
+ val verificationName = s"trigger-$currentTime"
+
+ assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+
// produce message
- val decodedMessage = "This will be base64 encoded"
- val encodedMessage = Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
+ val encodedMessage = Base64.getEncoder.encodeToString(verificationName.getBytes(StandardCharsets.UTF_8))
val base64ValueParams = validParameters + ("base64DecodeValue" -> true.toJson) + ("value" -> encodedMessage.toJson)
println("Producing a message")
@@ -188,26 +192,7 @@
_.response.success shouldBe true
}
- retry({
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.nonEmpty)
-
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
- } yield activation.right.get
-
- assert(matchingActivations.nonEmpty)
-
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
- assert(messages.length == 1)
- }, N = 3)
+ retry(wsk.trigger.get(verificationName), 60, Some(1.second))
}
it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
@@ -237,6 +222,7 @@
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
+ val defaultAction = Some("dat/createTriggerActionsFromKey.js")
val defaultActionName = s"helloKafka-${currentTime}"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -247,9 +233,14 @@
rule.create(name, trigger = triggerName, action = defaultActionName)
}
+ val verificationName = s"trigger-$currentTime"
+
+ assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+
// produce message
- val decodedKey = "This will be base64 encoded"
- val encodedKey = Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
+ val encodedKey = Base64.getEncoder.encodeToString(verificationName.getBytes(StandardCharsets.UTF_8))
val base64ValueParams = validParameters + ("base64DecodeKey" -> true.toJson) + ("key" -> encodedKey.toJson)
println("Producing a message")
@@ -257,25 +248,6 @@
_.response.success shouldBe true
}
- retry({
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.nonEmpty)
-
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
- } yield activation.right.get
-
- assert(matchingActivations.nonEmpty)
-
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
- assert(messages.length == 1)
- }, N = 3)
+ retry(wsk.trigger.get(verificationName), 60, Some(1.second))
}
}