Resiliency Updates for MessageHub Tests (#253)

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 02183d2..139ed21 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -141,9 +141,7 @@
       }
 
       withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       // It takes a moment for the consumer to fully initialize.
@@ -280,9 +278,7 @@
       }
 
       withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       val readRunResult = wsk.action.invoke(actionName, parameters = Map(
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 1fe18a1..e280e39 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -17,18 +17,21 @@
 package system.packages
 
 import system.utils.KafkaUtils
-import org.apache.kafka.clients.producer.ProducerRecord
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
+
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.Inside
 import org.scalatest.junit.JUnitRunner
+import org.apache.kafka.clients.producer.ProducerRecord
+
 import spray.json.DefaultJsonProtocol._
 import spray.json._
+
 import common.JsHelpers
 import common.TestUtils
 import common.TestHelpers
@@ -36,11 +39,14 @@
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+
 import ActionHelper._
+
 import java.util.Base64
 import java.nio.charset.StandardCharsets
 import java.time.{Clock, Instant}
 
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -166,28 +172,30 @@
         _.response.success shouldBe true
       }
 
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-      assert(activations.length == 1)
+      retry({
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+        assert(activations.length == 1)
 
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
-      } yield activation.right.get
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
+        } yield activation.right.get
 
-      assert(matchingActivations.length == 1)
+        assert(matchingActivations.length > 0)
 
-      val activation = matchingActivations.head
-      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+        val activation = matchingActivations.head
+        activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
 
-      // assert that there exists a message in the activation which has the expected keys and values
-      val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
-      assert(messages.length == 1)
+        // assert that there exists a message in the activation which has the expected keys and values
+        val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime)
+        assert(messages.length == 1)
 
-      val message = messages.head
-      message.getFieldPath("topic") shouldBe Some(topic.toJson)
-      message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+        val message = messages.head
+        message.getFieldPath("topic") shouldBe Some(topic.toJson)
+        message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+      }, N = 3)
   }
 
   it should "not fire a single trigger with an oversized payload" in withAssetCleaner(wskprops) {
@@ -234,19 +242,21 @@
       producer.send(secondMessage)
       producer.close()
 
-      // verify there are two trigger activations required to handle these messages
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
+      retry({
+        // verify there are two trigger activations required to handle these messages
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries)
 
-      println("Verifying activation content")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
-          activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
-      } yield activation.right.get
+        println("Verifying activation content")
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
+            activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
+        } yield activation.right.get
 
-      assert(matchingActivations.length == 2)
+        assert(matchingActivations.length == 2)
+      }, N = 3)
   }
 
   it should "not fire a trigger for a single oversized message" in withAssetCleaner(wskprops) {
@@ -289,18 +299,20 @@
       producer.send(bigMessage)
       producer.close()
 
-      // verify there are no activations that match
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+      retry({
+        // verify there are no activations that match
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
 
-      println("Verifying activation content")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
-      } yield activation.right.get
+        println("Verifying activation content")
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
+        } yield activation.right.get
 
-      assert(matchingActivations.length == 0)
+        assert(matchingActivations.length == 0)
+      }, N = 3)
   }
 
   it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) {
@@ -333,8 +345,7 @@
       ))
 
       withActivation(wsk.activation, run) {
-        activation =>
-          activation.response.success shouldBe false
+        _.response.success shouldBe false
       }
   }
 
@@ -370,8 +381,7 @@
       ))
 
       withActivation(wsk.activation, run) {
-        activation =>
-          activation.response.success shouldBe false
+        _.response.success shouldBe false
       }
   }
 
@@ -430,8 +440,7 @@
       ))
 
       withActivation(wsk.activation, updateRunResult) {
-        activation =>
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       println("Giving the consumer a moment to get ready")
@@ -469,29 +478,31 @@
   }
 
   def checkForActivations(triggerName: String, since: Instant, topic: String, key: String, value: String) = {
-    println("Polling for activations")
-    val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
-    assert(activations.length == 1)
+    retry({
+      println("Polling for activations")
+      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = Some(since), retries = maxRetries)
+      assert(activations.length == 1)
 
-    println("Validating content of activation(s)")
-    val matchingActivations = for {
-      id <- activations
-      activation = wsk.activation.waitForActivation(id)
-      if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
-    } yield activation.right.get
+      println("Validating content of activation(s)")
+      val matchingActivations = for {
+        id <- activations
+        activation = wsk.activation.waitForActivation(id)
+        if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value))
+      } yield activation.right.get
 
-    assert(matchingActivations.length == 1)
+      assert(matchingActivations.length > 0)
 
-    val activation = matchingActivations.head
-    activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+      val activation = matchingActivations.head
+      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
 
