package system.utils
import java.util.HashMap
import java.util.Properties
import java.util.concurrent.{TimeUnit, TimeoutException}
import com.jayway.restassured.RestAssured
import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.mutable.ListBuffer
import spray.json.DefaultJsonProtocol._
import spray.json._
import system.packages.ActionHelper._
import org.apache.openwhisk.utils.JsHelpers
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import common.TestHelpers
import common.TestUtils
import common.WskTestHelpers
import org.apache.openwhisk.utils.retry
import org.apache.kafka.clients.producer.ProducerRecord
trait KafkaUtils extends TestHelpers with WskTestHelpers {
lazy val messageHubProps = KafkaUtils.initializeMessageHub()
def createProducer() : KafkaProducer[String, String] = {
// currently only supporting MH
new KafkaProducer[String, String](KafkaUtils.asKafkaProducerProps(this.messageHubProps))
def apply(key : String) = {
this.messageHubProps.getOrElse(key, "")
def getAsJson(key : String) = {
key match {
case key if key == "brokers" => this(key).asInstanceOf[List[String]].toJson
case key => this(key).asInstanceOf[String].toJson
val sslconfig = {
val inner = new SSLConfig().allowAllHostnames()
val config = inner.relaxedHTTPSValidation()
new RestAssuredConfig().sslConfig(config)
def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
println(s"Creating trigger $name")
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
(trigger, _) =>
trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
activation =>
// should be successful
activation.response.success shouldBe true
// It takes a moment for the consumer to fully initialize.
println("Giving the consumer a moment to get ready")
val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
def consumerExists(uuid: String) = {
println("Checking health endpoint(s) for existence of consumer uuid")
// get /health endpoint(s) and ensure it contains the new uuid
val healthUrls: Array[String] = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
assert(healthUrls.size != 0)
val uuids: Array[(String, JsValue)] = healthUrls.flatMap(u => {
val response = RestAssured.given().config(sslconfig).get(u)
assert(response.statusCode() == 200)
.flatMap(c => {
val consumer = c.asJsObject.fields.head
consumer match {
case (u, v) if u == uuid && v.asJsObject.getFields("currentState").head == "Running".toJson => Some(consumer)
case _ => None
}, N = 60, waitBeforeRetry = Some(1.second))
def produceMessage(topic: String, key: String, value: String) = {
println(s"Producing message with key: $key and value: $value")
val producer = createProducer()
val record = new ProducerRecord(topic, key, value)
val future = producer.send(record)
try {
val result = future.get(60, TimeUnit.SECONDS)
println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
} catch {
case e: TimeoutException =>
fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
case e: Exception => throw e
object KafkaUtils {
val consumerInitTime = 10000 // ms
def asKafkaProducerProps(props : Map[String,Object]) : Properties = {
val requiredKeys = List("brokers",
val propertyMap = props.filterKeys(
tuple =>
tuple match {
// transform "brokers" key to "bootstrap.servers"
case (k, v) if k == "brokers" => ("bootstrap.servers", v.asInstanceOf[List[String]].mkString(","))
case _ => tuple
val kafkaProducerProps = new Properties()
for ((k, v) <- propertyMap) kafkaProducerProps.put(k, v)
def messagesInActivation(activation : JsObject, field: String, value: String) : Array[JsObject] = {
val messages = JsHelpers.getFieldPath(activation, "response", "result", "messages").getOrElse(JsArray.empty).convertTo[Array[JsObject]]
messages.filter {
JsHelpers.getFieldPath(_, field) == Some(value.toJson)
private def initializeMessageHub() = {
// get the vcap stuff
var credentials = TestUtils.getCredentials("message_hub")
// initialize the set of tuples to go into the resulting Map
val user = ("user", credentials.get("user").getAsString())
val password = ("password", credentials.get("password").getAsString())
val kafka_admin_url = ("kafka_admin_url", credentials.get("kafka_admin_url").getAsString())
val api_key = ("api_key", credentials.get("api_key").getAsString())
val security_protocol = ("security.protocol", "SASL_SSL");
val keySerializer = ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
val valueSerializer = ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
val maxRequestSize = ("max.request.size", "3000000");
var brokerList = new ListBuffer[String]()
val jsonArray = credentials.get("kafka_brokers_sasl").getAsJsonArray()
val brokerIterator = jsonArray.iterator()
while(brokerIterator.hasNext()) {
val current =
brokerList += current
val brokers = ("brokers", brokerList.toList)
System.setProperty("", "")
setMessageHubSecurityConfiguration(user._2, password._2)
Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer, maxRequestSize)
private def setMessageHubSecurityConfiguration(user: String, password: String) = {
val map = new HashMap[String, String]()
map.put("serviceName", "kafka")
map.put("username", user)
map.put("password", password)
Configuration.setConfiguration(new Configuration()
def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = Array(
new AppConfigurationEntry (
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, map))