Update Tests to Wait for Producer to Finish (#306)
* Wait for producer to finish
* Expect Exception from Producer for Oversized Payload
* Add comment to test
* Increase producer max.request.size
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 7b90679..ae8c788 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,11 +17,8 @@
package system.health
-import java.util.concurrent.{TimeUnit, TimeoutException}
-
import common.TestUtils.NOT_FOUND
import common._
-import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
@@ -101,16 +98,6 @@
produceMessage(topic, key, verificationName)
- try {
- val result = future.get(60, TimeUnit.SECONDS)
-
- println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
- } catch {
- case e: TimeoutException =>
- fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
- case e: Exception => throw e
- }
-
// Check if the trigger, that should have been created as reaction on the kafka-message, has been created.
// The trigger should have been created by the action, that has been triggered by the kafka message.
// If we cannot find it, the most probably the action did not run.
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d691125..941f846 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -38,6 +38,7 @@
import ActionHelper._
import common.TestUtils.NOT_FOUND
import org.apache.openwhisk.utils.retry
+import java.util.concurrent.ExecutionException
@RunWith(classOf[JUnitRunner])
class MessageHubFeedTests
@@ -197,9 +198,9 @@
val verificationName = s"trigger-$currentTime"
wsk.trigger.get(verificationName, NOT_FOUND)
- println("Producing an oversized message")
- produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
+ // The producer will generate an error as the payload size is too large for the MessageHub brokers
+ a[ExecutionException] should be thrownBy produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
}
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index 62361a6..19345b3 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -19,6 +19,7 @@
import java.util.HashMap
import java.util.Properties
+import java.util.concurrent.{TimeUnit, TimeoutException}
import com.jayway.restassured.RestAssured
import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
@@ -38,6 +39,7 @@
import common.TestUtils
import common.WskTestHelpers
import org.apache.openwhisk.utils.retry
+import org.apache.kafka.clients.producer.ProducerRecord
trait KafkaUtils extends TestHelpers with WskTestHelpers {
lazy val messageHubProps = KafkaUtils.initializeMessageHub()
@@ -126,6 +128,16 @@
producer.flush()
producer.close()
+
+ try {
+ val result = future.get(60, TimeUnit.SECONDS)
+
+ println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
+ } catch {
+ case e: TimeoutException =>
+ fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
+ case e: Exception => throw e
+ }
}
}
@@ -138,7 +150,8 @@
"password",
"key.serializer",
"value.serializer",
- "security.protocol")
+ "security.protocol",
+ "max.request.size")
val propertyMap = props.filterKeys(
requiredKeys.contains(_)
@@ -176,7 +189,7 @@
val security_protocol = ("security.protocol", "SASL_SSL");
val keySerializer = ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
val valueSerializer = ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
+ val maxRequestSize = ("max.request.size", "3000000");
var brokerList = new ListBuffer[String]()
val jsonArray = credentials.get("kafka_brokers_sasl").getAsJsonArray()
val brokerIterator = jsonArray.iterator()
@@ -190,7 +203,7 @@
System.setProperty("java.security.auth.login.config", "")
setMessageHubSecurityConfiguration(user._2, password._2)
- Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer)
+ Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer, maxRequestSize)
}
private def setMessageHubSecurityConfiguration(user: String, password: String) = {