blob: f6ce7232fe541269a4a8cf35ad6ccb4bd60b156a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package system.packages
import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import common.JsHelpers
import common.TestHelpers
import common.StreamLogging
import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
import spray.json.DefaultJsonProtocol._
import spray.json.{pimpAny, _}
import whisk.core.database.test.DatabaseScriptTestUtils
import whisk.utils.JsHelpers
@RunWith(classOf[JUnitRunner])
class MessageHubMultiWorkersTest extends FlatSpec
with Matchers
with WskActorSystem
with BeforeAndAfterAll
with TestHelpers
with WskTestHelpers
with JsHelpers
with StreamLogging
with DatabaseScriptTestUtils {
val topic = "test"
implicit val wskprops = WskProps()
val wsk = new Wsk()
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
val dbName = s"${dbPrefix}ow_kafka_triggers"
val kafkaUtils = new KafkaUtils
behavior of "Mussage Hub Feed"
it should "assign two triggers to same worker when only worker0 is available" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
val worker0 = s"worker${System.currentTimeMillis()}"
val parameters = constructParams(List(worker0))
createTrigger(assetHelper, firstTrigger, parameters)
createTrigger(assetHelper, secondTrigger, parameters)
val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
validateTriggerAssignment(documents, firstTrigger, worker0)
validateTriggerAssignment(documents, secondTrigger, worker0)
}
it should "assign a trigger to worker0 and a trigger to worker1 when both workers are available" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
val worker0 = s"worker${System.currentTimeMillis()}"
val worker1 = s"worker${System.currentTimeMillis()}"
val parameters = constructParams(List(worker0, worker1))
createTrigger(assetHelper, firstTrigger, parameters)
createTrigger(assetHelper, secondTrigger, parameters)
val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
validateTriggerAssignment(documents, firstTrigger, worker0)
validateTriggerAssignment(documents, secondTrigger, worker1)
}
it should "assign a trigger to worker1 when worker0 is removed and there is an assignment imbalance" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
val thirdTrigger = s"thirdTrigger-${System.currentTimeMillis()}"
val fourthTrigger = s"fourthTrigger-${System.currentTimeMillis()}"
val worker0 = s"worker${System.currentTimeMillis()}"
val worker1 = s"worker${System.currentTimeMillis()}"
val parameters = constructParams(List(worker1))
createTrigger(assetHelper, firstTrigger, parameters)
createTrigger(assetHelper, secondTrigger, parameters)
createTrigger(assetHelper, thirdTrigger, parameters = constructParams(List(worker0, worker1)))
createTrigger(assetHelper, fourthTrigger, parameters = constructParams(List(worker1)))
val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
validateTriggerAssignment(documents, firstTrigger, worker1)
validateTriggerAssignment(documents, secondTrigger, worker1)
validateTriggerAssignment(documents, thirdTrigger, worker0)
validateTriggerAssignment(documents, fourthTrigger, worker1)
}
it should "balance the load accross workers when a worker is added" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
val thirdTrigger = s"thirdTrigger-${System.currentTimeMillis()}"
val fourthTrigger = s"fourthTrigger-${System.currentTimeMillis()}"
val fifthTrigger = s"fifthTrigger-${System.currentTimeMillis()}"
val sixthTrigger = s"sixthTrigger-${System.currentTimeMillis()}"
val worker0 = s"worker${System.currentTimeMillis()}"
val worker1 = s"worker${System.currentTimeMillis()}"
val parameters = constructParams(List(worker0))
val updatedParameters = constructParams(List(worker0, worker1))
createTrigger(assetHelper, firstTrigger, parameters)
createTrigger(assetHelper, secondTrigger, parameters)
createTrigger(assetHelper, thirdTrigger, updatedParameters)
createTrigger(assetHelper, fourthTrigger, updatedParameters)
createTrigger(assetHelper, fifthTrigger, updatedParameters)
createTrigger(assetHelper, sixthTrigger, updatedParameters)
val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
validateTriggerAssignment(documents, firstTrigger, worker0)
validateTriggerAssignment(documents, secondTrigger, worker0)
validateTriggerAssignment(documents, thirdTrigger, worker1)
validateTriggerAssignment(documents, fourthTrigger, worker1)
validateTriggerAssignment(documents, fifthTrigger, worker0)
validateTriggerAssignment(documents, sixthTrigger, worker1)
}
def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
(trigger, _) =>
trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
}
withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
activation =>
// should be successful
activation.response.success shouldBe true
}
}
def constructParams(workers: List[String]) = {
Map(
"user" -> kafkaUtils.getAsJson("user"),
"password" -> kafkaUtils.getAsJson("password"),
"api_key" -> kafkaUtils.getAsJson("api_key"),
"kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
"kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
"topic" -> topic.toJson,
"workers" -> workers.toJson
)
}
def validateTriggerAssignment(documents: List[JsObject], trigger: String, worker: String) = {
val doc = documents.filter(_.fields("id").convertTo[String].contains(trigger))
JsHelpers.getFieldPath(doc(0), "doc", "worker") shouldBe Some(JsString(worker))
}
}