trigger fires to 1M + parameter handling + error handling + includeDoc
removal
diff --git a/actions/changes.js b/actions/changes.js
index fe35d67..7f471e1 100644
--- a/actions/changes.js
+++ b/actions/changes.js
@@ -15,13 +15,58 @@
var dbname = msg.dbname;
var user = msg.username;
var pass = msg.password;
- var includeDoc = msg.includeDoc || false;
var host = msg.host;
var protocol = msg.protocol || 'https';
var port = msg.port;
- var maxTriggers = msg.maxTriggers || 1000;
+ var maxTriggers = msg.maxTriggers;
+
+ var validProperties = {
+ authKey: "",
+ bluemixServiceName: "",
+ dbname: "",
+ host: "",
+ lifecycleEvent: "",
+ maxTriggers: "",
+ package_endpoint: "",
+ password: "",
+ triggerName: "",
+ username: ""
+ }
if (lifecycleEvent === 'CREATE') {
+
+ // handle any invalid parameters here
+ for(var prop in msg) {
+ if (!(prop in validProperties)) {
+ var eMsg = 'cloudant trigger feed: invalid property not supported: ' + prop;
+ console.log(eMsg,'[error:]', whisk.error(eMsg));
+ return;
+ }
+ }
+
+ // check for missing mandatory parameters
+ var paramError;
+ if (!dbname) {
+ paramError = 'cloudant trigger feed: missing dbname parameter - ' + dbname;
+ console.log(paramError, '[error:]', whisk.error(paramError));
+ return;
+ }
+ if (!host) {
+ paramError = 'cloudant trigger feed: missing host parameter - ' + host;
+ console.log(paramError, '[error:]', whisk.error(paramError));
+ return;
+ }
+ if (!user) {
+ paramError = 'cloudant trigger feed: missing username parameter - ' + user;
+ console.log(paramError, '[error:]', whisk.error(paramError));
+ return;
+ }
+ if (!pass) {
+ paramError = 'cloudant trigger feed: missing password parameter - ' + pass;
+ console.log(paramError, '[error:]', whisk.error(paramError));
+ return;
+ }
+
// auth key for trigger
var apiKey = msg.authKey;
var auth = apiKey.split(':');
@@ -33,7 +78,6 @@
input["dbname"] = dbname;
input["user"] = user;
input["pass"] = pass;
- input["includeDoc"] = includeDoc;
input["apikey"] = apiKey;
input["maxTriggers"] = maxTriggers;
input["callback"] = {};
@@ -44,7 +88,8 @@
} else if (lifecycleEvent === 'DELETE') {
return cloudantHelper(provider_endpoint, 'delete', replaceNameTrigger);
} else {
- return whisk.error('operation is neither CREATE or DELETE');
+ var eMsg = 'operation is neither CREATE or DELETE';
+ whisk.error(eMsg);
}
}
diff --git a/provider/app.js b/provider/app.js
index 18a6ddb..7f135b3 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -8,13 +8,11 @@
var request = require('request');
var bodyParser = require('body-parser');
var logger = require('./Logger');
-var RequestAgent = require('agentkeepalive');
var ProviderUtils = require('./lib/utils.js');
var ProviderHealth = require('./lib/health.js');
var ProviderRAS = require('./lib/ras.js');
var ProviderUpdate = require('./lib/update.js');
-var ProviderCreate = require('./lib/create.js');
var ProviderDelete = require('./lib/delete.js');
var constants = require('./lib/constants.js');
@@ -30,18 +28,9 @@
// Whisk API Router Host
var routerHost = process.env.ROUTER_HOST || 'localhost';
-// This is the maximum times a single trigger is allow to fire.
-// Trigger should not be allow to be created with a value higher than this value
-// Trigger can be created with a value lower than this between 1 and this value
-var triggerFireLimit = 10000;
-
// Maximum number of times to retry the invocation of an action
// before deleting the associated trigger
-var retriesBeforeDelete = 5;
-
-// The maxSockets determines how many concurrent sockets the agent can have open per
-// host, is present in an agent by default with value ??.
-var maximumDbConnections = 50;
+var retriesBeforeDelete = constants.RETRIES_BEFORE_DELETE;
// Allow invoking servers with self-signed certificates.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
@@ -65,17 +54,19 @@
function createDatabase (nanop) {
+ logger.info(tid, 'createDatabase', 'creating the trigger database');
if (nanop !== null) {
nanop.db.create(databaseName, function(err, body, header) {
if (!err) {
logger.info(tid, databaseName, ' database for triggers was created.');
} else {
- logger.info(tid, databaseName, err);
+ logger.info(tid, databaseName, 'failed to create the trigger database. it might already exist ',err);
}
});
var chDb = nanop.db.use(databaseName);
return chDb;
} else {
+ logger.info(tid, databaseName, 'failed to create the trigger database. nano provider did not get created. check db URL: ' + dbHost);
return null;
}
@@ -83,27 +74,16 @@
function createTriggerDb () {
- var nanop = null;
+ logger.info('url is ' + dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
+ var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
- var connectionAgent = new RequestAgent({
- maxSockets: maximumDbConnections
- });
-
- // no need for a promise here, but leaving code inplace until we prove out the question of cookie usage
- var promise = new Promise(function(resolve, reject) {
-
- logger.info('url is ' + dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
- nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
- resolve(createDatabase (nanop));
-
- });
-
- return promise;
+ return createDatabase (nanop);
}
// Initialize the Provider Server
function init(server) {
+
if (server !== null) {
var address = server.address();
if (address === null) {
@@ -113,16 +93,17 @@
}
///
- var triggerDBPromise = createTriggerDb();
- triggerDBPromise.then(function (nanoDb) {
+ var nanoDb = createTriggerDb();
+ if (nanoDb === null) {
+ logger.error(tid, 'init', 'found an error creating database: ', err);
+ } else {
logger.info(tid, 'init', 'trigger storage database details: ', nanoDb);
- var providerUtils = new ProviderUtils (tid, logger, app, retriesBeforeDelete, nanoDb, triggerFireLimit, routerHost);
+ var providerUtils = new ProviderUtils (tid, logger, app, retriesBeforeDelete, nanoDb, routerHost);
var providerRAS = new ProviderRAS (tid, logger, providerUtils);
var providerHealth = new ProviderHealth (tid, logger, providerUtils);
var providerUpdate = new ProviderUpdate (tid, logger, providerUtils);
- var providerCreate = new ProviderCreate (tid, logger, providerUtils);
var providerDelete = new ProviderDelete (tid, logger, providerUtils);
// RAS Endpoint
@@ -134,16 +115,12 @@
// Endpoint for Update OR Create a Trigger
app.put(providerUpdate.endPoint, providerUtils.authorize, providerUpdate.update);
- // Endpoint for Creating a new Trigger
- app.post(providerCreate.endPoint, providerUtils.authorize, providerCreate.create);
-
// Endpoint for Deleting an existing Trigger.
app.delete(providerDelete.endPoint, providerUtils.authorize, providerDelete.delete);
providerUtils.initAllTriggers();
- }, function(err) {
- logger.info(tid, 'init', 'found an error creating database: ', err);
- });
+
+ }
}
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 2ca1292..ccb5c2c 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -1,5 +1,9 @@
const TRIGGER_DB_SUFFIX = 'cloudanttrigger';
+const DEFAULT_TRIGGER_COUNT = -1;
+const RETRIES_BEFORE_DELETE = 10;
module.exports = {
- TRIGGER_DB_SUFFIX: TRIGGER_DB_SUFFIX
+ TRIGGER_DB_SUFFIX: TRIGGER_DB_SUFFIX,
+ DEFAULT_TRIGGER_COUNT: DEFAULT_TRIGGER_COUNT,
+ RETRIES_BEFORE_DELETE: RETRIES_BEFORE_DELETE
};
diff --git a/provider/lib/create.js b/provider/lib/create.js
deleted file mode 100644
index a7d5080..0000000
--- a/provider/lib/create.js
+++ /dev/null
@@ -1,46 +0,0 @@
-module.exports = function(tid, logger, utils) {
-
- // Test Endpoint
- this.endPoint = '/cloudanttriggers/:id';
-
- // Create Logic
- this.create = function (req, res) {
-
- var method = 'PUT /cloudanttriggers';
-
- logger.info(tid, method);
- var args = typeof req.body === 'object' ? req.body : JSON.parse(req.body);
- if(args.maxTriggers > utils.triggersLimit) {
- // TODO: update error code to indicate that content provided is not correct
- logger.warn(tid, method, 'maxTriggers > ' + utils.triggersLimit + ' is not allowed');
- res.status(400).json({
- error: 'maxTriggers > ' + utils.triggersLimit + ' is not allowed'
- });
- return;
- } else if (!args.callback || !args.callback.action || !args.callback.action.name) {
- // TODO: update error code to indicate that content provided is not correct
- logger.warn(tid, method, 'Your callback is unknown for cloudant trigger:', args.callback);
- res.status(400).json({
- error: 'You callback is unknown for cloudant trigger.'
- });
- return;
- }
- var id = req.params.id;
- var trigger = utils.initTrigger(args, id);
- // 10 is number of retries to create a trigger.
- var promise = utils.createTrigger(trigger, 10);
- promise.then(function(newTrigger) {
- logger.info(tid, method, "Trigger was added and database is confirmed.", newTrigger);
- utils.addTriggerToDB(newTrigger, res);
- }, function(err) {
- logger.error(tid, method, "Trigger could not be created.", err);
- utils.deleteTrigger(id);
- res.status(400).json({
- message: "Trigger could not be created.",
- error: err
- });
- });
-
- };
-
-};
diff --git a/provider/lib/update.js b/provider/lib/update.js
index 8d51570..a467d29 100644
--- a/provider/lib/update.js
+++ b/provider/lib/update.js
@@ -28,7 +28,7 @@
var id = req.params.id;
var trigger = utils.initTrigger(args, id);
// 10 is number of retries to create a trigger.
- var promise = utils.createTrigger(trigger, 10);
+ var promise = utils.createTrigger(trigger, utils.retryCount);
promise.then(function(newTrigger) {
logger.info(tid, method, "Trigger was added and database is confirmed.", newTrigger);
utils.addTriggerToDB(newTrigger, res);
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index af24b85..10b3da6 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -1,6 +1,6 @@
var _ = require('lodash');
var request = require('request');
-var RequestAgent = require('agentkeepalive');
+var constants = require('./constants.js');
module.exports = function(
tid,
@@ -8,7 +8,6 @@
app,
retriesBeforeDelete,
triggerDB,
- triggersFireLimit,
routerHost
) {
@@ -17,9 +16,16 @@
this.app = app;
this.retriesBeforeDelete = retriesBeforeDelete;
this.triggerDB = triggerDB;
- this.triggersLimit = triggersFireLimit;
this.routerHost = routerHost;
+ this.logger.info (tid, 'utils', 'recieved database to store triggers: ' + triggerDB);
+
+ // this is the default trigger fire limit (in the event that is was not set during trigger creation)
+ this.defaultTriggerFireLimit = constants.DEFAULT_TRIGGER_COUNT;
+
+ // maximum number of times to create a trigger
+ this.retryCount = constants.RETRIES_BEFORE_DELETE;
+
// Log HTTP Requests
app.use(function(req, res, next) {
if (req.url.indexOf('/cloudanttriggers') === 0) {
@@ -31,6 +37,9 @@
this.module = 'utils';
this.triggers = {};
+ // we need a way of know if the triggers should fire without max fire constraint (ie fire infinite times)
+ this.unlimitedTriggerFires = false;
+
var that = this;
// Add a trigger: listen for changes and dispatch.
@@ -38,10 +47,6 @@
var method = 'createTrigger';
- // The maxSockets determines how many concurrent sockets the agent can have open per
- // host, is present in an agent by default with value ??.
- var maximumDbConnections = 50;
-
// Cleanup connection when trigger is deleted.
var sinceToUse = dataTrigger.since ? dataTrigger.since : "now";
var nanoConnection;
@@ -51,7 +56,6 @@
dbProtocol = dataTrigger.protocol;
}
- // input["accounturl"] = "https://" + host;
// unless specified host will default to accounturl without the https:// in front
var dbHost;
if (dataTrigger.host) {
@@ -61,10 +65,6 @@
dbHost = dbHost.replace('https://','');
}
- var connectionAgent = new RequestAgent({
- maxSockets: maximumDbConnections
- });
-
// both couch and cloudant should have their URLs in the username:password@host format
dbURL = dbProtocol + '://' + dataTrigger.user + ':' + dataTrigger.pass + '@' + dbHost;
@@ -76,38 +76,53 @@
logger.info(tid, method,'found trigger url: ', dbURL);
nanoConnection = require('nano')(dbURL);
- // no need for a promise here, but leaving code inplace until we prove out the question of cookie usage
- return new Promise(function(resolve, reject) {
+ try {
+
var triggeredDB = nanoConnection.use(dataTrigger.dbname);
+
// Listen for changes on this database.
- var feed = triggeredDB.follow({since: sinceToUse, include_docs: dataTrigger.includeDoc});
+ // always set the include doc setting to false
+ var feed = triggeredDB.follow({since: sinceToUse, include_docs: false});
dataTrigger.feed = feed;
that.triggers[dataTrigger.id] = dataTrigger;
feed.on('change', function (change) {
var triggerHandle = that.triggers[dataTrigger.id];
- logger.info(tid, method, 'Got change from', dataTrigger.dbname, change);
- if(triggerHandle && triggerHandle.triggersLeft > 0 && triggerHandle.retriesLeft > 0) {
- try {
- that.invokeWhiskAction(dataTrigger.id, change);
- } catch (e) {
- logger.error(tid, method, 'Exception occurred in callback', e);
+
+ logger.info(tid, method, 'Got change from', dataTrigger.dbname, change, triggerHandle);
+ logger.info(tid, method, 'Found triggerHandle', triggerHandle);
+
+ if (triggerHandle && triggerHandle.retriesLeft > 0) {
+
+ logger.info(tid, method, 'triggers left:', triggerHandle.triggersLeft);
+ logger.info(tid, method, 'retries left:', triggerHandle.retriesLeft);
+
+ if (triggerHandle.triggersLeft === -1) {
+ logger.info(tid, method, 'found a trigger fire limit set to -1. setting it to fire infinately many times');
+ that.unlimitedTriggerFires = true;
+ } else {
+ that.unlimitedTriggerFires = false;
+ }
+
+ if(that.unlimitedTriggerFires || triggerHandle.triggersLeft > 0) {
+ try {
+ logger.info(tid, method, 'found a valid trigger. lets fire this trigger', triggerHandle);
+ that.fireTrigger(dataTrigger.id, change);
+ } catch (e) {
+ logger.error(tid, method, 'Exception occurred in callback', e);
+ }
}
}
});
feed.follow();
- resolve(feed);
-
- }).then(function(feed) {
-
- return new Promise(function(resolve, reject) {
+ return new Promise(function(resolve, reject) {
feed.on('error', function (err) {
logger.error(tid, method,'Error occurred for trigger', dataTrigger.id, '(db ' + dataTrigger.dbname + '):', err);
- // revive the feed if an error ocurred for now
+ // revive the feed if an error occured for now
// the user should be in charge of removing the feeds
logger.info(tid, "attempting to recreate trigger", dataTrigger.id);
that.deleteTrigger(dataTrigger.id);
@@ -135,20 +150,43 @@
});
});
- }, function (err) {
- logger.info('caught an exception: ' + err);
- return Promise.reject(err);
- }).catch(function (err) {
+
+ } catch (err) {
logger.info('caught an exception: ' + err);
return Promise.reject(err);
- });
+ }
};
this.initTrigger = function (obj, id) {
- logger.info(tid, 'initTrigger', obj);
- var includeDoc = ((obj.includeDoc === true || obj.includeDoc.toString().trim().toLowerCase() === 'true')) || "false";
+ var method = 'initTrigger';
+
+ // validate parameters here
+ logger.info(tid, method, 'create has recieved the following request args', JSON.stringify(obj));
+
+ // if the trigger creation request has not set the max trigger fire limit
+ // we will set it here (default value can be updated in ./constants.js)
+ if (!obj.maxTriggers) {
+ logger.info(tid, method, 'maximum trigger fires has not been set by requester. setting it to the default value of infinity.');
+ logger.info(tid, method, 'setting trigger fire limit', that.defaultTriggerFireLimit)
+ obj.maxTriggers = that.defaultTriggerFireLimit;
+ } else {
+ logger.info(tid, method, 'maximum trigger fires has been set to:', obj.maxTriggers);
+ }
+
+ // if we find that includeDoc is set to true we should warn user here
+ // (note: this will only be the set for old feeds. we no longer allow
+ // this to be set for newly created feeds).
+ if (obj.includeDoc && (obj.includeDoc === true || obj.includeDoc.toString().trim().toLowerCase() === 'true')) {
+ logger.warn(tid, method, 'cloudant trigger feed: includeDoc parameter is no longer supported and will be ignored.');
+ }
+
+ var includeDoc;
+ if (obj.includeDoc) {
+ includeDoc = ((obj.includeDoc === true || obj.includeDoc.toString().trim().toLowerCase() === 'true')) || 'false';
+ }
+
var trigger = {
id: id,
accounturl: obj.accounturl,
@@ -180,9 +218,17 @@
if(!err) {
body.rows.forEach(function(trigger) {
var cloudantTrigger = that.initTrigger(trigger.doc, trigger.doc.id);
- // check here for triggers left if none left end here, and dont create
- if (cloudantTrigger.triggersLeft > 0) {
- that.createTrigger(cloudantTrigger, 10);
+
+ if (cloudantTrigger.triggersLeft === -1) {
+ logger.info(tid, method, 'found a trigger fire limit set to -1. setting it to fire infinately many times');
+ that.unlimitedTriggerFires = true;
+ } else {
+ that.unlimitedTriggerFires = false;
+ }
+
+ // check here for triggers left if none left end here, and don't create
+ if (that.unlimitedTriggerFires || cloudantTrigger.triggersLeft > 0) {
+ that.createTrigger(cloudantTrigger, that.retryCount);
} else {
logger.info(tid, method, 'found a trigger with no triggers left to fire off.');
}
@@ -262,20 +308,29 @@
};
- this.invokeWhiskAction = function (id, change) {
- var method = 'invokeWhiskAction';
+ this.fireTrigger = function (id, change) {
+ var method = 'fireTrigger';
var dataTrigger = that.triggers[id];
var apikey = dataTrigger.apikey;
var triggerName = dataTrigger.callback.action.name;
var triggerObj = that.parseQName(triggerName);
- logger.info(tid, method, 'invokeWhiskAction: change =', change);
+ logger.info(tid, method, 'fireTrigger: change =', change);
- var form = change.hasOwnProperty('doc') ? change.doc : change;
- // always store changes
- //var form = change;
+ var form = change;
+ // pass the fire trigger both the change and an object containing
+ // whisk related details
+ if (dataTrigger.includeDoc === true || dataTrigger.includeDoc === 'true') {
+ var whiskPayloadObject = {
+ 'error' : {
+ 'code' : 1,
+ 'message' : 'includeDoc parameter is no longer supported.'
+ }
+ };
+ form.whisk = whiskPayloadObject;
+ }
- logger.info(tid, method, 'invokeWhiskAction: form =', form);
+ logger.info(tid, method, 'fireTrigger: form =', form);
logger.info(tid, method, 'for trigger', id, 'invoking action', triggerName, 'with db update', JSON.stringify(form));
var host = 'https://'+routerHost+':'+443;
@@ -283,7 +338,11 @@
var auth = apikey.split(':');
logger.info(tid, method, uri, auth, form);
- dataTrigger.triggersLeft--;
+ // only manage trigger fires if they are not infinite
+ if (!that.unlimitedTriggerFires) {
+ logger.info(tid, method, 'found a trigger fire limit set to -1. setting it to fire infinately many times');
+ dataTrigger.triggersLeft--;
+ }
request({
method: 'post',
@@ -299,7 +358,12 @@
logger.info(tid, method, 'done http request, body', body);
if(error || response.statusCode >= 400) {
dataTrigger.retriesLeft--;
- dataTrigger.triggersLeft++; // setting the counter back to where it used to be
+
+ // only manage trigger fires if they are not infinite
+ if (!that.unlimitedTriggerFires) {
+ dataTrigger.triggersLeft++; // setting the counter back to where it used to be
+ }
+
logger.error(tid, method, 'there was an error invoking', id, response ? response.statusCode : response, error, body);
} else {
dataTrigger.retriesLeft = that.retriesBeforeDelete; // reset retry counter