Update MessageHub parameter validation (#293)

* Update MessageHub parameter validation

* Review refactor
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 66114a6..435d8fd 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -179,6 +179,26 @@
             }
         }
 
+        validatedParams.isMessageHub = true;
+
+        return validateMessageHubParameters(rawParams.__bx_creds && rawParams.__bx_creds.messagehub ? rawParams.__bx_creds.messagehub : rawParams)
+        .then(p => {
+            validatedParams = Object.assign(validatedParams, p)
+            resolve(validatedParams)
+        })
+        .catch(error => {
+            reject(error);
+            return;
+        })
+    });
+
+    return promise;
+}
+
+function validateMessageHubParameters(rawParams) {
+    var promise = new Promise((resolve, reject) => {
+        var validatedParams = {};
+
         // kafka_brokers_sasl
         if (rawParams.kafka_brokers_sasl) {
             validatedParams.brokers = common.validateBrokerParam(rawParams.kafka_brokers_sasl);
@@ -215,8 +235,6 @@
             return;
         }
 
-        validatedParams.isMessageHub = true;
-
         resolve(validatedParams);
     });
 
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d970d4b..04ed5c9 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -468,6 +468,59 @@
       retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
+  it should "create a trigger with __bx_creds and fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) { // feed credentials are supplied nested under __bx_creds.messagehub
+    val currentTime = s"${System.currentTimeMillis}" // used as a suffix for the trigger and action names below
+
+    (wp, assetHelper) =>
+      val key = "TheKey"
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      println(s"Creating trigger $triggerName")
+
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "__bx_creds" -> Map(
+          "messagehub" -> Map(
+            "user" -> kafkaUtils.getAsJson("user"),
+            "password" -> kafkaUtils.getAsJson("password"),
+            "api_key" -> kafkaUtils.getAsJson("api_key"),
+            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+        "topic" -> topic.toJson // topic stays a top-level parameter, outside __bx_creds
+      ))
+
+      val defaultAction1 = Some("dat/createTriggerActions.js")
+      val defaultActionName = s"helloKafka-${currentTime}"
+
+      assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+        action.create(name, defaultAction1)
+      }
+      assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) => // NOTE(review): rule name is a fixed literal, unlike the timestamped trigger/action names - confirm it cannot collide with other tests
+        rule.create(name, trigger = triggerName, action = defaultActionName)
+      }
+
+      val verificationName1 = s"trigger1-$currentTime" // also sent as the message value below; presumably the action creates a trigger with this name - confirm in dat/createTriggerActions.js
+
+      assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND) // expects NOT_FOUND: the verification trigger must not exist before the message is produced
+      }
+
+      println("Giving the consumer a moment to get ready")
+      Thread.sleep(consumerInitTime)
+
+      println("Producing a message")
+      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+        "user" -> kafkaUtils.getAsJson("user"),
+        "password" -> kafkaUtils.getAsJson("password"),
+        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "topic" -> topic.toJson,
+        "key" -> key.toJson,
+        "value" -> verificationName1.toJson
+      ))) {
+        _.response.success shouldBe true
+      }
+
+      retry(wsk.trigger.get(verificationName1), 60, Some(1.second)) // polls for the verification trigger; presumably up to 60 attempts, 1 second apart - confirm against retry's signature
+  }
+
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
     val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
       (trigger, _) =>