Ensure test consumers exists before producing messages (#292)
* Ensure test consumers exists before producing messages
* Refactoring
* More refactoring
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 2b99938..949bce6 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,7 +19,6 @@
import java.util.concurrent.{TimeUnit, TimeoutException}
-import com.jayway.restassured.RestAssured
import common.TestUtils.NOT_FOUND
import common._
import org.apache.kafka.clients.producer.ProducerRecord
@@ -43,7 +42,8 @@
with TestHelpers
with WskTestHelpers
with Inside
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -55,125 +55,24 @@
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
val actionName = s"$messagingPackage/$messageHubFeed"
-
- val consumerInitTime = 10000 // ms
-
- val kafkaUtils = new KafkaUtils
-
val maxRetries = System.getProperty("max.retries", "60").toInt
behavior of "Message Hub feed"
- it should "create a new trigger" in withAssetCleaner(wskprops) {
- val triggerName = s"newTrigger-${System.currentTimeMillis}"
- println(s"Creating trigger $triggerName")
-
- (wp, assetHelper) =>
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson
- ))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
- val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
-
- println("Checking health endpoint(s) for existence of consumer uuid")
- // get /health endpoint(s) and ensure it contains the new uuid
- val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
- healthUrls shouldNot be(empty)
-
- retry({
- val uuids = healthUrls.flatMap(u => {
- val response = RestAssured.given().get(u)
- response.statusCode() should be(200)
- response.asString()
- .parseJson
- .asJsObject
- .getFields("consumers")
- .head
- .convertTo[JsArray]
- .elements
- .flatMap(c => {
- c.asJsObject.fields.keySet
- })
- }).toList
-
- uuids should contain(uuid)
-
- }, N = 10, waitBeforeRetry = Some(1.second))
- }
- }
-
- it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
+ it should "create a consumer and fire a trigger when a message is posted to messagehub" in withAssetCleaner(wskprops) {
val currentTime = s"${System.currentTimeMillis}"
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(actionName), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson
- ))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
- val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
-
- println("Checking health endpoint(s) for existence of consumer uuid")
- // get /health endpoint(s) and ensure it contains the new uuid
- val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
- healthUrls shouldNot be(empty)
-
- retry({
- val uuids = healthUrls.flatMap(u => {
- val response = RestAssured.given().get(u)
- response.statusCode() should be(200)
- response.asString()
- .parseJson
- .asJsObject
- .getFields("consumers")
- .head
- .convertTo[JsArray]
- .elements
- .flatMap(c => {
- c.asJsObject.fields.keySet
- })
- }).toList
-
- uuids should contain(uuid)
-
- }, N = 10, waitBeforeRetry = Some(1.second))
- }
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson
+ ))
// This action creates a trigger if it gets executed.
// The name of the trigger will be the message, that has been send to kafka.
@@ -201,7 +100,7 @@
}
println(s"Producing message with key: $key and value: $verificationName")
- val producer = kafkaUtils.createProducer()
+ val producer = createProducer()
val record = new ProducerRecord(topic, key, verificationName)
val future = producer.send(record)
@@ -231,17 +130,17 @@
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
println(s"Creating trigger $triggerName")
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
(trigger, _) =>
trigger.create(triggerName, feed = Some(actionName), parameters = Map(
"user" -> username,
"password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "api_key" -> getAsJson("api_key"),
"kafka_admin_url" -> admin_url,
"kafka_brokers_sasl" -> brokers,
"topic" -> topic.toJson,
diff --git a/tests/src/test/scala/system/packages/KafkaProduceTests.scala b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
index 85d5007..e3d9094 100644
--- a/tests/src/test/scala/system/packages/KafkaProduceTests.scala
+++ b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
@@ -45,7 +45,8 @@
with BeforeAndAfterAll
with TestHelpers
with WskTestHelpers
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -56,8 +57,6 @@
val actionName = "kafkaProduceAction"
val actionFile = "../action/kafkaProduce.py"
- val kafkaUtils = new KafkaUtils
-
behavior of "Kafka Produce action"
override def beforeAll() {
@@ -73,7 +72,7 @@
def testMissingParameter(missingParam : String) = {
var fullParamsMap = Map(
"topic" -> topic.toJson,
- "brokers" -> kafkaUtils.getAsJson("brokers"),
+ "brokers" -> getAsJson("brokers"),
"value" -> "This will fail".toJson)
var missingParamsMap = fullParamsMap.filterKeys(_ != missingParam)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 04ed5c9..4263a1d 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -36,7 +36,6 @@
import common.WskProps
import common.WskTestHelpers
import ActionHelper._
-
import common.TestUtils.NOT_FOUND
import whisk.utils.retry
@@ -49,19 +48,15 @@
with BeforeAndAfterAll
with TestHelpers
with WskTestHelpers
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
-
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
-
val consumerInitTime = 10000 // ms
-
- val kafkaUtils = new KafkaUtils
-
val maxRetries = System.getProperty("max.retries", "60").toInt
implicit val wskprops = WskProps()
@@ -126,14 +121,13 @@
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
createTrigger(assetHelper, triggerName, parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"isBinaryKey" -> false.toJson,
"isBinaryValue" -> false.toJson))
@@ -158,20 +152,15 @@
trigger.get(name, NOT_FOUND)
}
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
// Rapidly produce two messages whose size are each greater than half the allowed payload limit.
// This should ensure that the feed fires these as two separate triggers.
println("Rapidly producing two large messages")
- val producer = kafkaUtils.createProducer()
+ val producer = createProducer()
val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize))
val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize))
producer.send(firstMessage)
producer.send(secondMessage)
producer.close()
-
retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
}
@@ -185,14 +174,13 @@
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
createTrigger(assetHelper, triggerName, parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"isBinaryKey" -> false.toJson,
"isBinaryValue" -> false.toJson))
@@ -211,12 +199,8 @@
wsk.trigger.get(verificationName, NOT_FOUND)
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
println("Producing an oversized message")
- val producer = kafkaUtils.createProducer()
+ val producer = createProducer()
val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
producer.send(bigMessage)
producer.close()
@@ -229,17 +213,15 @@
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
-
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
createTrigger(assetHelper, triggerName, parameters = Map(
"user" -> username,
"password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "api_key" -> getAsJson("api_key"),
"kafka_admin_url" -> admin_url,
"kafka_brokers_sasl" -> brokers,
"topic" -> topic.toJson,
@@ -263,17 +245,15 @@
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
-
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
createTrigger(assetHelper, triggerName, parameters = Map(
"user" -> username,
"password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "api_key" -> getAsJson("api_key"),
"kafka_admin_url" -> admin_url,
"kafka_brokers_sasl" -> brokers,
"topic" -> topic.toJson,
@@ -299,31 +279,22 @@
(wp, assetHelper) =>
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
+ val username = getAsJson("user")
+ val password = getAsJson("password")
+ val admin_url = getAsJson("kafka_admin_url")
+ val brokers = getAsJson("brokers")
- val username = kafkaUtils.getAsJson("user")
- val password = kafkaUtils.getAsJson("password")
- val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
- val brokers = kafkaUtils.getAsJson("brokers")
-
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(actionName), parameters = Map(
- "user" -> username,
- "password" -> password,
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> admin_url,
- "kafka_brokers_sasl" -> brokers,
- "topic" -> topic.toJson,
- "isJSONData" -> true.toJson,
- "isBinaryKey" -> false.toJson,
- "isBinaryValue" -> false.toJson
- ))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- _.response.success shouldBe true
- }
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> username,
+ "password" -> password,
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> admin_url,
+ "kafka_brokers_sasl" -> brokers,
+ "topic" -> topic.toJson,
+ "isJSONData" -> true.toJson,
+ "isBinaryKey" -> false.toJson,
+ "isBinaryValue" -> false.toJson
+ ))
val readRunResult = wsk.action.invoke(actionName, parameters = Map(
"triggerName" -> triggerName.toJson,
@@ -384,14 +355,13 @@
(wp, assetHelper) =>
val key = "TheKey"
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
createTrigger(assetHelper, triggerName, parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson
))
@@ -411,14 +381,11 @@
trigger.get(name, NOT_FOUND)
}
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
-
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
"value" -> verificationName1.toJson
@@ -455,9 +422,9 @@
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
"value" -> verificationName2.toJson
@@ -474,16 +441,15 @@
(wp, assetHelper) =>
val key = "TheKey"
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
- println(s"Creating trigger $triggerName")
createTrigger(assetHelper, triggerName, parameters = Map(
"__bx_creds" -> Map(
"messagehub" -> Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"))).toJson,
"topic" -> topic.toJson
))
@@ -508,9 +474,9 @@
println("Producing a message")
withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"key" -> key.toJson,
"value" -> verificationName1.toJson
@@ -521,19 +487,6 @@
retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
}
- def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
- (trigger, _) =>
- trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
- }
-
def generateMessage(prefix: String, size: Int): String = {
val longString = Array.fill[String](size)("0").mkString("")
s"${prefix}${longString}"
diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index 4170ab2..baedd0c 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -19,7 +19,6 @@
import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
@@ -49,7 +48,8 @@
with TestHelpers
with WskTestHelpers
with JsHelpers
- with StreamLogging {
+ with StreamLogging
+ with KafkaUtils {
val topic = "test"
@@ -58,7 +58,6 @@
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
-
val dbProtocol = WhiskProperties.getProperty("db.protocol")
val dbHost = WhiskProperties.getProperty("db.host")
val dbPort = WhiskProperties.getProperty("db.port").toInt
@@ -66,11 +65,8 @@
val dbPassword = WhiskProperties.getProperty("db.password")
val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
val dbName = s"${dbPrefix}ow_kafka_triggers"
-
val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort, dbUsername, dbPassword, dbName)
- val kafkaUtils = new KafkaUtils
-
behavior of "Mussage Hub Feed"
ignore should "assign two triggers to same worker when only worker0 is available" in withAssetCleaner(wskprops) {
@@ -187,26 +183,13 @@
})
}
- def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
- (trigger, _) =>
- trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- activation =>
- // should be successful
- activation.response.success shouldBe true
- }
- }
-
def constructParams(workers: List[String]) = {
Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson,
"workers" -> workers.toJson
)
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 5cddefb..4ba4064 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -52,7 +52,8 @@
with BeforeAndAfterAll
with TestHelpers
with WskTestHelpers
- with JsHelpers {
+ with JsHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -63,19 +64,15 @@
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
-
val consumerInitTime = 10000 // ms
-
- val kafkaUtils = new KafkaUtils
-
val maxRetries = System.getProperty("max.retries", "60").toInt
// these parameter values are 100% valid and should work as-is
val validParameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
"topic" -> topic.toJson,
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"value" -> "Big Trouble is actually a really good Tim Allen movie. Seriously.".toJson)
behavior of "Message Hub Produce action"
@@ -135,26 +132,14 @@
(wp, assetHelper) =>
val triggerName = s"/_/binaryValueTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- _.response.success shouldBe true
- }
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson))
val defaultAction = Some("dat/createTriggerActions.js")
val defaultActionName = s"helloKafka-${currentTime}"
@@ -191,26 +176,14 @@
(wp, assetHelper) =>
val triggerName = s"/_/binaryKeyTrigger-$currentTime"
- println(s"Creating trigger ${triggerName}")
- val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
- (trigger, _) =>
- trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
- "topic" -> topic.toJson))
- }
-
- withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
- _.response.success shouldBe true
- }
-
- // It takes a moment for the consumer to fully initialize.
- println("Giving the consumer a moment to get ready")
- Thread.sleep(consumerInitTime)
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson))
val defaultAction = Some("dat/createTriggerActionsFromKey.js")
val defaultActionName = s"helloKafka-${currentTime}"
diff --git a/tests/src/test/scala/system/stress/StressTest.scala b/tests/src/test/scala/system/stress/StressTest.scala
index b174323..bd413e0 100644
--- a/tests/src/test/scala/system/stress/StressTest.scala
+++ b/tests/src/test/scala/system/stress/StressTest.scala
@@ -42,7 +42,8 @@
with Matchers
with WskActorSystem
with TestHelpers
- with WskTestHelpers {
+ with WskTestHelpers
+ with KafkaUtils {
val topic = "test"
val sessionTimeout = 10 seconds
@@ -54,8 +55,6 @@
val messageHubFeed = "messageHubFeed"
val messageHubProduce = "messageHubProduce"
- val kafkaUtils = new KafkaUtils
-
behavior of "Message Hub provider"
it should "rapidly create and delete many triggers" in {
@@ -80,11 +79,11 @@
val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
println(s"\nCreating trigger #${iterationLabel}: ${triggerName}")
val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
- "user" -> kafkaUtils.getAsJson("user"),
- "password" -> kafkaUtils.getAsJson("password"),
- "api_key" -> kafkaUtils.getAsJson("api_key"),
- "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
- "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
"topic" -> topic.toJson))
println("Waiting for trigger create")
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index be17d6c..e01c715 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -17,24 +17,29 @@
package system.utils
-import common.TestUtils
-
import java.util.HashMap
import java.util.Properties
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
import javax.security.auth.login.Configuration
import javax.security.auth.login.AppConfigurationEntry
-
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.mutable.ListBuffer
-
import spray.json.DefaultJsonProtocol._
import spray.json._
-
+import system.packages.ActionHelper._
import whisk.utils.JsHelpers
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+import common.TestHelpers
+import common.TestUtils
+import common.WskTestHelpers
+import whisk.utils.retry
-class KafkaUtils {
+trait KafkaUtils extends TestHelpers with WskTestHelpers {
lazy val messageHubProps = KafkaUtils.initializeMessageHub()
def createProducer() : KafkaProducer[String, String] = {
@@ -52,9 +57,71 @@
case key => this(key).asInstanceOf[String].toJson
}
}
+
+ val sslconfig = {
+ val inner = new SSLConfig().allowAllHostnames()
+ val config = inner.relaxedHTTPSValidation()
+ new RestAssuredConfig().sslConfig(config)
+ }
+
+ def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
+ println(s"Creating trigger $name")
+
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
+ (trigger, _) =>
+ trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+ }
+
+ withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+ activation =>
+ // should be successful
+ activation.response.success shouldBe true
+
+ // It takes a moment for the consumer to fully initialize.
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(KafkaUtils.consumerInitTime)
+
+ val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+ consumerExists(uuid)
+ }
+ }
+
+
+ def consumerExists(uuid: String) = {
+ println("Checking health endpoint(s) for existence of consumer uuid")
+ // get /health endpoint(s) and ensure it contains the new uuid
+ val healthUrls: Array[String] = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+ assert(healthUrls.size != 0)
+
+ retry({
+ val uuids: Array[(String, JsValue)] = healthUrls.flatMap(u => {
+ val response = RestAssured.given().config(sslconfig).get(u)
+ assert(response.statusCode() == 200)
+
+ response.asString()
+ .parseJson
+ .asJsObject
+ .getFields("consumers")
+ .head
+ .convertTo[JsArray]
+ .elements
+ .flatMap(c => {
+ val consumer = c.asJsObject.fields.head
+ consumer match {
+ case (u, v) if u == uuid && v.asJsObject.getFields("currentState").head == "Running".toJson => Some(consumer)
+ case _ => None
+ }
+ })
+ })
+
+ assert(uuids.nonEmpty)
+ }, N = 60, waitBeforeRetry = Some(1.second))
+ }
}
object KafkaUtils {
+ val consumerInitTime = 10000 // ms
+
def asKafkaProducerProps(props : Map[String,Object]) : Properties = {
val requiredKeys = List("brokers",
"user",