-    // assert that there exists a message in the activation which has the expected keys and values
-    val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
-    assert(messages.length == 1)
+      // assert that there exists a message in the activation which has the expected keys and values
+      val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value)
+      assert(messages.length == 1)
 
-    val message = messages.head
-    message.getFieldPath("topic") shouldBe Some(topic.toJson)
-    message.getFieldPath("key") shouldBe Some(key.toJson)
+      val message = messages.head
+      message.getFieldPath("topic") shouldBe Some(topic.toJson)
+      message.getFieldPath("key") shouldBe Some(key.toJson)
+    }, N = 3)
   }
 
   def generateMessage(prefix: String, size: Int): String = {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 3c2aac8..4d2b38c 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -35,12 +35,15 @@
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+
 import spray.json.DefaultJsonProtocol._
 import spray.json.pimpAny
 
 import java.util.Base64
 import java.nio.charset.StandardCharsets
 
+import whisk.utils.retry
+
 @RunWith(classOf[JUnitRunner])
 class MessageHubProduceTests
     extends FlatSpec
@@ -150,7 +153,6 @@
     }
 
     it should "Post a message with a binary value" in withAssetCleaner(wskprops) {
-        // create trigger
         val currentTime = s"${System.currentTimeMillis}"
 
         (wp, assetHelper) =>
@@ -169,53 +171,53 @@
             }
 
             withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
-            }
-
-            val defaultActionName = s"helloKafka-${currentTime}"
-
-            assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
-                action.create(name, defaultAction)
-            }
-            assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
-                rule.create(name, trigger = triggerName, action = defaultActionName)
+                _.response.success shouldBe true
             }
 
             // It takes a moment for the consumer to fully initialize.
             println("Giving the consumer a moment to get ready")
             Thread.sleep(consumerInitTime)
 
+            val defaultActionName = s"helloKafka-${currentTime}"
+
+            assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+                action.create(name, defaultAction)
+            }
+
+            assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
+                rule.create(name, trigger = triggerName, action = defaultActionName)
+            }
+
             // produce message
             val decodedMessage = "This will be base64 encoded"
             val encodedMessage = Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeValue" -> true.toJson) + ("value" -> encodedMessage.toJson)
 
+            println("Producing a message")
             withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
-                activation =>
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-            assert(activations.length > 0)
+            retry({
+                println("Polling for activations")
+                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+                assert(activations.nonEmpty)
 
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
-            } yield activation.right.get
+                val matchingActivations = for {
+                    id <- activations
+                    activation = wsk.activation.waitForActivation(id)
+                    if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
+                } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+                assert(matchingActivations.length > 0)
 
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+                val activation = matchingActivations.head
+                activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
 
-            // assert that there exists a message in the activation which has the expected keys and values
-            val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
-            assert(messages.length == 1)
+                // assert that there exists a message in the activation which has the expected keys and values
+                val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
+                assert(messages.length == 1)
+            }, N = 3)
     }
 
     it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
@@ -238,52 +240,52 @@
             }
 
             withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
-            }
-
-            val defaultActionName = s"helloKafka-${currentTime}"
-
-            assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
-                action.create(name, defaultAction)
-            }
-            assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
-                rule.create(name, trigger = triggerName, action = defaultActionName)
+                _.response.success shouldBe true
             }
 
             // It takes a moment for the consumer to fully initialize.
             println("Giving the consumer a moment to get ready")
             Thread.sleep(consumerInitTime)
 
+            val defaultActionName = s"helloKafka-${currentTime}"
+
+            assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+                action.create(name, defaultAction)
+            }
+
+            assetHelper.withCleaner(wsk.rule, s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
+                rule.create(name, trigger = triggerName, action = defaultActionName)
+            }
+
             // produce message
             val decodedKey = "This will be base64 encoded"
             val encodedKey = Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeKey" -> true.toJson) + ("key" -> encodedKey.toJson)
 
+            println("Producing a message")
             withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
-                activation =>
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-            assert(activations.length > 0)
+            retry({
+                println("Polling for activations")
+                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
+                assert(activations.nonEmpty)
 
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
-            } yield activation.right.get
+                val matchingActivations = for {
+                    id <- activations
+                    activation = wsk.activation.waitForActivation(id)
+                    if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
+                } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+                assert(matchingActivations.length > 0)
 
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+                val activation = matchingActivations.head
+                activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
 
-            // assert that there exists a message in the activation which has the expected keys and values
-            val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
-            assert(messages.length == 1)
+                // assert that there exists a message in the activation which has the expected keys and values
+                val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
+                assert(messages.length == 1)
+            }, N = 3)
     }
 }