Implement provider redundancy (#160)

This change allows multiple instances of the feed provider to run simultaneously, providing failover redundancy and a degree of load balancing.

This is accomplished by allowing the feed action to write triggers directly to the DB, without having to communicate directly with the feed provider instances. The provider instances pick up new, modified, and deleted triggers by listening to a CouchDB changes feed.

To allow the feed action to talk directly to the DB, without exposing the DB credentials to all users, a new web action is introduced. This action can be invoked by any user, but cannot be inspected by anyone other than the action owner (typically a system admin account). The non-web feed action is still added to a shared package, allowing all users to create triggers from it, but this action only invokes the new web action over a REST call.

Even though multiple instances of the provider are running and, in fact, handling the same set of triggers, the use of Kafka consumer groups ensures that each trigger is fired only once per produced message: Kafka guarantees that exactly one consumer in the group receives each message.
diff --git a/.gitignore b/.gitignore
index 39bb3a5..e643d4d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,6 @@
+.gradle
 launchConfigurations/
 *.pyc
+action/*.zip
+action/package.json
+tests/build
diff --git a/action/kafkaFeed.js b/action/kafkaFeed.js
index fc785c0..3cd42f7 100644
--- a/action/kafkaFeed.js
+++ b/action/kafkaFeed.js
@@ -1,143 +1,30 @@
-var request = require('request');
+const common = require('./lib/common');
 
 /**
  *   Feed to listen to Kafka messages
  *  @param {string} brokers - array of Kafka brokers
  *  @param {string} topic - topic to subscribe to
  *  @param {bool}   isJSONData - attempt to parse messages as JSON
- *  @param {bool}   isBinaryKey - encode message as Base64
- *  @param {bool}   isBinaryValue - encode key as Base64
- *  @param {string} endpoint - address to OpenWhisk deployment
+ *  @param {bool}   isBinaryKey - encode key as Base64
+ *  @param {bool}   isBinaryValue - encode message as Base64
+ *  @param {string} endpoint - address to OpenWhisk deployment (expected to be bound at deployment)
  */
 function main(params) {
-    var promise = new Promise(function(resolve, reject) {
-        if(!params.package_endpoint) {
-            reject('Could not find the package_endpoint parameter.');
-            return;
-        }
+    const endpoint = params.endpoint;
+    const webActionName = 'kafkaFeedWeb';
 
-        var triggerComponents = params.triggerName.split("/");
-        var namespace = encodeURIComponent(process.env['__OW_NAMESPACE']);
-        var trigger = encodeURIComponent(triggerComponents[2]);
-        var feedServiceURL = 'http://' + params.package_endpoint + '/triggers/' + namespace + '/' + trigger;
+    var massagedParams = common.massageParamsForWeb(params);
+    massagedParams.triggerName = common.getTriggerFQN(params.triggerName);
 
-        if (params.lifecycleEvent === 'CREATE') {
-            // makes a PUT request to create the trigger in the feed service provider
-            var putTrigger = function(validatedParams) {
-                var body = validatedParams;
+    if (params.lifecycleEvent === 'CREATE') {
+        return common.createTrigger(endpoint, massagedParams, webActionName);
+    } else if (params.lifecycleEvent === 'DELETE') {
+        return common.deleteTrigger(endpoint, massagedParams, webActionName);
+    }
 
-                // params.endpoint may already include the protocol - if so,
-                // strip it out
-                var massagedAPIHost = params.endpoint.replace(/https?:\/\/(.*)/, "$1");
-                body.triggerURL = 'https://' + process.env['__OW_API_KEY'] + "@" + massagedAPIHost + '/api/v1/namespaces/' + namespace + '/triggers/' + trigger;
-
-                var options = {
-                    method: 'PUT',
-                    url: feedServiceURL,
-                    body: JSON.stringify(body),
-                    headers: {
-                        'Content-Type': 'application/json',
-                        'User-Agent': 'whisk'
-                    }
-                };
-
-                return doRequest(options);
-            };
-
-            validateParameters(params)
-                .then(putTrigger)
-                .then(resolve)
-                .catch(reject);
-        } else if (params.lifecycleEvent === 'DELETE') {
-            var authorizationHeader = 'Basic ' + new Buffer(process.env['__OW_API_KEY']).toString('base64');
-
-            var options = {
-                method: 'DELETE',
-                url: feedServiceURL,
-                headers: {
-                    'Content-Type': 'application/json',
-                    'Authorization': authorizationHeader,
-                    'User-Agent': 'whisk'
-                }
-            };
-
-            doRequest(options)
-                .then(resolve)
-                .catch(reject);
-        }
-    });
-
-    return promise;
+    return {
+        error: 'unsupported lifecycleEvent'
+    };
 }
 
-function doRequest(options) {
-    var requestPromise = new Promise(function (resolve, reject) {
-        request(options, function (error, response, body) {
-            if (error) {
-                reject({
-                    response: response,
-                    error: error,
-                    body: JSON.parse(body)
-                });
-            } else {
-                console.log("Status code: " + response.statusCode);
-
-                if (response.statusCode >= 400) {
-                    console.log("Response from Kafka feed service: " + body);
-                    reject({
-                        statusCode: response.statusCode,
-                        response: JSON.parse(body)
-                    });
-                } else {
-                    resolve({
-                        response: JSON.parse(body)
-                    });
-                }
-            }
-        });
-    });
-
-    return requestPromise;
-}
-
-function validateParameters(rawParams) {
-    var promise = new Promise(function(resolve, reject) {
-        var validatedParams = {};
-
-        validatedParams.isMessageHub = false;
-        validatedParams.isJSONData = getBooleanFromArgs(rawParams, 'isJSONData');
-        validatedParams.isBinaryValue = getBooleanFromArgs(rawParams, 'isBinaryValue');
-        validatedParams.isBinaryKey = getBooleanFromArgs(rawParams, 'isBinaryKey');
-
-        if (validatedParams.isJSONData && validatedParams.isBinaryValue) {
-            reject('isJSONData and isBinaryValue cannot both be enabled.');
-            return;
-        }
-
-        if (rawParams.topic && rawParams.topic.length > 0) {
-            validatedParams.topic = rawParams.topic;
-        } else {
-            reject('You must supply a "topic" parameter.');
-            return;
-        }
-
-        if (isNonEmptyArray(rawParams.brokers)) {
-            validatedParams.brokers = rawParams.brokers;
-        } else {
-            reject('You must supply a "brokers" parameter as an array of Kafka brokers.');
-            return;
-        }
-
-        resolve(validatedParams);
-    });
-
-    return promise;
-}
-
-function getBooleanFromArgs(args, key) {
-  return (typeof args[key] !== 'undefined' && args[key] && (args[key] === true || args[key].toString().trim().toLowerCase() === 'true'));
-}
-
-function isNonEmptyArray(obj) {
-    return obj && Array.isArray(obj) && obj.length !== 0;
-}
+exports.main = main;
diff --git a/action/kafkaFeedWeb.js b/action/kafkaFeedWeb.js
new file mode 100644
index 0000000..6975677
--- /dev/null
+++ b/action/kafkaFeedWeb.js
@@ -0,0 +1,115 @@
+const common = require('./lib/common');
+const Database = require('./lib/Database');
+
+/**
+ *   Feed to listen to Kafka messages
+ *  @param {string} brokers - array of Kafka brokers
+ *  @param {string} topic - topic to subscribe to
+ *  @param {bool}   isJSONData - attempt to parse messages as JSON
+ *  @param {bool}   isBinaryKey - encode key as Base64
+ *  @param {bool}   isBinaryValue - encode message as Base64
+ *  @param {string} endpoint - address to OpenWhisk deployment (expected to be bound at deployment)
+ *  @param {string} DB_URL - URL for the DB, must include authentication (expected to be bound at deployment)
+ *  @param {string} DB_NAME - DB name (expected to be bound at deployment)
+ */
+function main(params) {
+    var promise = new Promise((resolve, reject) => {
+        // hold off initializing this until definitely needed
+        var db;
+
+        if (params.__ow_method === "put") {
+            return validateParameters(params)
+                .then(validatedParams => {
+                    console.log(`VALIDATED: ${JSON.stringify(validatedParams, null, 2)}`);
+                    db = new Database(params.DB_URL, params.DB_NAME);
+
+                    var promises = [];
+
+                    // do these in parallel!
+                    promises.push(db.ensureTriggerIsUnique(validatedParams.triggerName));
+                    promises.push(common.verifyTriggerAuth(validatedParams.triggerURL));
+
+                    return Promise.all(promises)
+                        .then(result => validatedParams);
+                })
+                .then(validatedParams => db.recordTrigger(validatedParams))
+                .then(result => {
+                    console.log('successfully wrote the trigger');
+                    resolve();
+                })
+                .catch(error => {
+                    console.log(`Failed to write the trigger ${error}`);
+
+                    // defaults to potentially be overridden
+                    var statusCode = 500;
+                    var body = error.toString();
+
+                    if(error.validationError) {
+                        statusCode = 400;
+                        body = error.validationError;
+                    } else if(error.authError) {
+                        statusCode = 401;
+                        body = error.authError;
+                    }
+
+                    resolve({
+                        statusCode: statusCode,
+                        headers: {'Content-Type': 'text/plain'},
+                        body: body
+                    });
+                });
+        } else if (params.__ow_method === "delete") {
+            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+
+            return common.verifyTriggerAuth(triggerURL)
+                .then(() => {
+                    db = new Database(params.DB_URL, params.DB_NAME);
+                    return db.deleteTrigger(params.triggerName);
+                })
+                .then(resolve)
+                .catch(reject);
+        } else {
+            resolve({
+                statusCode: 400,
+                headers: {'Content-Type': 'text/plain'},
+                body: 'unsupported lifecycleEvent'
+            });
+        }
+    });
+
+    return promise;
+}
+
+function validateParameters(rawParams) {
+    var promise = new Promise((resolve, reject) => {
+        var validatedParams;
+
+        var commonValidationResult = common.performCommonParameterValidation(rawParams);
+        if(commonValidationResult.validationError) {
+            reject(commonValidationResult);
+            return;
+        } else {
+            validatedParams = commonValidationResult.validatedParams;
+        }
+
+        // brokers
+        if (rawParams.brokers) {
+            validatedParams.brokers = common.validateBrokerParam(rawParams.brokers);
+            if (!validatedParams.brokers) {
+                reject( { validationError: "You must supply a 'brokers' parameter as an array of Message Hub brokers." });
+                return;
+            }
+        } else {
+            reject( { validationError: "You must supply a 'brokers' parameter." });
+            return;
+        }
+
+        validatedParams.isMessageHub = false;
+
+        resolve(validatedParams);
+    });
+
+    return promise;
+}
+
+exports.main = main;
diff --git a/action/kafkaFeedWeb_package.json b/action/kafkaFeedWeb_package.json
new file mode 100644
index 0000000..78b5c57
--- /dev/null
+++ b/action/kafkaFeedWeb_package.json
@@ -0,0 +1,5 @@
+{
+  "name": "kafkaFeedWeb",
+  "version": "1.0.0",
+  "main": "kafkaFeedWeb.js"
+}
diff --git a/action/kafkaFeed_package.json b/action/kafkaFeed_package.json
new file mode 100644
index 0000000..02a1614
--- /dev/null
+++ b/action/kafkaFeed_package.json
@@ -0,0 +1,5 @@
+{
+  "name": "kafkaFeed",
+  "version": "1.0.0",
+  "main": "kafkaFeed.js"
+}
diff --git a/action/lib/Database.js b/action/lib/Database.js
new file mode 100644
index 0000000..fe8f778
--- /dev/null
+++ b/action/lib/Database.js
@@ -0,0 +1,63 @@
+// constructor for DB object - a thin, promise-loving wrapper around nano
+module.exports = function(dbURL, dbName) {
+    var nano = require('nano')(dbURL);
+    this.db = nano.db.use(dbName);
+
+    this.getTrigger = function(triggerFQN) {
+        return new Promise((resolve, reject) => {
+            this.db.get(triggerFQN, (err, result) => {
+                if(err) {
+                    reject(err);
+                } else {
+                    resolve(result);
+                }
+            });
+        });
+    };
+
+    this.ensureTriggerIsUnique = function(triggerFQN) {
+        return this.getTrigger(this.db, triggerFQN)
+            .then(result => {
+                return Promise.reject('Trigger already exists');
+            })
+            .catch(err => {
+                // turn that frown upside-down!
+                return true;
+            });
+    };
+
+    this.recordTrigger = function(params) {
+        console.log('recording trigger');
+
+        params['_id'] = params.triggerName;
+        params['status'] = {
+            'active': true,
+            'dateChanged': Math.round(new Date().getTime() / 1000)
+        };
+
+        return new Promise((resolve, reject) => {
+            this.db.insert(params, (err, result) => {
+                if(err) {
+                    reject(err);
+                } else {
+                    resolve(result);
+                }
+            });
+        });
+    };
+
+    this.deleteTrigger = function(triggerFQN) {
+        return this.getTrigger(triggerFQN)
+            .then(doc => {
+                return new Promise((resolve, reject) => {
+                    this.db.destroy(doc._id, doc._rev, (err, result) => {
+                        if(err) {
+                            reject(err);
+                        } else {
+                            resolve(result);
+                        }
+                    });
+                });
+            })
+    };
+};
diff --git a/action/lib/common.js b/action/lib/common.js
new file mode 100644
index 0000000..def07e4
--- /dev/null
+++ b/action/lib/common.js
@@ -0,0 +1,177 @@
+const request = require('request-promise');
+
+function triggerComponents(triggerName) {
+    var split = triggerName.split("/");
+
+    return {
+        namespace: split[1],
+        triggerName: split[2]
+    };
+}
+
+function getTriggerURL(authKey, endpoint, triggerName) {
+    var massagedAPIHost = endpoint.replace(/https?:\/\/(.*)/, "$1");
+
+    var components = triggerComponents(triggerName);
+    var namespace = components.namespace;
+    var trigger = components.triggerName;
+
+    var url = `https://${authKey}@${massagedAPIHost}/api/v1/namespaces/${encodeURIComponent(namespace)}/triggers/${encodeURIComponent(trigger)}`;
+
+    return url;
+}
+
+function verifyTriggerAuth(triggerURL) {
+    var options = {
+        method: 'GET',
+        url: triggerURL,
+        rejectUnauthorized: false,
+        headers: {
+            'Content-Type': 'application/json',
+            'User-Agent': 'whisk'
+        }
+    };
+
+    return request(options)
+        .catch(err => {
+            console.log(`Trigger auth error: ${JSON.stringify(err)}`);
+            return Promise.reject({ authError: 'You are not authorized for this trigger.'});
+        });
+}
+
+function validateBrokerParam(brokerParam) {
+    if (isNonEmptyArray(brokerParam)) {
+        return brokerParam;
+    } else if (typeof brokerParam === 'string') {
+        return brokerParam.split(',');
+    } else {
+        return undefined;
+    }
+}
+
+function getBooleanFromArgs(args, key) {
+    return (typeof args[key] !== 'undefined' && args[key] && (args[key] === true || args[key].toString().trim().toLowerCase() === 'true'));
+}
+
+function isNonEmptyArray(obj) {
+    return obj && Array.isArray(obj) && obj.length !== 0;
+}
+
+// Return the trigger FQN with the resolved namespace
+// This is required to avoid naming conflicts when using the default namespace "_"
+function getTriggerFQN(triggerName) {
+    var components = triggerName.split('/');
+    return `/${process.env['__OW_NAMESPACE']}/${components[2]}`;
+}
+
+function massageParamsForWeb(rawParams) {
+    var massagedParams = Object.assign({}, rawParams);
+
+    // remove these parameters as they may conflict with bound parameters of the web action
+    delete massagedParams.endpoint;
+    delete massagedParams.bluemixServiceName;
+    delete massagedParams.lifecycleEvent;
+
+    return massagedParams;
+}
+
+function getWebActionURL(endpoint, actionName) {
+    return `https://${endpoint}/api/v1/web/whisk.system/messagingWeb/${actionName}.http`;
+}
+
+function createTrigger(endpoint, params, actionName) {
+    var options = {
+        method: 'PUT',
+        url: getWebActionURL(endpoint, actionName),
+        rejectUnauthorized: false,
+        json: true,
+        body: params,
+        headers: {
+            'Content-Type': 'application/json',
+            'Accept': 'text/plain',
+            'User-Agent': 'whisk'
+        }
+    };
+
+    return request(options)
+        .then(response => {
+            // do not pass the response back to the caller, its contents are secret
+            return;
+        })
+        .catch(error => {
+            console.log(`Error creating trigger: ${JSON.stringify(error, null, 2)}`);
+            return Promise.reject(error.response.body);
+        });
+}
+
+function deleteTrigger(endpoint, params, actionName) {
+    var options = {
+        method: 'DELETE',
+        url: getWebActionURL(endpoint, actionName),
+        rejectUnauthorized: false,
+        json: true,
+        body: params,
+        headers: {
+            'Content-Type': 'application/json',
+            'Accept': 'text/plain',
+            'User-Agent': 'whisk'
+        }
+    };
+
+    return request(options)
+        .then(response => {
+            // do not pass the response back to the caller, its contents are secret
+            return;
+        }).catch(error => {
+            console.log(`Error deleting trigger: ${JSON.stringify(error, null, 2)}`);
+            return Promise.reject(error.response.body);
+        });
+}
+
+// perform parameter validation that is common to both feed actions
+function performCommonParameterValidation(rawParams) {
+    var validatedParams = {};
+
+    // topic
+    if (rawParams.topic && rawParams.topic.length > 0) {
+        validatedParams.topic = rawParams.topic;
+    } else {
+        return { validationError: "You must supply a 'topic' parameter." };
+    }
+
+    // triggerName
+    if (rawParams.triggerName) {
+        validatedParams.triggerName = rawParams.triggerName;
+    } else {
+        return { validationError: "You must supply a 'triggerName' parameter." };
+    }
+
+    validatedParams.isJSONData = getBooleanFromArgs(rawParams, 'isJSONData');
+    validatedParams.isBinaryValue = getBooleanFromArgs(rawParams, 'isBinaryValue');
+
+    if (validatedParams.isJSONData && validatedParams.isBinaryValue) {
+        return { validationError: 'isJSONData and isBinaryValue cannot both be enabled.' };
+    }
+
+    // now that everything else is valid, let's add these
+    validatedParams.isBinaryKey = getBooleanFromArgs(rawParams, 'isBinaryKey');
+    validatedParams.authKey = rawParams.authKey;
+    validatedParams.triggerURL = getTriggerURL(validatedParams.authKey, rawParams.endpoint, rawParams.triggerName);
+
+    const uuid = require('uuid');
+    validatedParams.uuid = uuid.v4();
+
+    return { validatedParams: validatedParams };
+}
+
+module.exports = {
+    'createTrigger': createTrigger,
+    'deleteTrigger': deleteTrigger,
+    'getBooleanFromArgs': getBooleanFromArgs,
+    'getTriggerFQN': getTriggerFQN,
+    'getTriggerURL': getTriggerURL,
+    'massageParamsForWeb': massageParamsForWeb,
+    'performCommonParameterValidation': performCommonParameterValidation,
+    'validateBrokerParam': validateBrokerParam,
+    'verifyTriggerAuth': verifyTriggerAuth,
+};
diff --git a/action/messageHubFeed.js b/action/messageHubFeed.js
index ed521d3..818520a 100644
--- a/action/messageHubFeed.js
+++ b/action/messageHubFeed.js
@@ -1,4 +1,4 @@
-var request = require('request');
+const common = require('./lib/common');
 
 /**
  *   Feed to listen to MessageHub messages
@@ -9,213 +9,24 @@
  *  @param {bool}   isJSONData - attempt to parse messages as JSON
  *  @param {bool}   isBinaryKey - encode key as Base64
  *  @param {bool}   isBinaryValue - encode message as Base64
- *  @param {string} endpoint - address to OpenWhisk deployment
+ *  @param {string} endpoint - address to OpenWhisk deployment (expected to be bound at deployment)
  */
 function main(params) {
-    var promise = new Promise(function(resolve, reject) {
-        if(!params.package_endpoint) {
-            reject('Could not find the package_endpoint parameter.');
-            return;
-        }
+    const endpoint = params.endpoint;
+    const webActionName = 'messageHubFeedWeb'
 
-        var triggerComponents = params.triggerName.split("/");
-        var namespace = encodeURIComponent(process.env['__OW_NAMESPACE']);
-        var trigger = encodeURIComponent(triggerComponents[2]);
-        var feedServiceURL = 'http://' + params.package_endpoint + '/triggers/' + namespace + '/' + trigger;
+    var massagedParams = common.massageParamsForWeb(params);
+    massagedParams.triggerName = common.getTriggerFQN(params.triggerName);
 
-        if (params.lifecycleEvent === 'CREATE') {
-            // makes a PUT request to create the trigger in the feed service provider
-            var putTrigger = function(validatedParams) {
-                var body = validatedParams;
-
-                // params.endpoint may already include the protocol - if so,
-                // strip it out
-                var massagedAPIHost = params.endpoint.replace(/https?:\/\/(.*)/, "$1");
-                body.triggerURL = 'https://' + process.env['__OW_API_KEY'] + "@" + massagedAPIHost + '/api/v1/namespaces/' + namespace + '/triggers/' + trigger;
-
-                var options = {
-                    method: 'PUT',
-                    url: feedServiceURL,
-                    body: JSON.stringify(body),
-                    headers: {
-                        'Content-Type': 'application/json',
-                        'User-Agent': 'whisk'
-                    }
-                };
-
-                return doRequest(options);
-            };
-
-            return validateParameters(params)
-                .then(checkMessageHubCredentials)
-                .then(putTrigger)
-                .then(resolve)
-                .catch(reject);
-        } else if (params.lifecycleEvent === 'DELETE') {
-            var authorizationHeader = 'Basic ' + new Buffer(process.env['__OW_API_KEY']).toString('base64');
-
-            var options = {
-                method: 'DELETE',
-                url: feedServiceURL,
-                headers: {
-                    'Content-Type': 'application/json',
-                    'Authorization': authorizationHeader,
-                    'User-Agent': 'whisk'
-                }
-            };
-
-            doRequest(options)
-                .then(resolve)
-                .catch(reject);
-        }
-    });
-
-    return promise;
-}
-
-function checkMessageHubCredentials(params) {
-    // make sure we have valid MH credentials
-    console.log("Checking Message Hub credentials...");
-
-    // listing topics seems to be the simplest way to check auth
-    var topicURL = params.kafka_admin_url + '/admin/topics';
-
-    var options = {
-        method: 'GET',
-        url: topicURL,
-        headers: {
-            'X-Auth-Token': params.username + params.password
-        }
-    };
-
-    return doRequest(options)
-        .then(function(body) {
-            console.log("Successfully authenticated with Message Hub");
-
-            var topicNames = body.response.map(function(topic) {
-                return topic.name
-            });
-
-            if(topicNames.indexOf(params.topic) < 0) {
-                return Promise.reject('Topic does not exist. You must create the topic first: ' + params.topic);
-            } else {
-                console.log("Topic exists.");
-                // return the params so they can be chained into the next function
-                return params;
-            }
-        }, function(authError) {
-            return Promise.reject('Could not authenticate with Message Hub. Please check your credentials.');
-        });
-}
-
-function doRequest(options) {
-    var requestPromise = new Promise(function (resolve, reject) {
-        request(options, function (error, response, body) {
-            if (error) {
-                reject({
-                    response: response,
-                    error: error,
-                    body: JSON.parse(body)
-                });
-            } else {
-                console.log("Status code: " + response.statusCode);
-
-                if (response.statusCode >= 400) {
-                    console.log("Response: " + JSON.stringify(body));
-                    reject({
-                        statusCode: response.statusCode,
-                        response: JSON.parse(body)
-                    });
-                } else {
-                    resolve({
-                        response: JSON.parse(body)
-                    });
-                }
-            }
-        });
-    });
-
-    return requestPromise;
-}
-
-function validateParameters(rawParams) {
-    var promise = new Promise(function(resolve, reject) {
-        var validatedParams = {};
-
-        validatedParams.isMessageHub = true;
-        validatedParams.isJSONData = getBooleanFromArgs(rawParams, 'isJSONData');
-        validatedParams.isBinaryValue = getBooleanFromArgs(rawParams, 'isBinaryValue');
-        validatedParams.isBinaryKey = getBooleanFromArgs(rawParams, 'isBinaryKey');
-
-        if (validatedParams.isJSONData && validatedParams.isBinaryValue) {
-            reject('isJSONData and isBinaryValue cannot both be enabled.');
-            return;
-        }
-
-        // topic
-        if (rawParams.topic && rawParams.topic.length > 0) {
-            validatedParams.topic = rawParams.topic;
-        } else {
-            reject('You must supply a "topic" parameter.');
-            return;
-        }
-
-        // kafka_brokers_sasl
-        if (rawParams.kafka_brokers_sasl) {
-            validatedParams.brokers = validateBrokerParam(rawParams.kafka_brokers_sasl);
-            if(!validatedParams.brokers) {
-                reject('You must supply a "kafka_brokers_sasl" parameter as an array of Message Hub brokers.');
-                return;
-            }
-        } else {
-            reject('You must supply a "kafka_brokers_sasl" parameter as an array of Message Hub brokers.');
-            return;
-        }
-
-        // user
-        if (rawParams.user) {
-            validatedParams.username = rawParams.user;
-        } else {
-            reject('You must supply a "user" parameter to authenticate with Message Hub.');
-            return;
-        }
-
-        // password
-        if (rawParams.password) {
-            validatedParams.password = rawParams.password;
-        } else {
-            reject('You must supply a "password" parameter to authenticate with Message Hub.');
-            return;
-        }
-
-        // kafka_admin_url
-        if(rawParams.kafka_admin_url) {
-            validatedParams.kafka_admin_url = rawParams.kafka_admin_url;
-        } else {
-            reject('You must supply a "kafka_admin_url" parameter.');
-            return;
-        }
-
-        resolve(validatedParams);
-    });
-
-    return promise;
-}
-
-function validateBrokerParam(brokerParam) {
-    if(isNonEmptyArray(brokerParam)) {
-        return brokerParam;
-    } else if (typeof brokerParam === 'string') {
-        return brokerParam.split(',');
-    } else {
-        return undefined;
+    if (params.lifecycleEvent === 'CREATE') {
+        return common.createTrigger(endpoint, massagedParams, webActionName);
+    } else if (params.lifecycleEvent === 'DELETE') {
+        return common.deleteTrigger(endpoint, massagedParams, webActionName);
     }
+
+    return {
+        error: 'unsupported lifecycleEvent'
+    };
 }
 
-function getBooleanFromArgs(args, key) {
-  return (typeof args[key] !== 'undefined' && args[key] && (args[key] === true || args[key].toString().trim().toLowerCase() === 'true'));
-}
-
-function isNonEmptyArray(obj) {
-    return obj && Array.isArray(obj) && obj.length !== 0;
-}
+exports.main = main;
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
new file mode 100644
index 0000000..2a7b4e3
--- /dev/null
+++ b/action/messageHubFeedWeb.js
@@ -0,0 +1,174 @@
+const common = require('./lib/common');
+const Database = require('./lib/Database');
+
+/**
+ *   Feed to listen to MessageHub messages
+ *  @param {string} brokers - array of Message Hub brokers
+ *  @param {string} username - Kafka username
+ *  @param {string} password - Kafka password
+ *  @param {string} topic - topic to subscribe to
+ *  @param {bool}   isJSONData - attempt to parse messages as JSON
+ *  @param {bool}   isBinaryKey - encode key as Base64
+ *  @param {bool}   isBinaryValue - encode message as Base64
+ *  @param {string} endpoint - address to OpenWhisk deployment (expected to be bound at deployment)
+ *  @param {string} DB_URL - URL for the DB, must include authentication (expected to be bound at deployment)
+ *  @param {string} DB_NAME - DB name (expected to be bound at deployment)
+ */
+function main(params) {
+    var promise = new Promise((resolve, reject) => {
+        // hold off initializing this until definitely needed
+        var db;
+
+        if (params.__ow_method === "put") {
+            return validateParameters(params)
+                .then(validatedParams => {
+                    console.log(`VALIDATED: ${JSON.stringify(Object.assign({}, validatedParams, { password: '<hidden>' }), null, 2)}`);
+                    db = new Database(params.DB_URL, params.DB_NAME);
+
+                    var promises = [];
+
+                    // do these in parallel!
+                    promises.push(db.ensureTriggerIsUnique(validatedParams.triggerName));
+                    promises.push(common.verifyTriggerAuth(validatedParams.triggerURL));
+                    promises.push(checkMessageHubCredentials(validatedParams));
+
+                    return Promise.all(promises)
+                        .then(result => validatedParams);
+                })
+                .then(validatedParams => db.recordTrigger(validatedParams))
+                .then(result => {
+                    console.log('successfully wrote the trigger');
+                    resolve();
+                })
+                .catch(error => {
+                    console.log(`Failed to write the trigger ${error}`);
+
+                    // defaults to potentially be overridden
+                    var statusCode = 500;
+                    var body = error.toString();
+
+                    if(error.validationError) {
+                        statusCode = 400;
+                        body = error.validationError;
+                    } else if(error.authError) {
+                        statusCode = 401;
+                        body = error.authError;
+                    }
+
+                    resolve({
+                        statusCode: statusCode,
+                        headers: {'Content-Type': 'text/plain'},
+                        body: body
+                    });
+                });
+        } else if (params.__ow_method === "delete") {
+            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+
+            return common.verifyTriggerAuth(triggerURL)
+                .then(() => {
+                    db = new Database(params.DB_URL, params.DB_NAME);
+                    return db.deleteTrigger(params.triggerName);
+                })
+                .then(resolve)
+                .catch(reject);
+        } else {
+            resolve({
+                statusCode: 400,
+                headers: {'Content-Type': 'text/plain'},
+                body: 'unsupported HTTP method'
+            });
+        }
+    });
+
+    return promise;
+}
+
+function validateParameters(rawParams) {
+    var promise = new Promise((resolve, reject) => {
+        var validatedParams;
+
+        var commonValidationResult = common.performCommonParameterValidation(rawParams);
+        if(commonValidationResult.validationError) {
+            reject(commonValidationResult);
+            return;
+        } else {
+            validatedParams = commonValidationResult.validatedParams;
+        }
+
+        // kafka_brokers_sasl
+        if (rawParams.kafka_brokers_sasl) {
+            validatedParams.brokers = common.validateBrokerParam(rawParams.kafka_brokers_sasl);
+            if (!validatedParams.brokers) {
+                reject( { validationError: "You must supply a 'kafka_brokers_sasl' parameter as an array of Message Hub brokers." });
+                return;
+            }
+        } else {
+            reject( { validationError: "You must supply a 'kafka_brokers_sasl' parameter." });
+            return;
+        }
+
+        // user
+        if (rawParams.user) {
+            validatedParams.username = rawParams.user;
+        } else {
+            reject( { validationError: "You must supply a 'user' parameter to authenticate with Message Hub." });
+            return;
+        }
+
+        // password
+        if (rawParams.password) {
+            validatedParams.password = rawParams.password;
+        } else {
+            reject( { validationError: "You must supply a 'password' parameter to authenticate with Message Hub." });
+            return;
+        }
+
+        // kafka_admin_url
+        if (rawParams.kafka_admin_url) {
+            validatedParams.kafka_admin_url = rawParams.kafka_admin_url;
+        } else {
+            reject( { validationError: "You must supply a 'kafka_admin_url' parameter." });
+            return;
+        }
+
+        validatedParams.isMessageHub = true;
+
+        resolve(validatedParams);
+    });
+
+    return promise;
+}
+
+function checkMessageHubCredentials(params) {
+    // listing topics seems to be the simplest way to check auth
+    var topicURL = params.kafka_admin_url + '/admin/topics';
+
+    var options = {
+        method: 'GET',
+        url: topicURL,
+        json: true,
+        headers: {
+            'X-Auth-Token': params.username + params.password
+        }
+    };
+
+    const request = require('request-promise');
+
+    return request(options)
+        .then(body => {
+            console.log("Successfully authenticated with Message Hub");
+
+            var topicNames = body.map(topic => {
+                return topic.name
+            });
+
+            if (topicNames.indexOf(params.topic) < 0) {
+                return Promise.reject( 'Topic does not exist. You must create the topic first: ' + params.topic );
+            }
+        }, function (authError) {
+            console.log(`authError: ${JSON.stringify(authError)}`);
+            return Promise.reject( 'Could not authenticate with Message Hub. Please check your credentials.' );
+        });
+}
+
+exports.main = main;
diff --git a/action/messageHubFeedWeb_package.json b/action/messageHubFeedWeb_package.json
new file mode 100644
index 0000000..7735d19
--- /dev/null
+++ b/action/messageHubFeedWeb_package.json
@@ -0,0 +1,5 @@
+{
+  "name": "messageHubFeedWeb",
+  "version": "1.0.0",
+  "main": "messageHubFeedWeb.js"
+}
diff --git a/action/messageHubFeed_package.json b/action/messageHubFeed_package.json
new file mode 100644
index 0000000..a86035a
--- /dev/null
+++ b/action/messageHubFeed_package.json
@@ -0,0 +1,5 @@
+{
+  "name": "messageHubFeed",
+  "version": "1.0.0",
+  "main": "messageHubFeed.js"
+}
diff --git a/installCatalog.sh b/installCatalog.sh
index d265e13..9a459d3 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -4,8 +4,8 @@
 # automatically
 #
 # To run this command
-# ./installCatalog.sh  <AUTH> <EDGEHOST> <KAFKA_TRIGGER_HOST> <KAFKA_TRIGGER_PORT> <APIHOST>
-# AUTH and APIHOST are found in $HOME/.wskprops
+# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>
+# authkey and apihost are found in $HOME/.wskprops
 
 set -e
 set -x
@@ -15,16 +15,15 @@
 
 if [ $# -eq 0 ]
 then
-echo "Usage: ./installCatalog.sh <authkey> <apihost> <kafkatriggerhost> <kafkatriggerport>"
+echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>"
 fi
 
 AUTH="$1"
 EDGEHOST="$2"
-KAFKA_TRIGGER_HOST="$3"
-KAFKA_TRIGGER_PORT="$4"
+DB_URL="$3"
+DB_NAME="${4}ow_kafka_triggers"
 APIHOST="$5"
 
-
 # If the auth key file exists, read the key in the file. Otherwise, take the
 # first argument as the key itself.
 if [ -f "$AUTH" ]; then
@@ -34,29 +33,65 @@
 # Make sure that the APIHOST is not empty.
 : ${APIHOST:?"APIHOST must be set and non-empty"}
 
-KAFKA_PROVIDER_ENDPOINT=$KAFKA_TRIGGER_HOST':'$KAFKA_TRIGGER_PORT
-
 PACKAGE_HOME="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 
 export WSK_CONFIG_FILE= # override local property file to avoid namespace clashes
 
-echo Installing the kafka package and feed action.
+echo Installing the Message Hub package and feed action.
 
 $WSK_CLI -i --apihost "$EDGEHOST" package update messaging \
     --auth "$AUTH" \
     --shared yes \
     -a parameters '[ {"name":"kafka_brokers_sasl", "required":true, "description": "Array of Message Hub brokers", "bindTime":true},{"name":"user", "required":true, "description": "Message Hub username", "bindTime":true},{"name":"password", "required":true, "description": "Message Hub password", "bindTime":true, "type":"password"},{"name":"topic", "required":true, "description": "Topic to subscribe to"},{"name":"isJSONData", "required":false, "description": "Attempt to parse message value as JSON"},{"name":"isBinaryKey", "required":false, "description": "Encode key as Base64"},{"name":"isBinaryValue", "required":false, "description": "Encode message value as Base64"},{"name":"endpoint", "required":true, "description": "Hostname and port of OpenWhisk deployment"},{"name":"kafka_admin_url", "required":true, "description": "Your Message Hub admin REST URL", "bindTime":true}]' \
     -p bluemixServiceName 'messagehub' \
-    -p endpoint "$APIHOST" \
-    -p package_endpoint $KAFKA_PROVIDER_ENDPOINT
+    -p endpoint "$APIHOST"
 
-$WSK_CLI -i --apihost "$EDGEHOST" action update messaging/messageHubFeed "$PACKAGE_HOME/action/messageHubFeed.js" \
+# make messageHubFeed.zip
+OLD_PATH=`pwd`
+cd action
+
+if [ -e messageHubFeed.zip ]
+then
+    rm -rf messageHubFeed.zip
+fi
+
+cp -f messageHubFeed_package.json package.json
+zip -r messageHubFeed.zip lib package.json messageHubFeed.js
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 messaging/messageHubFeed "$PACKAGE_HOME/action/messageHubFeed.zip" \
     --auth "$AUTH" \
     -a feed true \
     -a description 'Feed to list to Message Hub messages' \
     -a parameters '[ {"name":"kafka_brokers_sasl", "required":true, "description": "Array of Message Hub brokers"},{"name":"user", "required":true, "description": "Message Hub username"},{"name":"password", "required":true, "description": "Message Hub password", "type":"password"},{"name":"topic", "required":true, "description": "Topic to subscribe to"},{"name":"isJSONData", "required":false, "description": "Attempt to parse message value as JSON"},{"name":"isBinaryKey", "required":false, "description": "Encode key as Base64"},{"name":"isBinaryValue", "required":false, "description": "Encode message value as Base64"},{"name":"endpoint", "required":true, "description": "Hostname and port of OpenWhisk deployment"},{"name":"kafka_admin_url", "required":true, "description": "Your Message Hub admin REST URL"}]' \
     -a sampleInput '{"kafka_brokers_sasl":"[\"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093\"]", "username":"someUsername", "password":"somePassword", "topic":"mytopic", "isJSONData": "false", "endpoint":"openwhisk.ng.bluemix.net", "kafka_admin_url":"https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443"}'
 
+# create messagingWeb package and web version of feed action
+$WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
+    --auth "$AUTH" \
+    --shared no \
+    -p endpoint "$APIHOST" \
+    -p DB_URL "$DB_URL" \
+    -p DB_NAME "$DB_NAME"
+
+# make messageHubFeedWeb.zip
+
+if [ -e messageHubFeedWeb.zip ]
+then
+    rm -rf messageHubFeedWeb.zip
+fi
+
+cp -f messageHubFeedWeb_package.json package.json
+zip -r messageHubFeedWeb.zip lib package.json messageHubFeedWeb.js
+
+cd $OLD_PATH
+
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 messagingWeb/messageHubFeedWeb "$PACKAGE_HOME/action/messageHubFeedWeb.zip" \
+    --auth "$AUTH" \
+    --web true \
+    -a description 'Write a new trigger to Message Hub provider DB' \
+    -a parameters '[ {"name":"kafka_brokers_sasl", "required":true, "description": "Array of Message Hub brokers"},{"name":"user", "required":true, "description": "Message Hub username"},{"name":"password", "required":true, "description": "Message Hub password", "type":"password"},{"name":"topic", "required":true, "description": "Topic to subscribe to"},{"name":"isJSONData", "required":false, "description": "Attempt to parse message value as JSON"},{"name":"isBinaryKey", "required":false, "description": "Encode key as Base64"},{"name":"isBinaryValue", "required":false, "description": "Encode message value as Base64"},{"name":"endpoint", "required":true, "description": "Hostname and port of OpenWhisk deployment"},{"name":"kafka_admin_url", "required":true, "description": "Your Message Hub admin REST URL"}]'
+
 $WSK_CLI -i --apihost "$EDGEHOST" action update messaging/messageHubProduce "$PACKAGE_HOME/action/messageHubProduce.py" \
     --auth "$AUTH" \
     --kind python:3 \
diff --git a/installKafka.sh b/installKafka.sh
old mode 100644
new mode 100755
index 96aaa14..cd92d1a
--- a/installKafka.sh
+++ b/installKafka.sh
@@ -4,8 +4,8 @@
 # automatically
 #
 # To run this command
-# ./installKafka.sh  <AUTH> <EDGEHOST> <KAFKA_TRIGGER_HOST> <KAFKA_TRIGGER_PORT> <APIHOST>
-# AUTH and APIHOST are found in $HOME/.wskprops
+# ./installKafka.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>
+# authkey and apihost are found in $HOME/.wskprops
 
 set -e
 set -x
@@ -15,16 +15,15 @@
 
 if [ $# -eq 0 ]
 then
-echo "Usage: ./installCatalog.sh <authkey> <apihost> <kafkatriggerhost> <kafkatriggerport>"
+echo "Usage: ./installKafka.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>"
 fi
 
 AUTH="$1"
 EDGEHOST="$2"
-KAFKA_TRIGGER_HOST="$3"
-KAFKA_TRIGGER_PORT="$4"
+DB_URL="$3"
+DB_NAME="${4}ow_kafka_triggers"
 APIHOST="$5"
 
-
 # If the auth key file exists, read the key in the file. Otherwise, take the
 # first argument as the key itself.
 if [ -f "$AUTH" ]; then
@@ -34,20 +33,61 @@
 # Make sure that the APIHOST is not empty.
 : ${APIHOST:?"APIHOST must be set and non-empty"}
 
-KAFKA_PROVIDER_ENDPOINT=$KAFKA_TRIGGER_HOST':'$KAFKA_TRIGGER_PORT
-
 PACKAGE_HOME="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 
 export WSK_CONFIG_FILE= # override local property file to avoid namespace clashes
 
-echo Installing the kafka feed action.
+echo Installing the Kafka package and feed action.
 
-$WSK_CLI -i --apihost "$EDGEHOST" action update messaging/kafkaFeed "$PACKAGE_HOME/action/kafkaFeed.js" \
+# make kafkaFeed.zip
+OLD_PATH=`pwd`
+cd action
+
+if [ -e kafkaFeed.zip ]
+then
+    rm -rf kafkaFeed.zip
+fi
+
+cp -f kafkaFeed_package.json package.json
+zip -r kafkaFeed.zip lib package.json kafkaFeed.js
+cd $OLD_PATH
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 messaging/kafkaFeed "$PACKAGE_HOME/action/kafkaFeed.zip" \
     --auth "$AUTH" \
+    -a feed true \
     -a description 'Feed to listen to Kafka messages' \
     -a parameters '[ {"name":"brokers", "required":true, "description": "Array of Kafka brokers"}, {"name":"topic", "required":true, "description": "Topic to subscribe to"}, {"name":"isJSONData", "required":false, "description": "Attempt to parse message value as JSON"}, {"name":"isBinaryKey", "required":false, "description": "Encode key as Base64"}, {"name":"isBinaryValue", "required":false, "description": "Encode message value as Base64"}, {"name":"endpoint", "required":true, "description": "Hostname and port of OpenWhisk deployment"}]' \
     -a sampleInput '{"brokers":"[\"127.0.0.1:9093\"]", "topic":"mytopic", "isJSONData":"false", "endpoint": "openwhisk.ng.bluemix.net"}'
 
+# create messagingWeb package and web version of feed action
+$WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
+    --auth "$AUTH" \
+    --shared no \
+    -p endpoint "$APIHOST" \
+    -p DB_URL "$DB_URL" \
+    -p DB_NAME "$DB_NAME"
+
+# make kafkaFeedWeb.zip
+OLD_PATH=`pwd`
+cd action
+
+if [ -e kafkaFeedWeb.zip ]
+then
+    rm -rf kafkaFeedWeb.zip
+fi
+
+cp -f kafkaFeedWeb_package.json package.json
+zip -r kafkaFeedWeb.zip lib package.json kafkaFeedWeb.js
+
+cd $OLD_PATH
+
+
+$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 messagingWeb/kafkaFeedWeb "$PACKAGE_HOME/action/kafkaFeedWeb.zip" \
+    --auth "$AUTH" \
+    --web true \
+    -a description 'Write a new trigger to Kafka provider DB' \
+    -a parameters '[ {"name":"brokers", "required":true, "description": "Array of Kafka brokers"},{"name":"topic", "required":true, "description": "Topic to subscribe to"},{"name":"isJSONData", "required":false, "description": "Attempt to parse message value as JSON"},{"name":"isBinaryKey", "required":false, "description": "Encode key as Base64"},{"name":"isBinaryValue", "required":false, "description": "Encode message value as Base64"},{"name":"endpoint", "required":true, "description": "Hostname and port of OpenWhisk deployment"}]'
+
 $WSK_CLI -i --apihost "$EDGEHOST" action update messaging/kafkaProduce "$PACKAGE_HOME/action/kafkaProduce.py" \
     --auth "$AUTH" \
     --kind python:3 \
diff --git a/provider/app.py b/provider/app.py
index 7263339..f189981 100644
--- a/provider/app.py
+++ b/provider/app.py
@@ -14,139 +14,23 @@
 
 import logging
 import os
-import requests
-import sys
-import urllib
-import uuid
 
-from datetime import datetime
-from flask import Flask, jsonify, request
-from consumer import Consumer
+from flask import Flask, jsonify
 from consumercollection import ConsumerCollection
 from database import Database
-from threading import Lock
 from thedoctor import TheDoctor
-from urlparse import urlparse
 from health import generateHealthReport
 from gevent.wsgi import WSGIServer
+from service import Service
 
 
 app = Flask(__name__)
 app.debug = False
+
 database = Database()
-
 consumers = ConsumerCollection()
 
 
-@app.route('/triggers/<namespace>/<trigger>', methods=['PUT'])
-def postTrigger(namespace, trigger):
-    body = request.get_json(force=True, silent=True)
-    triggerFQN = '/' + namespace + '/' + trigger
-    expectedRoute = urllib.quote('/namespaces/' + namespace + '/triggers/' + trigger)
-    missing = getMissingPostFields(body)
-
-    if consumers.hasConsumerForTrigger(triggerFQN):
-        logging.warn("[{}] Trigger already exists".format(triggerFQN))
-        response = jsonify({
-            'success': False,
-            'error': "trigger already exists"
-        })
-        response.status_code = 409
-    elif len(missing) > 0:
-        response = jsonify({
-            'success': False,
-            'error': 'missing fields: %s' % ', '.join(missing)
-        })
-        response.status_code = 400
-        return response
-    elif not body["triggerURL"].endswith(expectedRoute):
-        logging.warn("[{}] Trigger and namespace from route must correspond to triggerURL".format(triggerFQN))
-        response = jsonify({
-            'success': False,
-            'error': "trigger and namespace from route must correspond to triggerURL"
-        })
-        response.status_code = 409
-    elif not body["isMessageHub"] and not enable_generic_kafka:
-        # Generic Kafka triggers has been disabled
-        logging.warn("[{}] Attempt to create generic kafka trigger while function is disabled".format(triggerFQN))
-        response = jsonify({
-            'success': False,
-            'error': "only triggers for Message Hub instances are allowed."
-        })
-        response.status_code = 403
-    else:
-        logging.info("[{}] Ensuring user has access rights to post a trigger".format(triggerFQN))
-
-        try:
-            trigger_get_response = requests.get(body["triggerURL"], verify=check_ssl)
-        except:
-            triggerURL = urlparse(body["triggerURL"])
-
-            if triggerURL.port != None:
-                triggerAddress = "{}:{}".format(triggerURL.hostname, triggerURL.port)
-            else:
-                triggerAddress = "{}".format(triggerURL.hostname)
-
-            logging.warn("[{}] Failed to communicate with OpenWhisk server ({}) for authentication".format(triggerFQN, triggerAddress))
-            response = jsonify({
-                'success': False,
-                'error': "failed to communicate with OpenWhisk server ({}) for authentication.".format(triggerAddress)
-            })
-            response.status_code = 500
-            return response
-
-        trigger_get_status_code = trigger_get_response.status_code
-        logging.info("[{}] Repsonse status code from trigger authorization {}".format(triggerFQN,
-                                                                                      trigger_get_status_code))
-        if trigger_get_status_code == 200:
-            logging.info("[{}] User authenticated. About to create consumer {}".format(triggerFQN, str(body)))
-            createAndRunConsumer(triggerFQN, body)
-            logging.info("[{}] Finished creating consumer.".format(triggerFQN))
-            response = jsonify({'success': True})
-            response.status_code = trigger_get_status_code
-        elif trigger_get_status_code == 401:
-            logging.warn("[{}] User not authorized to post trigger".format(triggerFQN))
-            response = jsonify({
-                'success': False,
-                'error': 'not authorized'
-            })
-            response.status_code = trigger_get_status_code
-        else:
-            logging.warn("[{}] Trigger authentication request failed with error code {}".format(triggerFQN,
-                trigger_get_status_code))
-            response = jsonify({'success': False})
-            response.status_code = trigger_get_status_code
-
-    return response
-
-
-@app.route('/triggers/<namespace>/<trigger>', methods=['DELETE'])
-def deleteTrigger(namespace, trigger):
-    auth = request.authorization
-    body = request.get_json(force=True, silent=True)
-
-    triggerFQN = '/' + namespace + '/' + trigger
-    consumer = consumers.getConsumerForTrigger(triggerFQN)
-    if consumer != None:
-        if authorizedForTrigger(auth, consumer):
-            if consumer.desiredState() == Consumer.State.Disabled:
-                # it's already disabled, just delete it
-                database.deleteTrigger(triggerFQN)
-                consumers.removeConsumerForTrigger(triggerFQN)
-            else:
-                consumer.shutdown()
-
-            response = jsonify({'success': True})
-        else:
-            response = jsonify({'error': 'not authorized'})
-            response.status_code = 401
-    else:
-        response = jsonify({'error': 'not found'})
-        response.status_code = 404
-
-    return response
-
-
 @app.route('/')
 def testRoute():
     return jsonify('Hi!')
@@ -157,67 +41,12 @@
     return jsonify(generateHealthReport(consumers))
 
 
-def authorizedForTrigger(auth, consumer):
-    triggerURL = urlparse(consumer.triggerURL)
-
-    return (auth != None and consumer != None and triggerURL.username == auth.username and triggerURL.password == auth.password)
-
-
-def createAndRunConsumer(triggerFQN, params, record=True):
-    # generate a random uuid for new triggers
-    if not 'uuid' in params:
-        params['uuid'] = str(uuid.uuid4())
-
-    # Create a representation for this trigger, even if it is disabled
-    # This allows it to appear in /health as well as allow it to be deleted
-    # Creating this object is lightweight and does not initialize any connections
-    consumer = Consumer(triggerFQN, params)
-    consumers.addConsumerForTrigger(triggerFQN, consumer)
-
-    if 'status' not in params or params['status']['active'] == True:
-        logging.info('{} Trigger was determined to be active, starting...'.format(triggerFQN))
-        consumer.start()
-    else:
-        logging.info('{} Trigger was determined to be disabled, not starting...'.format(triggerFQN))
-
-    if record:
-        database.recordTrigger(triggerFQN, params)
-
-
-def restoreTriggers():
-    for triggerDoc in database.triggers():
-        triggerFQN = triggerDoc['_id']
-        logging.debug('Restoring trigger {}'.format(triggerFQN))
-
-        try:
-            createAndRunConsumer(triggerFQN, triggerDoc, record=False)
-        except:
-            logging.warn('Skipping consumer due to caught exception: {}'.format(triggerDoc))
-
-
-def getMissingPostFields(fields):
-    missing = []
-    requiredFields = ['brokers', 'topic', 'triggerURL', 'isMessageHub']
-
-    if fields is None:
-        missing.extend(requiredFields)
-    else:
-        # Message Hub also requires 'username' and 'password fields'
-        if 'isMessageHub' in fields and fields['isMessageHub'] == True:
-            requiredFields.extend(['username', 'password'])
-
-        missing.extend([i for i in requiredFields if i not in fields])
-
-    if len(missing) > 0:
-        logging.info("Required fields are missing {}".format(missing))
-
-    return missing
-
-
 def main():
     logger = logging.getLogger()
     logger.setLevel(logging.INFO)
 
+    component = os.getenv('INSTANCE', 'messageHubTrigger-0')
+
     # Make sure we log to the console
     streamHandler = logging.StreamHandler()
     formatter = logging.Formatter('[%(asctime)s.%(msecs)03dZ] [%(levelname)s] [??] [kafkatriggers] %(message)s', datefmt="%Y-%m-%dT%H:%M:%S")
@@ -226,7 +55,7 @@
 
     # also log to file if /logs is present
     if os.path.isdir('/logs'):
-        fh = logging.FileHandler('/logs/kafkatriggers_logs.log')
+        fh = logging.FileHandler('/logs/{}_logs.log'.format(component))
         fh.setFormatter(formatter)
         logger.addHandler(fh)
 
@@ -246,7 +75,7 @@
 
     TheDoctor(consumers).start()
 
-    restoreTriggers()
+    Service(consumers).start()
 
     port = int(os.getenv('PORT', 5000))
     server = WSGIServer(('', port), app, log=logging.getLogger())
diff --git a/provider/consumer.py b/provider/consumer.py
index 0f0b69c..44a4321 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -61,6 +61,9 @@
     def shutdown(self):
         self.thread.shutdown()
 
+    def disable(self):
+        self.thread.setDesiredState(Consumer.State.Disabled)
+
     def start(self):
         self.thread.start()
 
@@ -216,7 +219,6 @@
 
         if self.desiredState() == Consumer.State.Dead:
             logging.info('[{}] Permanently killing consumer because desired state is Dead'.format(self.trigger))
-            self.database.deleteTrigger(self.trigger)
         elif self.desiredState() == Consumer.State.Restart:
             logging.info('[{}] Quietly letting the consumer thread stop in order to allow restart.'.format(self.trigger))
             # nothing else to do because this Thread is about to go away
diff --git a/provider/database.py b/provider/database.py
index 996bc16..73ba241 100644
--- a/provider/database.py
+++ b/provider/database.py
@@ -17,18 +17,18 @@
 import time
 import uuid
 
-from cloudant import Cloudant
+from cloudant.client import CouchDB
 from cloudant.result import Result
-from threading import Lock
 
 class Database:
     db_prefix = os.getenv('DB_PREFIX', '')
     dbname = db_prefix + 'ow_kafka_triggers'
-    username = os.environ['CLOUDANT_USER']
-    password = os.environ['CLOUDANT_PASS']
 
-    lock = Lock()
-    client = Cloudant(username, password, account=username)
+    username = os.environ['DB_USER']
+    password = os.environ['DB_PASS']
+    url = os.environ['DB_URL']
+
+    client = CouchDB(username, password, url=url)
     client.connect()
 
     filters_design_doc_id = '_design/filters'
@@ -41,79 +41,35 @@
         logging.warn('Database does not exist - creating it.')
         database = client.create_database(dbname)
 
-    def recordTrigger(self, triggerFQN, doc):
-        with self.lock:
-            try:
-                document = dict(doc)
-                document['_id'] = triggerFQN
-
-                if 'status' not in document:
-                    # set the status as active
-                    status = {
-                        'active': True,
-                        'dateChanged': time.time()
-                    }
-
-                    document['status'] = status
-
-                logging.info('Writing trigger {} to DB'.format(triggerFQN))
-                result = self.database.create_document(document)
-                logging.info('Successfully wrote trigger {} to DB'.format(triggerFQN))
-
-                return result
-            except Exception as e:
-                logging.error('[{}] Uncaught exception while recording trigger to database: {}'.format(triggerFQN, e))
-
-
-    def deleteTrigger(self, triggerFQN):
-        with self.lock:
-            try:
-                document = self.database[triggerFQN]
-
-                if document.exists():
-                    logging.info('Found trigger to delete from DB: {}'.format(triggerFQN))
-                    document.delete()
-                    logging.info('Successfully deleted trigger from DB: {}'.format(triggerFQN))
-                else:
-                    logging.warn('Attempted to delete non-existent trigger from DB: {}'.format(triggerFQN))
-            except Exception as e:
-                logging.error('[{}] Uncaught exception while deleting trigger from database: {}'.format(triggerFQN, e))
 
     def disableTrigger(self, triggerFQN, status_code):
-        with self.lock:
-            try:
-                document = self.database[triggerFQN]
+        try:
+            document = self.database[triggerFQN]
 
-                if document.exists():
-                    logging.info('Found trigger to disable from DB: {}'.format(triggerFQN))
+            if document.exists():
+                logging.info('Found trigger to disable from DB: {}'.format(triggerFQN))
 
-                    status = {
-                        'active': False,
-                        'dateChanged': time.time(),
-                        'reason': {
-                            'kind': 'AUTO',
-                            'statusCode': status_code,
-                            'message': 'Automatically disabled after receiving a {} status code when firing the trigger.'.format(status_code)
-                        }
+                status = {
+                    'active': False,
+                    'dateChanged': time.time(),
+                    'reason': {
+                        'kind': 'AUTO',
+                        'statusCode': status_code,
+                        'message': 'Automatically disabled after receiving a {} status code when firing the trigger.'.format(status_code)
                     }
+                }
 
-                    document['status'] = status
-                    document.save()
+                document['status'] = status
+                document.save()
 
-                    logging.info('{} Successfully recorded trigger as disabled.'.format(triggerFQN))
-            except Exception as e:
-                logging.error('[{}] Uncaught exception while disabling trigger: {}'.format(triggerFQN, e))
+                logging.info('{} Successfully recorded trigger as disabled.'.format(triggerFQN))
+        except Exception as e:
+            logging.error('[{}] Uncaught exception while disabling trigger: {}'.format(triggerFQN, e))
 
 
-    def triggers(self):
-        allDocs = []
+    def changesFeed(self):
+        return self.database.infinite_changes(include_docs=True)
 
-        logging.info('Fetching all triggers from DB')
-        for document in self.database.get_view_result(self.filters_design_doc_id, self.only_triggers_view_id, include_docs=True):
-            allDocs.append(document['doc'])
-
-        logging.info('Successfully retrieved {} triggers'.format(len(allDocs)))
-        return allDocs
 
     def migrate(self):
         logging.info('Starting DB migration')
diff --git a/provider/health.py b/provider/health.py
index 3eb19ec..2024d7b 100644
--- a/provider/health.py
+++ b/provider/health.py
@@ -14,7 +14,6 @@
 
 import psutil   # https://pythonhosted.org/psutil/
 
-from consumercollection import ConsumerCollection
 from datetime import datetime
 
 MILLISECONDS_IN_SECOND = 1000
@@ -118,7 +117,6 @@
 
 def getConsumers(consumers):
     consumerReports = []
-    currentTime = datetime.now()
 
     consumerCopyRO = consumers.getCopyForRead()
     for consumerId in consumerCopyRO:
diff --git a/provider/service.py b/provider/service.py
new file mode 100644
index 0000000..9b24517
--- /dev/null
+++ b/provider/service.py
@@ -0,0 +1,89 @@
+# Copyright 2016 IBM Corp. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from consumer import Consumer
+from database import Database
+from threading import Thread
+
+class Service (Thread):
+    def __init__(self, consumers):
+        Thread.__init__(self)
+        self.daemon = True
+
+        self.changes = Database().changesFeed()
+        self.consumers = consumers
+
+    def run(self):
+        while True:
+            for change in self.changes:
+                if "deleted" in change and change["deleted"] == True:
+                    logging.info('[changes] Found a delete')
+                    consumer = self.consumers.getConsumerForTrigger(change['id'])
+                    if consumer != None:
+                        if consumer.desiredState() == Consumer.State.Disabled:
+                            # just remove it from memory
+                            logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
+                            self.consumers.removeConsumerForTrigger(consumer.trigger)
+                        else:
+                            logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
+                            consumer.shutdown()
+                # since we can't use a filter function for the feed (then
+                # you don't get deletes) we need to manually verify this
+                # is a valid trigger doc that has changed
+                elif 'triggerURL' in change['doc']:
+                    logging.info('[changes] Found a change in a trigger document')
+                    document = change['doc']
+
+                    if not self.consumers.hasConsumerForTrigger(change["id"]):
+                        logging.info('[{}] Found a new trigger to create'.format(change["id"]))
+                        self.createAndRunConsumer(document)
+                    else:
+                        logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
+                        existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
+
+                        if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                            # disabled trigger has become active
+                            logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
+                            self.createAndRunConsumer(document)
+                        elif existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
+                            # running trigger should become disabled
+                            logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
+                            existingConsumer.disable()
+                        else:
+                            logging.debug('[changes] Found non-interesting trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
+                else:
+                    logging.debug('[changes] Found a change for a non-trigger document')
+
+            logging.error("[changes] uh-oh! I made it out of the changes for loop!")
+
+
+    def createAndRunConsumer(self, doc):
+        triggerFQN = doc['_id']
+
+        # Create a representation for this trigger, even if it is disabled
+        # This allows it to appear in /health as well as allow it to be deleted
+        # Creating this object is lightweight and does not initialize any connections
+        consumer = Consumer(triggerFQN, doc)
+        self.consumers.addConsumerForTrigger(triggerFQN, consumer)
+
+        if self.__isTriggerDocActive(doc):
+            logging.info('[{}] Trigger was determined to be active, starting...'.format(triggerFQN))
+            consumer.start()
+        else:
+            logging.info('[{}] Trigger was determined to be disabled, not starting...'.format(triggerFQN))
+
+    def __isTriggerDocActive(self, doc):
+        return ('status' not in doc or doc['status']['active'] == True)
diff --git a/provider/thedoctor.py b/provider/thedoctor.py
index c861518..a36dc90 100644
--- a/provider/thedoctor.py
+++ b/provider/thedoctor.py
@@ -2,7 +2,6 @@
 import time
 
 from consumer import Consumer
-from consumercollection import ConsumerCollection
 from threading import Thread
 
 class TheDoctor (Thread):
diff --git a/tests/dat/multipleValueTypes.json b/tests/dat/multipleValueTypes.json
index 2f3b082..e22e0c4 100644
--- a/tests/dat/multipleValueTypes.json
+++ b/tests/dat/multipleValueTypes.json
@@ -2,6 +2,9 @@
   "kafka_brokers_sasl": [
     "someBroker"
   ],
+  "brokers": [
+    "someBroker"
+  ],
   "topic": "someTopic",
   "user": "someUser",
   "password": "somePassword",
diff --git a/tests/src/test/scala/system/packages/KafkaFeedTests.scala b/tests/src/test/scala/system/packages/KafkaFeedTests.scala
index cc59a36..07ea59d 100644
--- a/tests/src/test/scala/system/packages/KafkaFeedTests.scala
+++ b/tests/src/test/scala/system/packages/KafkaFeedTests.scala
@@ -42,24 +42,16 @@
 
   implicit val wskprops = WskProps()
   val wsk = new Wsk()
-  val actionName = "kafkaFeedAction"
-  val actionFile = "../action/kafkaFeed.js"
+
+  val messagingPackage = "/whisk.system/messaging"
+  val kafkaFeed = "kafkaFeed"
+  val actionName = s"${messagingPackage}/${kafkaFeed}"
 
   behavior of "Kafka feed action"
 
-  override def beforeAll() {
-    wsk.action.create(actionName, Some(actionFile))
-    super.beforeAll()
-  }
-
-  override def afterAll()  {
-    wsk.action.delete(actionName)
-    super.afterAll()
-  }
-
   it should "reject invocation when topic argument is missing" in {
     val expectedOutput = JsObject(
-      "error" -> JsString("You must supply a \"topic\" parameter.")
+      "error" -> JsString("You must supply a 'topic' parameter.")
     )
 
     runActionWithExpectedResult(actionName, "dat/missingTopic.json", expectedOutput, false)
@@ -67,20 +59,12 @@
 
   it should "reject invocation when brokers argument is missing" in  {
     val expectedOutput = JsObject(
-      "error" -> JsString("You must supply a \"brokers\" parameter as an array of Kafka brokers.")
+      "error" -> JsString("You must supply a 'brokers' parameter.")
     )
 
     runActionWithExpectedResult(actionName, "dat/missingBrokers.json", expectedOutput, false)
   }
 
-  it should "reject invocation when package_endpoint argument is missing" in {
-    val expectedOutput = JsObject(
-      "error" -> JsString("Could not find the package_endpoint parameter.")
-    )
-
-    runActionWithExpectedResult(actionName, "dat/missingPackageEndpoint.json", expectedOutput, false)
-  }
-
   it should "reject invocation when isJSONData and isBinaryValue are both enable" in {
     val expectedOutput = JsObject(
       "error" -> JsString("isJSONData and isBinaryValue cannot both be enabled.")
diff --git a/tests/src/test/scala/system/packages/KafkaFeedWebTests.scala b/tests/src/test/scala/system/packages/KafkaFeedWebTests.scala
new file mode 100644
index 0000000..3ff26a8
--- /dev/null
+++ b/tests/src/test/scala/system/packages/KafkaFeedWebTests.scala
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.packages
+
+import org.junit.runner.RunWith
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+
+import common.Wsk
+import common.WskProps
+import common.TestUtils.FORBIDDEN
+
+import spray.json._
+
+@RunWith(classOf[JUnitRunner])
+class KafkaFeedWebTests
+  extends FlatSpec
+    with BeforeAndAfter
+    with Matchers {
+
+  val wskprops = WskProps()
+
+  val webAction = "/whisk.system/messagingWeb/kafkaFeedWeb"
+  val webActionURL = s"https://${wskprops.apihost}/api/v1/web${webAction}.http"
+
+  val completeParams = JsObject(
+    "triggerName" -> JsString("/invalidNamespace/invalidTrigger"),
+    "topic" -> JsString("someTopic"),
+    "brokers" -> JsArray(JsString("someBroker")),
+    "user" -> JsString("someUsername"),
+    "password" -> JsString("somePassword"),
+    "kafka_admin_url" -> JsString("https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443"),
+    "authKey" -> JsString("DoesNotWork")
+  )
+
+  def makePutCallWithExpectedResult(params: JsObject, expectedResult: String, expectedCode: Int) = {
+    val response = RestAssured.given()
+        .contentType("application/json\r\n")
+        .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+        .body(params.toString())
+        .put(webActionURL)
+    assert(response.statusCode() == expectedCode)
+    response.body.asString shouldBe expectedResult
+  }
+
+  def makeDeleteCallWithExpectedResult(expectedResult: String, expectedCode: Int) = {
+    val response = RestAssured.given().contentType("application/json\r\n").config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).delete(webActionURL)
+    assert(response.statusCode() == expectedCode)
+    response.body.asString shouldBe expectedResult
+  }
+
+  behavior of "Kafka feed web action"
+
+  it should "not be obtainable using the CLI" in {
+      val wsk = new Wsk()
+      implicit val wp = wskprops
+
+      wsk.action.get(webAction, FORBIDDEN)
+  }
+
+  it should "reject post of a trigger due to missing brokers argument" in {
+    val params = JsObject(completeParams.fields - "brokers")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'brokers' parameter.", 400)
+  }
+
+  it should "reject post of a trigger due to missing topic argument" in {
+    val params = JsObject(completeParams.fields - "topic")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'topic' parameter.", 400)
+  }
+
+  it should "reject post of a trigger due to missing triggerName argument" in {
+    val params = JsObject(completeParams.fields - "triggerName")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'triggerName' parameter.", 400)
+  }
+
+  it should "reject put of a trigger when authentication fails" in {
+    makePutCallWithExpectedResult(completeParams, "You are not authorized for this trigger.", 401)
+  }
+
+  // it should "reject delete of a trigger that does not exist" in {
+  //   val expectedJSON = JsObject(
+  //     "triggerName" -> JsString("/invalidNamespace/invalidTrigger"),
+  //     "error" -> JsString("not found")
+  //   )
+  //
+  //   makeDeleteCallWithExpectedResult(expectedJSON, 404)
+  // }
+}
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d47d56c..299087a 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -67,63 +67,50 @@
 
   implicit val wskprops = WskProps()
   val wsk = new Wsk()
-  val actionName = "messageHubFeedAction"
-  val actionFile = "../action/messageHubFeed.js"
+  val actionName = s"${messagingPackage}/${messageHubFeed}"
 
   behavior of "Message Hub feed action"
 
-  override def beforeAll() {
-    wsk.action.create(actionName, Some(actionFile))
-    super.beforeAll()
-  }
-
-  override def afterAll() {
-    wsk.action.delete(actionName)
-    super.afterAll()
-  }
-
   it should "reject invocation when topic argument is missing" in {
     val expectedOutput = JsObject(
-      "error" -> JsString("You must supply a \"topic\" parameter."))
+      "error" -> JsString("You must supply a 'topic' parameter.")
+    )
 
     runActionWithExpectedResult(actionName, "dat/missingTopic.json", expectedOutput, false)
   }
 
   it should "reject invocation when kafka_brokers_sasl argument is missing" in {
     val expectedOutput = JsObject(
-      "error" -> JsString("You must supply a \"kafka_brokers_sasl\" parameter as an array of Message Hub brokers."))
+      "error" -> JsString("You must supply a 'kafka_brokers_sasl' parameter.")
+    )
 
     runActionWithExpectedResult(actionName, "dat/missingBrokers.json", expectedOutput, false)
   }
 
   it should "reject invocation when kafka_admin_url argument is missing" in {
     val expectedOutput = JsObject("error" -> JsString(
-      "You must supply a \"kafka_admin_url\" parameter."))
+      "You must supply a 'kafka_admin_url' parameter.")
+    )
 
     runActionWithExpectedResult(actionName, "dat/missingAdminURL.json", expectedOutput, false)
   }
 
   it should "reject invocation when user argument is missing" in {
     val expectedOutput = JsObject(
-      "error" -> JsString("You must supply a \"user\" parameter to authenticate with Message Hub."))
+      "error" -> JsString("You must supply a 'user' parameter to authenticate with Message Hub.")
+    )
 
     runActionWithExpectedResult(actionName, "dat/missingUser.json", expectedOutput, false)
   }
 
   it should "reject invocation when password argument is missing" in {
     val expectedOutput = JsObject(
-      "error" -> JsString("You must supply a \"password\" parameter to authenticate with Message Hub."))
+      "error" -> JsString("You must supply a 'password' parameter to authenticate with Message Hub.")
+    )
 
     runActionWithExpectedResult(actionName, "dat/missingPassword.json", expectedOutput, false)
   }
 
-  it should "reject invocation when package_endpoint argument is missing" in {
-    val expectedOutput = JsObject(
-      "error" -> JsString("Could not find the package_endpoint parameter."))
-
-    runActionWithExpectedResult(actionName, "dat/missingPackageEndpoint.json", expectedOutput, false)
-  }
-
   it should "reject invocation when isJSONData and isBinaryValue are both enable" in {
     val expectedOutput = JsObject(
       "error" -> JsString("isJSONData and isBinaryValue cannot both be enabled."))
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedWebTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedWebTests.scala
new file mode 100644
index 0000000..b8aa723
--- /dev/null
+++ b/tests/src/test/scala/system/packages/MessageHubFeedWebTests.scala
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.packages
+
+import org.junit.runner.RunWith
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+
+import common.Wsk
+import common.WskProps
+import common.TestUtils.FORBIDDEN
+
+import spray.json._
+
+@RunWith(classOf[JUnitRunner])
+class MessageHubFeedWebTests
+  extends FlatSpec
+    with BeforeAndAfter
+    with Matchers {
+
+  val wskprops = WskProps()
+
+  val webAction = "/whisk.system/messagingWeb/messageHubFeedWeb"
+  val webActionURL = s"https://${wskprops.apihost}/api/v1/web${webAction}.http"
+
+  val completeParams = JsObject(
+    "triggerName" -> JsString("/invalidNamespace/invalidTrigger"),
+    "topic" -> JsString("someTopic"),
+    "kafka_brokers_sasl" -> JsArray(JsString("someBroker")),
+    "user" -> JsString("someUsername"),
+    "password" -> JsString("somePassword"),
+    "kafka_admin_url" -> JsString("https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443"),
+    "authKey" -> JsString("DoesNotWork")
+  )
+
+  def makePutCallWithExpectedResult(params: JsObject, expectedResult: String, expectedCode: Int) = {
+    val response = RestAssured.given()
+        .contentType("application/json\r\n")
+        .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+        .body(params.toString())
+        .put(webActionURL)
+    assert(response.statusCode() == expectedCode)
+    response.body.asString shouldBe expectedResult
+  }
+
+  def makeDeleteCallWithExpectedResult(expectedResult: String, expectedCode: Int) = {
+    val response = RestAssured.given().contentType("application/json\r\n").config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).delete(webActionURL)
+    assert(response.statusCode() == expectedCode)
+    response.body.asString shouldBe expectedResult
+  }
+
+  behavior of "Message Hub feed web action"
+
+  it should "not be obtainable using the CLI" in {
+      val wsk = new Wsk()
+      implicit val wp = wskprops
+
+      wsk.action.get(webAction, FORBIDDEN)
+  }
+
+  it should "reject post of a trigger due to missing kafka_brokers_sasl argument" in {
+    val params = JsObject(completeParams.fields - "kafka_brokers_sasl")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'kafka_brokers_sasl' parameter.", 400)
+  }
+
+  it should "reject post of a trigger due to missing topic argument" in {
+    val params = JsObject(completeParams.fields - "topic")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'topic' parameter.", 400)
+  }
+
+  it should "reject post of a trigger due to missing triggerName argument" in {
+    val params = JsObject(completeParams.fields - "triggerName")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'triggerName' parameter.", 400)
+  }
+
+  it should "reject post of a trigger due to missing user argument" in {
+    val params = JsObject(completeParams.fields - "user")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'user' parameter to authenticate with Message Hub.", 400)
+  }
+
+  it should "reject post of a trigger due to missing password argument" in {
+    val params = JsObject(completeParams.fields - "password")
+
+    makePutCallWithExpectedResult(params, "You must supply a 'password' parameter to authenticate with Message Hub.", 400)
+  }
+
+  it should "reject put of a trigger when authentication fails" in {
+    makePutCallWithExpectedResult(completeParams, "You are not authorized for this trigger.", 401)
+  }
+
+  // it should "reject delete of a trigger that does not exist" in {
+  //   val expectedJSON = JsObject(
+  //     "triggerName" -> JsString("/invalidNamespace/invalidTrigger"),
+  //     "error" -> JsString("not found")
+  //   )
+  //
+  //   makeDeleteCallWithExpectedResult(expectedJSON, 404)
+  // }
+}
diff --git a/tests/src/test/scala/system/packages/MessagingServiceTests.scala b/tests/src/test/scala/system/packages/MessagingServiceTests.scala
index 5ddd4a5..0e531f8 100644
--- a/tests/src/test/scala/system/packages/MessagingServiceTests.scala
+++ b/tests/src/test/scala/system/packages/MessagingServiceTests.scala
@@ -21,8 +21,6 @@
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
 import com.jayway.restassured.RestAssured
-import common.WhiskProperties
-import spray.json._
 
 @RunWith(classOf[JUnitRunner])
 class MessagingServiceTests
@@ -30,25 +28,13 @@
     with BeforeAndAfter
     with Matchers {
 
-  val healthEndpoint = s"/health"
+  val healthEndpoint = "/health"
 
   val getMessagingAddress =
     if (System.getProperty("host") != "" && System.getProperty("port") != "") {
       "http://" + System.getProperty("host") + ":" + System.getProperty("port")
     }
 
-  def makePutCallWithExpectedResult(url: String, params: JsObject, expectedResult: JsObject, expectedCode: Int) = {
-    val response = RestAssured.given().body(params.toString()).put(url)
-    assert(response.statusCode() == expectedCode)
-    response.body.asString.parseJson.asJsObject shouldBe expectedResult
-  }
-
-  def makeDeleteCallWithExpectedResult(url: String, expectedResult: JsObject, expectedCode: Int) = {
-    val response = RestAssured.given().delete(url)
-    assert(response.statusCode() == expectedCode)
-    response.body.asString.parseJson.asJsObject shouldBe expectedResult
-  }
-
   behavior of "Messaging feed provider endpoint"
 
   it should "return status code HTTP 200 OK from /health endpoint" in {
@@ -56,196 +42,4 @@
 
     assert(response.statusCode() == 200 && response.asString().contains("consumers"))
   }
-
-  it should "reject post of a trigger when missing all arguments" in {
-    val expectedJSON = JsObject(
-      "error" -> JsString("missing fields: brokers, topic, triggerURL, isMessageHub"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", JsObject(), expectedJSON, 400)
-  }
-
-  it should "reject post of a trigger due to missing brokers argument" in {
-    val params = JsObject(
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString("someURL"),
-      "isMessageHub" -> JsBoolean(false)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("missing fields: brokers"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 400)
-  }
-
-  it should "reject post of a trigger due to missing topic argument" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "triggerURL" -> JsString("someURL"),
-      "isMessageHub" -> JsBoolean(false)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("missing fields: topic"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 400)
-  }
-
-  it should "reject post of a trigger due to missing triggerURL argument" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "topic" -> JsString("someTopic"),
-      "isMessageHub" -> JsBoolean(false)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("missing fields: triggerURL"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 400)
-  }
-
-  it should "reject post of a trigger due to missing isMessageHub argument" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "triggerURL" -> JsString("someURL"),
-      "topic" -> JsString("someTopic")
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("missing fields: isMessageHub"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 400)
-  }
-
-  it should "reject post of a trigger due to missing username argument" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "password" -> JsString("somePassword"),
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString("someURL"),
-      "isMessageHub" -> JsBoolean(true)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("missing fields: username"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 400)
-  }
-
-  it should "reject post of a trigger due to missing password argument" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "username" -> JsString("someUsername"),
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString("someURL"),
-      "isMessageHub" -> JsBoolean(true)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("missing fields: password"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 400)
-  }
-
-  it should "reject post of a trigger due to mismatch between triggerURL and trigger name" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "username" -> JsString("someUsername"),
-      "password" -> JsString("somePassword"),
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString(s"https://someKey@${WhiskProperties.getEdgeHost}/api/v1/namespaces/invalidNamespace/triggers/someTrigger"),
-      "isMessageHub" -> JsBoolean(true)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("trigger and namespace from route must correspond to triggerURL"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress +  "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 409)
-  }
-
-  it should "reject post of a trigger due to mismatch between triggerURL and namespace" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "username" -> JsString("someUsername"),
-      "password" -> JsString("somePassword"),
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString(s"https://someKey@${WhiskProperties.getEdgeHost}/api/v1/namespaces/someNamespace/triggers/invalidTrigger"),
-      "isMessageHub" -> JsBoolean(true)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("trigger and namespace from route must correspond to triggerURL"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 409)
-  }
-
-  it should "reject post of a trigger when authentication fails" in {
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "username" -> JsString("someUsername"),
-      "password" -> JsString("somePassword"),
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString(s"https://someKey@${WhiskProperties.getEdgeHost}/api/v1/namespaces/invalidNamespace/triggers/invalidTrigger"),
-      "isMessageHub" -> JsBoolean(true)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString("not authorized"),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 401)
-  }
-
-  it should "reject delete of a trigger that does not exist" in {
-    val expectedJSON = JsObject(
-      "error" -> JsString("not found")
-    )
-
-    makeDeleteCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", expectedJSON, 404)
-  }
-
-  it should "reject post of a trigger when OpenWhisk host and port fails" in {
-    val address = "0.0.0.0.0.0:9999"
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "username" -> JsString("someUsername"),
-      "password" -> JsString("somePassword"),
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString(s"https://someKey@$address/api/v1/namespaces/invalidNamespace/triggers/invalidTrigger"),
-      "isMessageHub" -> JsBoolean(true)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString(s"failed to communicate with OpenWhisk server ($address) for authentication."),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 500)
-  }
-
-  it should "reject post of a trigger when OpenWhisk host fails" in {
-    val address = "0.0.0.0.0.0"
-    val params = JsObject(
-      "brokers" -> JsArray(JsString("someBroker")),
-      "username" -> JsString("someUsername"),
-      "password" -> JsString("somePassword"),
-      "topic" -> JsString("someTopic"),
-      "triggerURL" -> JsString(s"https://someKey@$address/api/v1/namespaces/invalidNamespace/triggers/invalidTrigger"),
-      "isMessageHub" -> JsBoolean(true)
-    )
-    val expectedJSON = JsObject(
-      "error" -> JsString(s"failed to communicate with OpenWhisk server ($address) for authentication."),
-      "success" -> JsBoolean(false)
-    )
-
-    makePutCallWithExpectedResult(getMessagingAddress + "/triggers/invalidNamespace/invalidTrigger", params, expectedJSON, 500)
-  }
 }