MessageHubFeedTests Resiliency Updates (#291)

diff --git a/tests/dat/createTriggerActionsFromEncodedMessage.js b/tests/dat/createTriggerActionsFromEncodedMessage.js
new file mode 100644
index 0000000..0c27eb8
--- /dev/null
+++ b/tests/dat/createTriggerActionsFromEncodedMessage.js
@@ -0,0 +1,11 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+function main(params) {
+    console.log(JSON.stringify(params));
+    var name = new Buffer(params.messages[0].value, 'base64').toString('ascii');
+    var ow = openwhisk({ignore_certs: true});
+    return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index f2e6180..d970d4b 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -20,7 +20,6 @@
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
@@ -28,23 +27,17 @@
 import org.scalatest.Inside
 import org.scalatest.junit.JUnitRunner
 import org.apache.kafka.clients.producer.ProducerRecord
-
 import spray.json.DefaultJsonProtocol._
 import spray.json._
-
 import common.JsHelpers
-import common.TestUtils
 import common.TestHelpers
 import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
-
 import ActionHelper._
 
-import java.util.Base64
-import java.nio.charset.StandardCharsets
-
+import common.TestUtils.NOT_FOUND
 import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
@@ -75,8 +68,6 @@
   val wsk = new Wsk()
   val actionName = s"${messagingPackage}/${messageHubFeed}"
 
-  val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
-
   behavior of "Message Hub feed action"
 
   it should "reject invocation when topic argument is missing" in {
@@ -126,78 +117,7 @@
     runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false)
   }
 
-  it should "fire a trigger when a binary message is posted to message hub" in withAssetCleaner(wskprops) {
-    val currentTime = s"${System.currentTimeMillis}"
-
-    (wp, assetHelper) =>
-      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
-
-      createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "isBinaryKey" -> true.toJson,
-        "isBinaryValue" -> true.toJson))
-
-      val defaultActionName = s"helloKafka-${currentTime}"
-
-      assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
-        action.create(name, defaultAction)
-      }
-      assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
-        rule.create(name, trigger = triggerName, action = defaultActionName)
-      }
-
-      // It takes a moment for the consumer to fully initialize.
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
-      // key to use for the produced message
-      val key = "TheKey"
-      val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
-      val encodedKey = Base64.getEncoder.encodeToString(key.getBytes(StandardCharsets.UTF_8))
-
-      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "key" -> key.toJson,
-        "value" -> currentTime.toJson))) {
-        _.response.success shouldBe true
-      }
-
-      retry({
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-        assert(activations.nonEmpty)
-
-        val matchingActivations = for {
-          id <- activations
-          activation = wsk.activation.waitForActivation(id)
-          if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
-        } yield activation.right.get
-
-        assert(matchingActivations.nonEmpty)
-
-        val activation = matchingActivations.head
-        activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-        // assert that there exists a message in the activation which has the expected keys and values
-        val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
-        assert(messages.length == 1)
-
-        val message = messages.head
-        message.getFieldPath("topic") shouldBe Some(topic.toJson)
-        message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
-      }, N = 3)
-  }
-
-  it should "not fire a single trigger with an oversized payload" in withAssetCleaner(wskprops) {
+  it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) {
     // payload size should be under the payload limit, but greater than 50% of the limit
     val testPayloadSize = 600000
 
@@ -218,6 +138,7 @@
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
 
+      val defaultAction = Some("dat/createTriggerActionsFromKey.js")
       val defaultActionName = s"helloKafka-${currentTime}"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -227,6 +148,16 @@
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
+      val verificationName1 = s"trigger1-$currentTime"
+      val verificationName2 = s"trigger2-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+      assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
       // It takes a moment for the consumer to fully initialize.
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
@@ -235,27 +166,14 @@
       // This should ensure that the feed fires these as two separate triggers.
       println("Rapidly producing two large messages")
       val producer = kafkaUtils.createProducer()
-      val firstMessage = new ProducerRecord(topic, "key", generateMessage(s"first${currentTime}", testPayloadSize))
-      val secondMessage = new ProducerRecord(topic, "key", generateMessage(s"second${currentTime}", testPayloadSize))
+      val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize))
+      val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize))
       producer.send(firstMessage)
       producer.send(secondMessage)
       producer.close()
 
