Update Tests to Wait for Producer to Finish (#306)

* Wait for producer to finish

* Expect Exception from Producer for Oversized Payload

* Add comment to test

* Increase producer max.request.size
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 7b90679..ae8c788 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,11 +17,8 @@
 
 package system.health
 
-import java.util.concurrent.{TimeUnit, TimeoutException}
-
 import common.TestUtils.NOT_FOUND
 import common._
-import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
@@ -101,16 +98,6 @@
 
       produceMessage(topic, key, verificationName)
 
-      try {
-        val result = future.get(60, TimeUnit.SECONDS)
-
-        println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
-      } catch {
-        case e: TimeoutException =>
-          fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
-        case e: Exception => throw e
-      }
-
       // Check if the trigger, that should have been created as reaction on the kafka-message, has been created.
       // The trigger should have been created by the action, that has been triggered by the kafka message.
       // If we cannot find it, the most probably the action did not run.
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d691125..941f846 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -38,6 +38,7 @@
 import ActionHelper._
 import common.TestUtils.NOT_FOUND
 import org.apache.openwhisk.utils.retry
+import java.util.concurrent.ExecutionException
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -197,9 +198,9 @@
       val verificationName = s"trigger-$currentTime"
 
       wsk.trigger.get(verificationName, NOT_FOUND)
-      println("Producing an oversized message")
-      produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
 
+      // The producer will generate an error as the payload size is too large for the MessageHub brokers
+      a[ExecutionException] should be thrownBy produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
       a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
 
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index 62361a6..19345b3 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -19,6 +19,7 @@
 
 import java.util.HashMap
 import java.util.Properties
+import java.util.concurrent.{TimeUnit, TimeoutException}
 
 import com.jayway.restassured.RestAssured
 import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
@@ -38,6 +39,7 @@
 import common.TestUtils
 import common.WskTestHelpers
 import org.apache.openwhisk.utils.retry
+import org.apache.kafka.clients.producer.ProducerRecord
 
 trait KafkaUtils extends TestHelpers with WskTestHelpers {
     lazy val messageHubProps = KafkaUtils.initializeMessageHub()
@@ -126,6 +128,16 @@
 
         producer.flush()
         producer.close()
+
+        try {
+          val result = future.get(60, TimeUnit.SECONDS)
+
+          println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
+        } catch {
+          case e: TimeoutException =>
+            fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
+          case e: Exception => throw e
+        }
     }
 }
 
@@ -138,7 +150,8 @@
                                 "password",
                                 "key.serializer",
                                 "value.serializer",
-                                "security.protocol")
+                                "security.protocol",
+                                "max.request.size")
 
         val propertyMap = props.filterKeys(
             requiredKeys.contains(_)
@@ -176,7 +189,7 @@
         val security_protocol = ("security.protocol", "SASL_SSL");
         val keySerializer = ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         val valueSerializer = ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
+        val maxRequestSize = ("max.request.size", "3000000");
         var brokerList = new ListBuffer[String]()
         val jsonArray = credentials.get("kafka_brokers_sasl").getAsJsonArray()
         val brokerIterator = jsonArray.iterator()
@@ -190,7 +203,7 @@
         System.setProperty("java.security.auth.login.config", "")
         setMessageHubSecurityConfiguration(user._2, password._2)
 
-        Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer)
+        Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer, maxRequestSize)
     }
 
     private def setMessageHubSecurityConfiguration(user: String, password: String) = {