produce actions should handle binary keys and values (#140)

* Move messagesInActivation into KafkaUtils

* produce actions should handle binary keys and values

To use this action with binary data for message keys and values, the action invoker must first Base64 encode the binary data, because the data has to pass through a REST call to reach the produce action in the first place. For the produced message to contain the original bytes, the action must then decode that data. To request this, the action invoker sets the base64DecodeKey and/or base64DecodeValue parameters to true, as needed.
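
A minimal sketch of the round trip described above, not the action code itself. It assumes Python 3 and the standard `base64` module rather than the Python 2 `.decode('base64')` call used in the diff; `encode_for_invoke` and `decode_params` are hypothetical helper names introduced only for illustration.

```python
import base64


def encode_for_invoke(raw_bytes):
    """Caller side: Base64 encode raw bytes so they survive the REST/JSON call."""
    return base64.b64encode(raw_bytes).decode("ascii")


def decode_params(params):
    """Action side (hypothetical helper): decode only what the caller asked for."""
    decoded = dict(params)
    if params.get("base64DecodeValue") is True:
        decoded["value"] = base64.b64decode(params["value"])
    if params.get("base64DecodeKey") is True and "key" in params:
        decoded["key"] = base64.b64decode(params["key"])
    return decoded


# Arbitrary binary payload that could not be passed as a plain string parameter.
raw_value = b"\x00\x01binary\xff"

params = {
    "topic": "mytopic",
    "value": encode_for_invoke(raw_value),
    "base64DecodeValue": True,
}

# The bytes handed to the producer match what the caller started with.
assert decode_params(params)["value"] == raw_value
```

Without `base64DecodeValue` set to true, the sketch leaves `value` as the Base64 string, which mirrors the default (`false`) behavior documented in the README table.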

* Rename and clean up basic health test

Rename to BasicHealthTest to make its intended purpose clearer.
Move the binary feed test out to MessageHubFeedTests, as it is too detailed to be part of BasicHealthTest.
diff --git a/README.md b/README.md
index 13cb9b3..7447bc2 100644
--- a/README.md
+++ b/README.md
@@ -192,7 +192,7 @@
 You will notice that the trigger payload contains an array of messages. This means that if you are producing messages to your messaging system very quickly, the feed will attempt to batch up the posted messages into a single firing of your trigger. This allows the messages to be posted to your trigger more rapidly and efficiently.
 
 Please keep in mind when coding actions that are fired by your trigger, that the number of messages in the payload is technically unbounded, but will always be greater than 0. Here is an example of a batched message (please note the change in the *offset* value):
- 
+
  ```json
  {
    "messages": [
@@ -238,6 +238,8 @@
 |topic|String|The topic you would like the trigger to listen to|
 |value|String|The value for the message you would like to produce|
 |key|String (Optional)|The key for the message you would like to produce|
+|base64DecodeValue|Boolean (Optional - default=false)|If true, the message will be produced with a Base64 decoded version of the value parameter|
+|base64DecodeKey|Boolean (Optional - default=false)|If true, the message will be produced with a Base64 decoded version of the key parameter|
 
 While the first three parameters can be automatically bound by using `wsk package refresh`, here is an example of invoking the action with all required parameters:
 
@@ -254,6 +256,8 @@
 |topic|String|The topic you would like the trigger to listen to|
 |value|String|The value for the message you would like to produce|
 |key|String (Optional)|The key for the message you would like to produce|
+|base64DecodeValue|Boolean (Optional - default=false)|If true, the message will be produced with a Base64 decoded version of the value parameter|
+|base64DecodeKey|Boolean (Optional - default=false)|If true, the message will be produced with a Base64 decoded version of the key parameter|
 
 Here is an example of invoking the action with all required parameters:
 
@@ -261,14 +265,16 @@
 wsk action invoke /messaging/kafkaProduce -p brokers "[\"mykafkahost:9092\", \"mykafkahost:9093\"]" -p topic mytopic -p value "This is the content of my message"
 ```
 
-## Examples
+## Producing Messages with Binary Content
+You may find that you want to use one of the above actions to produce a message that has a key and/or value that is binary data. The problem is that invoking an OpenWhisk action inherently involves a REST call to the OpenWhisk server, which may require any binary parameter values of the action invocation to be Base64 encoded. How to handle this?
 
-### Producing messages to a Message Hub or generic Kafka instances using DIY action
-Before the time when OpenWhisk had built-in action to publish messages to and from Message Hub and Kafka developers used to accomplish this functionality using custom built actions using any of the OpenWhisk supported programming languages. This is still possible today and if, for some reason you are not satisfied with OpenWhisk provided Message Hub and Kafka feed and publish action, you can still develop and use your own. While creating such action, you can use Message Hub and Kafka Native API or Confluence REST API for Kafka. It is transparent to OpenWhisk which you use, just like with any other 3rd party API or library you decide to utilize in your OpenWhisk action. This [sample project](https://github.com/IBM/ogs-data-processing-messagehub) demonstrates the use of OpenWhisk for sending and receiving IBM Message Hub and Kafka messages. 
+The action caller (you, or your code) must first Base64 encode the data, for example, the value of the message you want to produce. Pass this encoded data as the `value` parameter to the produce action. However, to ensure that the produced message's value contains the original bytes, you must also set the `base64DecodeValue` parameter to `true`. This will cause the produce action to first Base64 decode the `value` parameter before producing the message. The same procedure applies to producing messages with a binary key, using the `base64DecodeKey` parameter set to `true` in conjunction with a Base64 encoded `key` parameter.
+
+## Examples
 
 ### Integrating OpenWhisk with IBM Message Hub, Node Red, IBM Watson IoT, IBM Object Storage, and IBM Data Science Experience
 Example that integrates OpenWhisk with IBM Message Hub, Node Red, IBM Watson IoT, IBM Object Storage, IBM Data Science Experience (Spark) service can be [found here](https://medium.com/openwhisk/transit-flexible-pipeline-for-iot-data-with-bluemix-and-openwhisk-4824cf20f1e0).
- 
+
 ## Testing
 An OpenWhisk deployment is required in order for the automated test suite to be executed. To run tests locally, run ```$ ./gradlew tests:test -Dhost=<container_address> -Dport=<docker_port>``` from the project's root directory. Where ```<docker_address>``` is the IP address of the service's docker container, and ```<docker_port>``` is the port the docker container is listening on. In addition, ```OPENWHISK_HOME``` must be set to the root of the local OpenWhisk directory. Ex: ```export OPENWHISK_HOME=<openwhisk_directory>```.
 
diff --git a/action/kafkaProduce.py b/action/kafkaProduce.py
index 03ae5fc..b8ac804 100644
--- a/action/kafkaProduce.py
+++ b/action/kafkaProduce.py
@@ -6,6 +6,8 @@
     validationResult = validateParams(params)
     if validationResult[0] != True:
         return {'error': validationResult[1]}
+    else:
+        validatedParams = validationResult[1]
 
     brokers = params['brokers']
 
@@ -17,10 +19,11 @@
         print "Created producer"
 
         # only use the key parameter if it is present
-        if 'key' in params:
-            producer.send(params['topic'], bytes(params['value']), key=bytes(params['key']))
+        if 'key' in validatedParams:
+            messageKey = validatedParams['key']
+            producer.send(validatedParams['topic'], bytes(validatedParams['value']), key=bytes(messageKey))
         else:
-            producer.send(params['topic'], bytes(params['value'])
+            producer.send(validatedParams['topic'], bytes(validatedParams['value']))
 
         producer.flush()
 
@@ -34,6 +37,7 @@
     return {"success": True}
 
 def validateParams(params):
+    validatedParams = params.copy()
     requiredParams = ['brokers', 'topic', 'value']
     actualParams = params.keys()
 
@@ -45,5 +49,21 @@
 
     if len(missingParams) > 0:
         return (False, "You must supply all of the following parameters: {}".format(', '.join(missingParams)))
-    else:
-        return (True, "Params all check out.")
+
+    if 'base64DecodeValue' in params and params['base64DecodeValue'] == True:
+        decodedValue = params['value'].decode('base64')
+        if len(decodedValue.strip()) == 0:
+            return (False, "value parameter is not Base64 encoded")
+        else:
+            # make use of the decoded value so we don't have to decode it again later
+            validatedParams['value'] = decodedValue
+
+    if 'key' in params and params.get('base64DecodeKey') == True:
+        decodedKey = params['key'].decode('base64')
+        if len(decodedKey.strip()) == 0:
+            return (False, "key parameter is not Base64 encoded")
+        else:
+            # make use of the decoded key so we don't have to decode it again later
+            validatedParams['key'] = decodedKey
+
+    return (True, validatedParams)
diff --git a/action/messageHubProduce.py b/action/messageHubProduce.py
index e01a5ec..15934b5 100755
--- a/action/messageHubProduce.py
+++ b/action/messageHubProduce.py
@@ -6,6 +6,8 @@
     validationResult = validateParams(params)
     if validationResult[0] != True:
         return {'error': validationResult[1]}
+    else:
+        validatedParams = validationResult[1]
 
     sasl_mechanism = 'PLAIN'
     security_protocol = 'SASL_SSL'
@@ -18,9 +20,9 @@
     try:
         producer = KafkaProducer(
             api_version_auto_timeout_ms=15000,
-            bootstrap_servers=params['kafka_brokers_sasl'],
-            sasl_plain_username=params['user'],
-            sasl_plain_password=params['password'],
+            bootstrap_servers=validatedParams['kafka_brokers_sasl'],
+            sasl_plain_username=validatedParams['user'],
+            sasl_plain_password=validatedParams['password'],
             security_protocol=security_protocol,
             ssl_context=context,
             sasl_mechanism=sasl_mechanism)
@@ -28,10 +30,11 @@
         print "Created producer"
 
         # only use the key parameter if it is present
-        if 'key' in params:
-            producer.send(params['topic'], bytes(params['value']), key=bytes(params['key']))
+        if 'key' in validatedParams:
+            messageKey = validatedParams['key']
+            producer.send(validatedParams['topic'], bytes(validatedParams['value']), key=bytes(messageKey))
         else:
-            producer.send(params['topic'], bytes(params['value']))
+            producer.send(validatedParams['topic'], bytes(validatedParams['value']))
 
         producer.flush()
 
@@ -45,6 +48,8 @@
     return {"success": True}
 
 def validateParams(params):
+    validatedParams = params.copy()
+
     requiredParams = ['kafka_brokers_sasl', 'user', 'password', 'topic', 'value']
     actualParams = params.keys()
 
@@ -56,5 +61,21 @@
 
     if len(missingParams) > 0:
         return (False, "You must supply all of the following parameters: {}".format(', '.join(missingParams)))
-    else:
-        return (True, "Params all check out.")
+
+    if 'base64DecodeValue' in params and params['base64DecodeValue'] == True:
+        decodedValue = params['value'].decode('base64')
+        if len(decodedValue.strip()) == 0:
+            return (False, "value parameter is not Base64 encoded")
+        else:
+            # make use of the decoded value so we don't have to decode it again later
+            validatedParams['value'] = decodedValue
+
+    if 'key' in params and params.get('base64DecodeKey') == True:
+        decodedKey = params['key'].decode('base64')
+        if len(decodedKey.strip()) == 0:
+            return (False, "key parameter is not Base64 encoded")
+        else:
+            # make use of the decoded key so we don't have to decode it again later
+            validatedParams['key'] = decodedKey
+
+    return (True, validatedParams)
diff --git a/installCatalog.sh b/installCatalog.sh
index e0b8fbf..e7e61cc 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -60,5 +60,5 @@
 $WSK_CLI -i --apihost "$EDGEHOST" action update messaging/messageHubProduce "$PACKAGE_HOME/action/messageHubProduce.py" \
     --auth "$AUTH" \
     -a description 'Produce a message to Message Hub' \
-    -a parameters '[ {"name":"kafka_brokers_sasl", "required":true, "description": "Array of Message Hub brokers"},{"name":"user", "required":true, "description": "Message Hub username"},{"name":"password", "required":true, "description": "Message Hub password", "type":"password"},{"name":"topic", "required":true, "description": "Topic that you wish to produce a message to"},{"name":"value", "required":true, "description": "The value for the message you want to produce"},{"name":"key", "required":false, "description": "The key for the message you want to produce"}]' \
+    -a parameters '[ {"name":"kafka_brokers_sasl", "required":true, "description": "Array of Message Hub brokers"},{"name":"user", "required":true, "description": "Message Hub username"},{"name":"password", "required":true, "description": "Message Hub password", "type":"password"},{"name":"topic", "required":true, "description": "Topic that you wish to produce a message to"},{"name":"value", "required":true, "description": "The value for the message you want to produce"},{"name":"key", "required":false, "description": "The key for the message you want to produce"},{"name":"base64DecodeValue", "required":false, "description": "If true, the message will be produced with a Base64 decoded version of the value parameter"},{"name":"base64DecodeKey", "required":false, "description": "If true, the message will be produced with a Base64 decoded version of the key parameter"}]' \
     -a sampleInput '{"kafka_brokers_sasl":"[\"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093\"]", "username":"someUsername", "password":"somePassword", "topic":"mytopic", "value": "This is my message"}'
diff --git a/installKafka.sh b/installKafka.sh
index 075ae25..b3105dc 100644
--- a/installKafka.sh
+++ b/installKafka.sh
@@ -51,5 +51,5 @@
 $WSK_CLI -i --apihost "$EDGEHOST" action update messaging/kafkaProduce "$PACKAGE_HOME/action/kafkaProduce.py" \
     --auth "$AUTH" \
     -a description 'Produce a message to a Kafka cluster' \
-    -a parameters '[ {"name":"brokers", "required":true, "description": "Array of Kafka brokers"},{"name":"topic", "required":true, "description": "Topic that you want to produce a message to"},{"name":"value", "required":true, "description": "The value for the message you want to produce"},{"name":"key", "required":false, "description": "The key for the message you want to produce"}]' \
+    -a parameters '[ {"name":"brokers", "required":true, "description": "Array of Kafka brokers"},{"name":"topic", "required":true, "description": "Topic that you want to produce a message to"},{"name":"value", "required":true, "description": "The value for the message you want to produce"},{"name":"key", "required":false, "description": "The key for the message you want to produce"},{"name":"base64DecodeValue", "required":false, "description": "If true, the message will be produced with a Base64 decoded version of the value parameter"},{"name":"base64DecodeKey", "required":false, "description": "If true, the message will be produced with a Base64 decoded version of the key parameter"}]' \
     -a sampleInput '{"brokers":"[\"127.0.0.1:9093\"]", "topic":"mytopic", "value": "This is my message"}'
diff --git a/tests/src/system/health/BasicHealthTest.scala b/tests/src/system/health/BasicHealthTest.scala
new file mode 100644
index 0000000..532df4c
--- /dev/null
+++ b/tests/src/system/health/BasicHealthTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package system.health
+
+import system.utils.KafkaUtils
+
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.JsHelpers
+import common.TestHelpers
+import common.Wsk
+import common.WskActorSystem
+import common.WskProps
+import common.WskTestHelpers
+import spray.json.DefaultJsonProtocol._
+import spray.json.pimpAny
+
+
+@RunWith(classOf[JUnitRunner])
+class BasicHealthTest
+    extends FlatSpec
+    with Matchers
+    with WskActorSystem
+    with BeforeAndAfterAll
+    with TestHelpers
+    with WskTestHelpers
+    with JsHelpers {
+
+    val topic = "test"
+    val sessionTimeout = 10 seconds
+
+    implicit val wskprops = WskProps()
+    val wsk = new Wsk()
+
+    val messagingPackage = "/whisk.system/messaging"
+    val messageHubFeed = "messageHubFeed"
+    val messageHubProduce = "messageHubProduce"
+
+    val kafkaUtils = new KafkaUtils
+
+    behavior of "Message Hub feed"
+
+    it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
+        val currentTime = s"${System.currentTimeMillis}"
+
+        (wp, assetHelper) =>
+            val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+                (trigger, _) =>
+                    trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
+                        "user" -> kafkaUtils.getAsJson("user"),
+                        "password" -> kafkaUtils.getAsJson("password"),
+                        "api_key" -> kafkaUtils.getAsJson("api_key"),
+                        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+                        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+                        "topic" -> topic.toJson))
+            }
+
+            withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+                activation =>
+                    // should be successful
+                    activation.response.success shouldBe true
+            }
+
+            // It takes a moment for the consumer to fully initialize. We choose 4 seconds
+            // as a temporary length of time to wait for.
+            Thread.sleep(4000)
+
+            // key to use for the produced message
+            val key = "TheKey"
+
+            withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+                "user" -> kafkaUtils.getAsJson("user"),
+                "password" -> kafkaUtils.getAsJson("password"),
+                "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+                "topic" -> topic.toJson,
+                "key" -> key.toJson,
+                "value" -> currentTime.toJson))) {
+                    _.response.success shouldBe true
+                }
+
+            println("Polling for activations")
+            val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = 30)
+            assert(activations.length > 0)
+
+            val matchingActivations = for {
+                id <- activations
+                activation = wsk.activation.waitForActivation(id)
+                if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
+            } yield activation.right.get
+
+            assert(matchingActivations.length == 1)
+
+            val activation = matchingActivations.head
+            activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+
+            // assert that there exists a message in the activation which has the expected keys and values
+            val messages = KafkaUtils.messagesInActivation(activation, field="value", value=currentTime)
+            assert(messages.length == 1)
+
+            val message = messages.head
+            message.getFieldPath("topic") shouldBe Some(topic.toJson)
+            message.getFieldPath("key") shouldBe Some(key.toJson)
+    }
+}
diff --git a/tests/src/system/health/MessagingFeedTests.scala b/tests/src/system/health/MessagingFeedTests.scala
deleted file mode 100644
index 7bdcdcc..0000000
--- a/tests/src/system/health/MessagingFeedTests.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package system.health
-
-import system.utils.KafkaUtils
-
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
-
-import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import common.JsHelpers
-import common.TestHelpers
-import common.Wsk
-import common.WskActorSystem
-import common.WskProps
-import common.WskTestHelpers
-import spray.json.DefaultJsonProtocol._
-import spray.json.JsArray
-import spray.json.JsObject
-import spray.json.JsBoolean
-import spray.json.pimpAny
-
-import sun.misc.BASE64Encoder
-
-@RunWith(classOf[JUnitRunner])
-class MessagingFeedTests
-    extends FlatSpec
-    with Matchers
-    with WskActorSystem
-    with BeforeAndAfterAll
-    with TestHelpers
-    with WskTestHelpers
-    with JsHelpers {
-
-    val topic = "test"
-    val sessionTimeout = 10 seconds
-
-    implicit val wskprops = WskProps()
-    val wsk = new Wsk()
-
-    val messagingPackage = "/whisk.system/messaging"
-    val messageHubFeed = "messageHubFeed"
-    val messageHubProduce = "messageHubProduce"
-
-    val kafkaUtils = new KafkaUtils
-
-    behavior of "Message Hub feed"
-
-    it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) {
-        val currentTime = s"${System.currentTimeMillis}"
-
-        (wp, assetHelper) =>
-            val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-                (trigger, _) =>
-                    trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                        "user" -> kafkaUtils.getAsJson("user"),
-                        "password" -> kafkaUtils.getAsJson("password"),
-                        "api_key" -> kafkaUtils.getAsJson("api_key"),
-                        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-                        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-                        "topic" -> topic.toJson))
-            }
-
-            withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
-            }
-
-            // It takes a moment for the consumer to fully initialize. We choose 4 seconds
-            // as a temporary length of time to wait for.
-            Thread.sleep(4000)
-
-            // key to use for the produced message
-            val key = "TheKey"
-
-            withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-                "user" -> kafkaUtils.getAsJson("user"),
-                "password" -> kafkaUtils.getAsJson("password"),
-                "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-                "topic" -> topic.toJson,
-                "key" -> key.toJson,
-                "value" -> currentTime.toJson))) {
-                    _.response.success shouldBe true
-                }
-
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = 30)
-            assert(activations.length > 0)
-
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && activation.right.get.fields.get("response").toString.contains(currentTime))
-            } yield activation.right.get
-
-            assert(matchingActivations.length == 1)
-
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-            // assert that there exists a message in the activation which has the expected keys and values
-            val messages = messagesInActivation(activation, withMessageValue = currentTime)
-            assert(messages.length == 1)
-
-            val message = messages.head
-            message.getFieldPath("topic") shouldBe Some(topic.toJson)
-            message.getFieldPath("key") shouldBe Some(key.toJson)
-    }
-
-    it should "fire a trigger when a binary message is posted to message hub" in withAssetCleaner(wskprops) {
-        val currentTime = s"${System.currentTimeMillis}"
-
-        (wp, assetHelper) =>
-            val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
-                (trigger, _) =>
-                    trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                        "user" -> kafkaUtils.getAsJson("user"),
-                        "password" -> kafkaUtils.getAsJson("password"),
-                        "api_key" -> kafkaUtils.getAsJson("api_key"),
-                        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-                        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-                        "topic" -> topic.toJson,
-                        "isBinaryKey" -> JsBoolean(true),
-                        "isBinaryValue" -> JsBoolean(true)))
-            }
-
-            withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
-            }
-
-            // It takes a moment for the consumer to fully initialize. We choose 4 seconds
-            // as a temporary length of time to wait for.
-            Thread.sleep(4000)
-
-            // key to use for the produced message
-            val key = "TheKey"
-            val encodedCurrentTime = new BASE64Encoder().encode(currentTime.getBytes("utf-8"))
-            val encodedKey = new BASE64Encoder().encode(key.getBytes("utf-8"))
-
-            withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-                "user" -> kafkaUtils.getAsJson("user"),
-                "password" -> kafkaUtils.getAsJson("password"),
-                "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-                "topic" -> topic.toJson,
-                "key" -> key.toJson,
-                "value" -> currentTime.toJson))) {
-                _.response.success shouldBe true
-            }
-
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = 30)
-            assert(activations.length > 0)
-
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
-            } yield activation.right.get
-
-
-            assert(matchingActivations.length == 1)
-
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
-
-            // assert that there exists a message in the activation which has the expected keys and values
-            val messages = messagesInActivation(activation, withMessageValue = encodedCurrentTime)
-            assert(messages.length == 1)
-
-            val message = messages.head
-            message.getFieldPath("topic") shouldBe Some(topic.toJson)
-            message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
-    }
-
-    def messagesInActivation(activation : JsObject, withMessageValue: String) : Array[JsObject] = {
-        val messages = activation.getFieldPath("response", "result", "messages").getOrElse(JsArray.empty).convertTo[Array[JsObject]]
-        messages.filter {
-            _.getFieldPath("value") == Some(withMessageValue.toJson)
-        }
-    }
-}
diff --git a/tests/src/system/packages/MessageHubFeedTests.scala b/tests/src/system/packages/MessageHubFeedTests.scala
index 1561dfe..53f5c9e 100644
--- a/tests/src/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/system/packages/MessageHubFeedTests.scala
@@ -15,12 +15,21 @@
  */
 package system.packages
 
+import system.utils.KafkaUtils
+
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-import spray.json._
+
+import spray.json.DefaultJsonProtocol._
+import spray.json.JsObject
+import spray.json.JsString
+import spray.json.pimpAny
 
 import common.JsHelpers
 import common.TestHelpers
@@ -30,6 +39,9 @@
 import common.WskTestHelpers
 import ActionHelper._
 
+import java.util.Base64
+import java.nio.charset.StandardCharsets
+
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
   extends FlatSpec
@@ -40,6 +52,15 @@
     with WskTestHelpers
     with JsHelpers {
 
+  val topic = "test"
+  val sessionTimeout = 10 seconds
+
+  val messagingPackage = "/whisk.system/messaging"
+  val messageHubFeed = "messageHubFeed"
+  val messageHubProduce = "messageHubProduce"
+
+  val kafkaUtils = new KafkaUtils
+
   implicit val wskprops = WskProps()
   val wsk = new Wsk()
   val actionName = "messageHubFeedAction"
@@ -113,4 +134,71 @@
     runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false)
   }
 
+  it should "fire a trigger when a binary message is posted to message hub" in withAssetCleaner(wskprops) {
+      val currentTime = s"${System.currentTimeMillis}"
+
+      (wp, assetHelper) =>
+          val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+          val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+              (trigger, _) =>
+                  trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
+                      "user" -> kafkaUtils.getAsJson("user"),
+                      "password" -> kafkaUtils.getAsJson("password"),
+                      "api_key" -> kafkaUtils.getAsJson("api_key"),
+                      "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+                      "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+                      "topic" -> topic.toJson,
+                      "isBinaryKey" -> true.toJson,
+                      "isBinaryValue" -> true.toJson))
+          }
+
+          withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+              activation =>
+                  // should be successful
+                  activation.response.success shouldBe true
+          }
+
+          // It takes a moment for the consumer to fully initialize. Wait 4 seconds
+          // as a temporary measure before producing the message.
+          Thread.sleep(4000)
+
+          // key to use for the produced message
+          val key = "TheKey"
+          val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8))
+          val encodedKey = Base64.getEncoder.encodeToString(key.getBytes(StandardCharsets.UTF_8))
+
+          withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+              "user" -> kafkaUtils.getAsJson("user"),
+              "password" -> kafkaUtils.getAsJson("password"),
+              "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+              "topic" -> topic.toJson,
+              "key" -> key.toJson,
+              "value" -> currentTime.toJson))) {
+              _.response.success shouldBe true
+          }
+
+          println("Polling for activations")
+          val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = 30)
+          assert(activations.length > 0)
+
+          val matchingActivations = for {
+              id <- activations
+              activation = wsk.activation.waitForActivation(id)
+              if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
+          } yield activation.right.get
+
+
+          assert(matchingActivations.length == 1)
+
+          val activation = matchingActivations.head
+          activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+
+          // assert that there exists a message in the activation which has the expected keys and values
+          val messages = KafkaUtils.messagesInActivation(activation, field="value", value=encodedCurrentTime)
+          assert(messages.length == 1)
+
+          val message = messages.head
+          message.getFieldPath("topic") shouldBe Some(topic.toJson)
+          message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+  }
 }
diff --git a/tests/src/system/packages/MessageHubProduceTests.scala b/tests/src/system/packages/MessageHubProduceTests.scala
index 5828624..ee2a86b 100644
--- a/tests/src/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/system/packages/MessageHubProduceTests.scala
@@ -36,6 +36,9 @@
 import spray.json.DefaultJsonProtocol._
 import spray.json.pimpAny
 
+import java.util.Base64
+import java.nio.charset.StandardCharsets
+
 @RunWith(classOf[JUnitRunner])
 class MessageHubProduceTests
     extends FlatSpec
@@ -53,6 +56,7 @@
     val wsk = new Wsk()
 
     val messagingPackage = "/whisk.system/messaging"
+    val messageHubFeed = "messageHubFeed"
     val messageHubProduce = "messageHubProduce"
 
     val kafkaUtils = new KafkaUtils
@@ -116,4 +120,132 @@
                 activation.response.result.get.toString should include("No brokers available")
         }
     }
+
+    it should "Reject trying to decode a non-base64 key" in {
+        val badKeyParams = validParameters + ("key" -> "?".toJson) + ("base64DecodeKey" -> true.toJson)
+
+        withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badKeyParams)) {
+            activation =>
+                activation.response.success shouldBe false
+                activation.response.result.get.toString should include("key parameter is not Base64 encoded")
+        }
+    }
+
+    it should "Reject trying to decode a non-base64 value" in {
+        val badValueParams = validParameters + ("value" -> "?".toJson) + ("base64DecodeValue" -> true.toJson)
+
+        withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", badValueParams)) {
+            activation =>
+                activation.response.success shouldBe false
+                activation.response.result.get.toString should include("value parameter is not Base64 encoded")
+        }
+    }
+
+    it should "Post a message with a binary value" in withAssetCleaner(wskprops) {
+        // create trigger
+        val currentTime = s"${System.currentTimeMillis}"
+
+        (wp, assetHelper) =>
+            val triggerName = s"/_/binaryValueTrigger-$currentTime"
+            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+                (trigger, _) =>
+                    trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
+                        "user" -> kafkaUtils.getAsJson("user"),
+                        "password" -> kafkaUtils.getAsJson("password"),
+                        "api_key" -> kafkaUtils.getAsJson("api_key"),
+                        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+                        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+                        "topic" -> topic.toJson))
+            }
+
+            withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+                activation =>
+                    // should be successful
+                    activation.response.success shouldBe true
+            }
+
+            // produce message
+            val decodedMessage = "This will be base64 encoded"
+            val encodedMessage = Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
+            val base64ValueParams = validParameters + ("base64DecodeValue" -> true.toJson) + ("value" -> encodedMessage.toJson)
+
+            withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
+                activation =>
+                    activation.response.success shouldBe true
+            }
+
+            // verify trigger fired
+            println("Polling for activations")
+            val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = 30)
+            assert(activations.length > 0)
+
+            val matchingActivations = for {
+                id <- activations
+                activation = wsk.activation.waitForActivation(id)
+                if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedMessage))
+            } yield activation.right.get
+
+            assert(matchingActivations.length == 1)
+
+            val activation = matchingActivations.head
+            activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+
+            // assert that there exists a message in the activation which has the expected keys and values
+            val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = decodedMessage)
+            assert(messages.length == 1)
+    }
+
+    it should "Post a message with a binary key" in withAssetCleaner(wskprops) {
+        // create trigger
+        val currentTime = s"${System.currentTimeMillis}"
+
+        (wp, assetHelper) =>
+            val triggerName = s"/_/binaryKeyTrigger-$currentTime"
+            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+                (trigger, _) =>
+                    trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
+                        "user" -> kafkaUtils.getAsJson("user"),
+                        "password" -> kafkaUtils.getAsJson("password"),
+                        "api_key" -> kafkaUtils.getAsJson("api_key"),
+                        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+                        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+                        "topic" -> topic.toJson))
+            }
+
+            withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+                activation =>
+                    // should be successful
+                    activation.response.success shouldBe true
+            }
+
+            // produce message
+            val decodedKey = "This will be base64 encoded"
+            val encodedKey = Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
+            val base64KeyParams = validParameters + ("base64DecodeKey" -> true.toJson) + ("key" -> encodedKey.toJson)
+
+            withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64KeyParams)) {
+                activation =>
+                    activation.response.success shouldBe true
+            }
+
+            // verify trigger fired
+            println("Polling for activations")
+            val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = 30)
+            assert(activations.length > 0)
+
+            val matchingActivations = for {
+                id <- activations
+                activation = wsk.activation.waitForActivation(id)
+                if (activation.isRight && activation.right.get.fields.get("response").toString.contains(decodedKey))
+            } yield activation.right.get
+
+            assert(matchingActivations.length == 1)
+
+            val activation = matchingActivations.head
+            activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+
+            // assert that exactly one message in the activation has the expected (decoded) key
+            val messages = KafkaUtils.messagesInActivation(activation, field = "key", value = decodedKey)
+            assert(messages.length == 1)
+    }
 }
diff --git a/tests/src/system/utils/KafkaUtils.scala b/tests/src/system/utils/KafkaUtils.scala
index 30d21a7..a4a5da6 100644
--- a/tests/src/system/utils/KafkaUtils.scala
+++ b/tests/src/system/utils/KafkaUtils.scala
@@ -28,8 +28,12 @@
 import scala.collection.mutable.ListBuffer
 
 import spray.json.DefaultJsonProtocol._
+import spray.json.JsObject
+import spray.json.JsArray
 import spray.json.pimpAny
 
+import whisk.utils.JsHelpers
+
 
 class KafkaUtils {
     lazy val messageHubProps = KafkaUtils.initializeMessageHub()
@@ -77,6 +81,13 @@
         kafkaProducerProps
     }
 
+    def messagesInActivation(activation : JsObject, field: String, value: String) : Array[JsObject] = {
+        val messages = JsHelpers.getFieldPath(activation, "response", "result", "messages").getOrElse(JsArray.empty).convertTo[Array[JsObject]]
+        messages.filter {
+            JsHelpers.getFieldPath(_, field) == Some(value.toJson)
+        }
+    }
+
     private def initializeMessageHub() = {
         // get the vcap stuff
         var credentials = TestUtils.getCredentials("message_hub")