Update MessageHub parameter validation (#293)
* Update MessageHub parameter validation
* Review refactor
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 66114a6..435d8fd 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -179,6 +179,26 @@
}
}
+ validatedParams.isMessageHub = true;
+
+ return validateMessageHubParameters(rawParams.__bx_creds && rawParams.__bx_creds.messagehub ? rawParams.__bx_creds.messagehub : rawParams)
+ .then(p => {
+ validatedParams = Object.assign(validatedParams, p)
+ resolve(validatedParams)
+ })
+ .catch(error => {
+ reject(error);
+ return;
+ })
+ });
+
+ return promise;
+}
+
+function validateMessageHubParameters(rawParams) {
+ var promise = new Promise((resolve, reject) => {
+ var validatedParams = {};
+
// kafka_brokers_sasl
if (rawParams.kafka_brokers_sasl) {
validatedParams.brokers = common.validateBrokerParam(rawParams.kafka_brokers_sasl);
@@ -215,8 +235,6 @@
return;
}
- validatedParams.isMessageHub = true;
-
resolve(validatedParams);
});
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d970d4b..04ed5c9 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -468,6 +468,59 @@
retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
}
+ it should "create a trigger with __bx_creds and fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
+ val currentTime = s"${System.currentTimeMillis}"
+
+ (wp, assetHelper) =>
+ val key = "TheKey"
+ val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+ println(s"Creating trigger $triggerName")
+
+ createTrigger(assetHelper, triggerName, parameters = Map(
+ "__bx_creds" -> Map(
+ "messagehub" -> Map(
+ "user" -> kafkaUtils.getAsJson("user"),
+ "password" -> kafkaUtils.getAsJson("password"),
+ "api_key" -> kafkaUtils.getAsJson("api_key"),
+ "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+ "topic" -> topic.toJson
+ ))
+
+ val defaultAction1 = Some("dat/createTriggerActions.js")
+ val defaultActionName = s"helloKafka-${currentTime}"
+
+ assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+ action.create(name, defaultAction1)
+ }
+ assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+ rule.create(name, trigger = triggerName, action = defaultActionName)
+ }
+
+ val verificationName1 = s"trigger1-$currentTime"
+
+ assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(consumerInitTime)
+
+ println("Producing a message")
+ withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+ "user" -> kafkaUtils.getAsJson("user"),
+ "password" -> kafkaUtils.getAsJson("password"),
+ "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+ "topic" -> topic.toJson,
+ "key" -> key.toJson,
+ "value" -> verificationName1.toJson
+ ))) {
+ _.response.success shouldBe true
+ }
+
+ retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
+ }
+
def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
(trigger, _) =>