blob: a08a2b33bd27f6972a776dc2fc2aae12bd030b25 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package system.utils
import java.util.HashMap
import java.util.Properties
import java.util.concurrent.{TimeUnit, TimeoutException}
import io.restassured.RestAssured
import io.restassured.config.{RestAssuredConfig, SSLConfig}
import javax.security.auth.login.Configuration
import javax.security.auth.login.AppConfigurationEntry
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.mutable.ListBuffer
import spray.json.DefaultJsonProtocol._
import spray.json._
import system.packages.ActionHelper._
import org.apache.openwhisk.utils.JsHelpers
import scala.concurrent.duration.DurationInt
import common.TestHelpers
import common.TestUtils
import common.WskTestHelpers
import common.ActivationResult
import org.apache.openwhisk.utils.retry
import org.apache.kafka.clients.producer.ProducerRecord
trait KafkaUtils extends TestHelpers with WskTestHelpers {
lazy val messageHubProps = KafkaUtils.initializeMessageHub()
def createProducer() : KafkaProducer[String, String] = {
// currently only supporting MH
new KafkaProducer[String, String](KafkaUtils.asKafkaProducerProps(this.messageHubProps))
}
def apply(key : String) = {
this.messageHubProps.getOrElse(key, "")
}
def getAsJson(key : String) = {
key match {
case key if key == "brokers" => this(key).asInstanceOf[List[String]].toJson
case key => this(key).asInstanceOf[String].toJson
}
}
val sslconfig = {
val inner = new SSLConfig().allowAllHostnames()
val config = inner.relaxedHTTPSValidation()
new RestAssuredConfig().sslConfig(config)
}
def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]): String = {
println(s"Creating trigger $name")
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
(trigger, _) =>
trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
}
val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult]
// should be successful
activation.response.success shouldBe true
// It takes a moment for the consumer to fully initialize.
println("Giving the consumer a moment to get ready")
Thread.sleep(KafkaUtils.consumerInitTime)
val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
consumerExists(uuid)
uuid
}
def consumerExists(uuid: String) = {
println("Checking health endpoint(s) for existence of consumer uuid")
// get /health endpoint(s) and ensure it contains the new uuid
val healthUrls: Array[String] = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
assert(healthUrls.size != 0)
retry({
val uuids: Array[(String, JsValue)] = healthUrls.flatMap(u => {
val response = RestAssured.given().config(sslconfig).get(u)
assert(response.statusCode() == 200)
response.asString()
.parseJson
.asJsObject
.getFields("consumers")
.head
.convertTo[JsArray]
.elements
.flatMap(c => {
val consumer = c.asJsObject.fields.head
consumer match {
case (u, v) if u == uuid && v.asJsObject.getFields("currentState").head == "Running".toJson => Some(consumer)
case _ => None
}
})
})
assert(uuids.nonEmpty)
}, N = 60, waitBeforeRetry = Some(1.second))
}
def produceMessage(topic: String, key: String, value: String) = {
println(s"Producing message with key: $key and value: $value")
val producer = createProducer()
val record = new ProducerRecord(topic, key, value)
val future = producer.send(record)
producer.flush()
producer.close()
try {
val result = future.get(60, TimeUnit.SECONDS)
println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
} catch {
case e: TimeoutException =>
fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
case e: Exception => throw e
}
}
}
object KafkaUtils {
val consumerInitTime = 10000 // ms
def asKafkaProducerProps(props : Map[String,Object]) : Properties = {
val requiredKeys = List("brokers",
"user",
"password",
"key.serializer",
"value.serializer",
"security.protocol",
"max.request.size")
val propertyMap = props.filterKeys(
requiredKeys.contains(_)
).map(
tuple =>
tuple match {
// transform "brokers" key to "bootstrap.servers"
case (k, v) if k == "brokers" => ("bootstrap.servers", v.asInstanceOf[List[String]].mkString(","))
case _ => tuple
}
)
val kafkaProducerProps = new Properties()
for ((k, v) <- propertyMap) kafkaProducerProps.put(k, v)
kafkaProducerProps
}
def messagesInActivation(activation : JsObject, field: String, value: String) : Array[JsObject] = {
val messages = JsHelpers.getFieldPath(activation, "response", "result", "messages").getOrElse(JsArray.empty).convertTo[Array[JsObject]]
messages.filter {
JsHelpers.getFieldPath(_, field) == Some(value.toJson)
}
}
private def initializeMessageHub() = {
// get the vcap stuff
var credentials = TestUtils.getCredentials("message_hub")
// initialize the set of tuples to go into the resulting Map
val user = ("user", credentials.get("user").getAsString())
val password = ("password", credentials.get("password").getAsString())
val kafka_admin_url = ("kafka_admin_url", credentials.get("kafka_admin_url").getAsString())
val api_key = ("api_key", credentials.get("api_key").getAsString())
val security_protocol = ("security.protocol", "SASL_SSL");
val keySerializer = ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
val valueSerializer = ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
val maxRequestSize = ("max.request.size", "3000000");
var brokerList = new ListBuffer[String]()
val jsonArray = credentials.get("kafka_brokers_sasl").getAsJsonArray()
val brokerIterator = jsonArray.iterator()
while(brokerIterator.hasNext()) {
val current = brokerIterator.next().getAsString
brokerList += current
}
val brokers = ("brokers", brokerList.toList)
System.setProperty("java.security.auth.login.config", "")
setMessageHubSecurityConfiguration(user._2, password._2)
Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer, maxRequestSize)
}
private def setMessageHubSecurityConfiguration(user: String, password: String) = {
val map = new HashMap[String, String]()
map.put("serviceName", "kafka")
map.put("username", user)
map.put("password", password)
Configuration.setConfiguration(new Configuration()
{
def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = Array(
new AppConfigurationEntry (
"com.ibm.messagehub.login.MessageHubLoginModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, map))
})
}
}