MessageHubFeedTests Resiliency Updates (#291)
diff --git a/tests/dat/createTriggerActionsFromEncodedMessage.js b/tests/dat/createTriggerActionsFromEncodedMessage.js
new file mode 100644
index 0000000..0c27eb8
--- /dev/null
+++ b/tests/dat/createTriggerActionsFromEncodedMessage.js
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+/**
+ * Test action: creates a trigger named after the first message's value.
+ *
+ * @param {Object} params invocation parameters; `params.messages[0].value` is
+ *     read as a base64 string and decoded to ASCII to obtain the trigger name.
+ * @returns the result of `ow.triggers.create` (presumably a promise - confirm
+ *     against the openwhisk client documentation).
+ */
+function main(params) {
+ // Log the incoming parameters as JSON.
+ console.log(JSON.stringify(params));
+ // Buffer.from replaces the deprecated `new Buffer(string, encoding)` constructor.
+ var name = Buffer.from(params.messages[0].value, 'base64').toString('ascii');
+ var ow = openwhisk({ignore_certs: true});
+ return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index f2e6180..d970d4b 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -20,7 +20,6 @@
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
-
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
@@ -28,23 +27,17 @@
import org.scalatest.Inside
import org.scalatest.junit.JUnitRunner
import org.apache.kafka.clients.producer.ProducerRecord
-
import spray.json.DefaultJsonProtocol._
import spray.json._
-
import common.JsHelpers
-import common.TestUtils
import common.TestHelpers
import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
-
import ActionHelper._
-import java.util.Base64
-import java.nio.charset.StandardCharsets
-
+import common.TestUtils.NOT_FOUND
import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
@@ -75,8 +68,6 @@
val wsk = new Wsk()
val actionName = s"${messagingPackage}/${messageHubFeed}"
- val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
-
behavior of "Message Hub feed action"
it should "reject invocation when topic argument is missing" in {
@@ -126,78 +117,7 @@
runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false)
}
- it should "fire a trigger when a binary message is posted to message hub" in withAssetCleaner(wskprops) {
- val currentTime = s"${System.currentTimeMillis}"
-
- (wp, assetHelper) =>
- val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
-
- createTrigger(assetHelper, triggerName, parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson,
- "isBinaryKey" -> true.toJson,
- "isBinaryValue" -> true.toJson))
-
- val defaultActionName = s"helloKafka-${currentTime}"
-
- assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
- action.create(name, defaultAction)
- }
- assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
- rule.create(name, trigger = triggerName, action = defaultActionName)
- }
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
- // key to use for the produced message
- val key = "TheKey"
- val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
- val encodedKey = Base64.getEncoder.encodeToString(key.getBytes(StandardCharsets.UTF_8))
-
- withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson,
- "key" -> key.toJson,
- "value" -> currentTime.toJson))) {
- _.response.success shouldBe true
- }
-
- retry({
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
- assert(activations.nonEmpty)
-
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
- } yield activation.right.get
-
- assert(matchingActivations.nonEmpty)
-
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
- assert(messages.length == 1)
-
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
- }, N = 3)
- }
-
- it should "not fire a single trigger with an oversized payload" in withAssetCleaner(wskprops) {
+ it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) {
// payload size should be under the payload limit, but greater than 50% of the limit
val testPayloadSize = 600000
@@ -218,6 +138,7 @@
"isBinaryKey" -> false.toJson,
"isBinaryValue" -> false.toJson))
+ val defaultAction = Some("dat/createTriggerActionsFromKey.js")
val defaultActionName = s"helloKafka-${currentTime}"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -227,6 +148,16 @@
rule.create(name, trigger = triggerName, action = defaultActionName)
}
+ val verificationName1 = s"trigger1-$currentTime"
+ val verificationName2 = s"trigger2-$currentTime"
+
+ assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+ assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+
// It takes a moment for the consumer to fully initialize.
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
@@ -235,27 +166,14 @@
// This should ensure that the feed fires these as two separate triggers.
println("Rapidly producing two large messages")
val producer = kafkaUtils.createProducer()
- val firstMessage = new ProducerRecord(topic, "key", generateMessage(s"first${currentTime}", testPayloadSize))
- val secondMessage = new ProducerRecord(topic, "key", generateMessage(s"second${currentTime}", testPayloadSize))
+ val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize))
+ val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize))
producer.send(firstMessage)
producer.send(secondMessage)
producer.close()
- retry({
- // verify there are two trigger activations required to handle these messages
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
-
- println("Verifying activation content")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
- activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
- } yield activation.right.get
-
- assert(matchingActivations.length == 2)
- }, N = 3)
+ retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
+ retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
}
it should "not fire a trigger for a single oversized message" in withAssetCleaner(wskprops) {
@@ -279,6 +197,7 @@
"isBinaryKey" -> false.toJson,
"isBinaryValue" -> false.toJson))
+ val defaultAction = Some("dat/createTriggerActionsFromKey.js")
val defaultActionName = s"helloKafka-${currentTime}"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -288,30 +207,21 @@
rule.create(name, trigger = triggerName, action = defaultActionName)
}
+ val verificationName = s"trigger-$currentTime"
+
+ wsk.trigger.get(verificationName, NOT_FOUND)
+
// It takes a moment for the consumer to fully initialize.
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
println("Producing an oversized message")
val producer = kafkaUtils.createProducer()
- val bigMessage = new ProducerRecord(topic, "key", generateMessage(s"${currentTime}", testPayloadSize))
+ val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
producer.send(bigMessage)
producer.close()
- retry({
- // verify there are no activations that match
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-
- println("Verifying activation content")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
- } yield activation.right.get
-
- assert(matchingActivations.isEmpty)
- }, N = 3)
+ a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
}
it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) {
@@ -485,15 +395,22 @@
"topic" -> topic.toJson
))
+ val defaultAction1 = Some("dat/createTriggerActions.js")
val defaultActionName = s"helloKafka-${currentTime}"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
- action.create(name, defaultAction)
+ action.create(name, defaultAction1)
}
assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
rule.create(name, trigger = triggerName, action = defaultActionName)
}
+ val verificationName1 = s"trigger1-$currentTime"
+
+ assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
@@ -504,12 +421,12 @@
"kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
- "value" -> currentTime.toJson
+ "value" -> verificationName1.toJson
))) {
_.response.success shouldBe true
}
- checkForActivations(1, triggerName, topic, key, currentTime)
+ retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
println("Updating trigger")
@@ -524,11 +441,18 @@
_.response.success shouldBe true
}
+ val verificationName2 = s"trigger2-$currentTime"
+
+ assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+
+ val defaultAction2 = Some("dat/createTriggerActionsFromEncodedMessage.js")
+ wsk.action.create(defaultActionName, defaultAction2, update = true)
+
println("Giving the consumer a moment to get ready")
Thread.sleep(consumerInitTime)
- val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
-
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
"user" -> kafkaUtils.getAsJson("user"),
@@ -536,12 +460,12 @@
"kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
- "value" -> currentTime.toJson
+ "value" -> verificationName2.toJson
))) {
_.response.success shouldBe true
}
- checkForActivations(2, triggerName, topic, key, encodedCurrentTime)
+ retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
}
def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
@@ -557,34 +481,6 @@
}
}
- def checkForActivations(numActivations: Int, triggerName: String, topic: String, key: String, value: String) = {
- retry({
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = numActivations, Some(triggerName), retries = maxRetries)
- assert(activations.nonEmpty)
-
- println("Validating content of activation(s)")
- val matchingActivations = for {
- id <- activations
- activation = wsk.activation.waitForActivation(id)
- if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
- } yield activation.right.get
-
- assert(matchingActivations.nonEmpty)
-
- val activation = matchingActivations.head
- activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
- // assert that there exists a message in the activation which has the expected keys and values
- val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
- assert(messages.length == 1)
-
- val message = messages.head
- message.getFieldPath("topic") shouldBe Some(topic.toJson)
- message.getFieldPath("key") shouldBe Some(key.toJson)
- }, N = 3)
- }
-
def generateMessage(prefix: String, size: Int): String = {
val longString = Array.fill[String](size)("0").mkString("")
s"${prefix}${longString}"