-      retry({
-        // verify there are two trigger activations required to handle these messages
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
-
-        println("Verifying activation content")
-        val matchingActivations = for {
-          id <- activations
-          activation = wsk.activation.waitForActivation(id)
-          if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
-            activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
-        } yield activation.right.get
-
-        assert(matchingActivations.length == 2)
-      }, N = 3)
+      retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
+      retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
   it should "not fire a trigger for a single oversized message" in withAssetCleaner(wskprops) {
@@ -279,6 +197,7 @@
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
 
+      val defaultAction = Some("dat/createTriggerActionsFromKey.js")
       val defaultActionName = s"helloKafka-${currentTime}"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -288,30 +207,21 @@
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
+      val verificationName = s"trigger-$currentTime"
+
+      wsk.trigger.get(verificationName, NOT_FOUND)
+
       // It takes a moment for the consumer to fully initialize.
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
       println("Producing an oversized message")
       val producer = kafkaUtils.createProducer()
-      val bigMessage = new ProducerRecord(topic, "key", generateMessage(s"${currentTime}", testPayloadSize))
+      val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
       producer.send(bigMessage)
       producer.close()
 
-      retry({
-        // verify there are no activations that match
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-
-        println("Verifying activation content")
-        val matchingActivations = for {
-          id <- activations
-          activation = wsk.activation.waitForActivation(id)
-          if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
-        } yield activation.right.get
-
-        assert(matchingActivations.isEmpty)
-      }, N = 3)
+      a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
 
   it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) {
@@ -485,15 +395,22 @@
         "topic" -> topic.toJson
       ))
 
+      val defaultAction1 = Some("dat/createTriggerActions.js")
       val defaultActionName = s"helloKafka-${currentTime}"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
-        action.create(name, defaultAction)
+        action.create(name, defaultAction1)
       }
       assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
         rule.create(name, trigger = triggerName, action = defaultActionName)
       }
 
+      val verificationName1 = s"trigger1-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
@@ -504,12 +421,12 @@
         "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
-        "value" -> currentTime.toJson
+        "value" -> verificationName1.toJson
       ))) {
         _.response.success shouldBe true
       }
 
-      checkForActivations(1, triggerName, topic, key, currentTime)
+      retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
 
       println("Updating trigger")
 
@@ -524,11 +441,18 @@
         _.response.success shouldBe true
       }
 
+      val verificationName2 = s"trigger2-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      val defaultAction2 = Some("dat/createTriggerActionsFromEncodedMessage.js")
+      wsk.action.create(defaultActionName, defaultAction2, update = true)
+
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
 
-      val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
-
       println("Producing a message")
       withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
         "user" -> kafkaUtils.getAsJson("user"),
@@ -536,12 +460,12 @@
         "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
-        "value" -> currentTime.toJson
+        "value" -> verificationName2.toJson
       ))) {
         _.response.success shouldBe true
       }
 
-      checkForActivations(2, triggerName, topic, key, encodedCurrentTime)
+      retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
@@ -557,34 +481,6 @@
     }
   }
 
-  def checkForActivations(numActivations: Int, triggerName: String, topic: String, key: String, value: String) = {
-    retry({
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = numActivations, Some(triggerName), retries = maxRetries)
-      assert(activations.nonEmpty)
-
-      println("Validating content of activation(s)")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
-      } yield activation.right.get
-
-      assert(matchingActivations.nonEmpty)
-
-      val activation = matchingActivations.head
-      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-      // assert that there exists a message in the activation which has the expected keys and values
-      val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
-      assert(messages.length == 1)
-
-      val message = messages.head
-      message.getFieldPath("topic") shouldBe Some(topic.toJson)
-      message.getFieldPath("key") shouldBe Some(key.toJson)
-    }, N = 3)
-  }
-
   def generateMessage(prefix: String, size: Int): String = {
     val longString = Array.fill[String](size)("0").mkString("")
     s"${prefix}${longString}"