Update MessageHubProduceTests to not rely on activation polling (#287)

diff --git a/tests/dat/createTriggerActionsFromKey.js b/tests/dat/createTriggerActionsFromKey.js
new file mode 100644
index 0000000..d9bf7dd
--- /dev/null
+++ b/tests/dat/createTriggerActionsFromKey.js
@@ -0,0 +1,11 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+function main(params) {
+    console.log(JSON.stringify(params));
+    var name = params.messages[0].key;
+    var ow = openwhisk({ignore_certs: true});
+    return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index 386ead9..4170ab2 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -150,7 +150,7 @@
       })
   }
 
-  ignore should "balance the load accross workers when a worker is added" in withAssetCleaner(wskprops) {
+  ignore should "balance the load across workers when a worker is added" in withAssetCleaner(wskprops) {
 
     (wp, assetHelper) =>
       val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 080fe2b..6f46186 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -30,7 +30,7 @@
 
 import common.JsHelpers
 import common.TestHelpers
-import common.TestUtils
+import common.TestUtils.NOT_FOUND
 import common.Wsk
 import common.WskActorSystem
 import common.WskProps
@@ -70,8 +70,6 @@
 
     val maxRetries = System.getProperty("max.retries", "60").toInt
 
-    val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
-
     // these parameter values are 100% valid and should work as-is
     val validParameters = Map(
         "user" -> kafkaUtils.getAsJson("user"),
@@ -168,6 +166,7 @@
             println("Giving the consumer a moment to get ready")
             Thread.sleep(consumerInitTime)
 
+            val defaultAction = Some("dat/createTriggerActions.js")
             val defaultActionName = s"helloKafka-${currentTime}"
 
             assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -178,9 +177,14 @@
                 rule.create(name, trigger = triggerName, action = defaultActionName)
             }
 
+            val verificationName = s"trigger-$currentTime"
+
+            assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+                trigger.get(name, NOT_FOUND)
+            }
+
             // produce message
-            val decodedMessage = "This will be base64 encoded"
-            val encodedMessage = Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
+            val encodedMessage = Base64.getEncoder.encodeToString(verificationName.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeValue" -> true.toJson) + ("value" -> encodedMessage.toJson)
 
             println("Producing a message")
@@ -188,26 +192,7 @@
                 _.response.success shouldBe true
             }
 
-            retry({
-                println("Polling for activations")
-                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-                assert(activations.nonEmpty)
-
-                val matchingActivations = for {
-                    id <- activations
-                    activation = wsk.activation.waitForActivation(id)
-                    if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
-                } yield activation.right.get
-
-                assert(matchingActivations.nonEmpty)
-
-                val activation = matchingActivations.head
-                activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-                // assert that there exists a message in the activation which has the expected keys and values
-                val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
-                assert(messages.length == 1)
-            }, N = 3)
+            retry(wsk.trigger.get(verificationName), 60, Some(1.second))
     }
 
     it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
@@ -237,6 +222,7 @@
             println("Giving the consumer a moment to get ready")
             Thread.sleep(consumerInitTime)
 
+            val defaultAction = Some("dat/createTriggerActionsFromKey.js")
             val defaultActionName = s"helloKafka-${currentTime}"
 
             assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -247,9 +233,14 @@
                 rule.create(name, trigger = triggerName, action = defaultActionName)
             }
 
+            val verificationName = s"trigger-$currentTime"
+
+            assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+                trigger.get(name, NOT_FOUND)
+            }
+
             // produce message
-            val decodedKey = "This will be base64 encoded"
-            val encodedKey = Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
+            val encodedKey = Base64.getEncoder.encodeToString(verificationName.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeKey" -> true.toJson) + ("key" -> encodedKey.toJson)
 
             println("Producing a message")
@@ -257,25 +248,6 @@
                 _.response.success shouldBe true
             }
 
-            retry({
-                println("Polling for activations")
-                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-                assert(activations.nonEmpty)
-
-                val matchingActivations = for {
-                    id <- activations
-                    activation = wsk.activation.waitForActivation(id)
-                    if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
-                } yield activation.right.get
-
-                assert(matchingActivations.nonEmpty)
-
-                val activation = matchingActivations.head
-                activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-                // assert that there exists a message in the activation which has the expected keys and values
-                val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
-                assert(messages.length == 1)
-            }, N = 3)
+            retry(wsk.trigger.get(verificationName), 60, Some(1.second))
     }
 }