add retry support on trigger fire (#48)
diff --git a/provider/Logger.js b/provider/Logger.js
index 311fe97..b17deaa 100644
--- a/provider/Logger.js
+++ b/provider/Logger.js
@@ -3,8 +3,6 @@
var winston = require('winston');
var safeStringify = require('json-stringify-safe');
-var emailRegex = /(([^<>()[\]\\.,;:\s@"]+(\.[^<>()[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))/g;
-
var logger = new winston.Logger({
transports: [
new winston.transports.Console({
@@ -16,11 +14,6 @@
return '[' + options.timestamp() +'] ['+ options.level.toUpperCase() +'] [??] [alarmsTrigger] ' + options.message;
}
})
- ],
- filters: [
- function maskEmails(level, msg) {
- return msg.replace(emailRegex, 'xxxxxxxx');
- }
]
});
diff --git a/provider/app.js b/provider/app.js
index 6f52d25..15d16ec 100755
--- a/provider/app.js
+++ b/provider/app.js
@@ -25,10 +25,6 @@
// Whisk API Router Host
var routerHost = process.env.ROUTER_HOST || 'localhost';
-// Maximum number of times to retry the invocation of an action
-// before deleting the associated trigger
-var retriesBeforeDelete = constants.RETRIES_BEFORE_DELETE;
-
// Allow invoking servers with self-signed certificates.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
@@ -116,7 +112,7 @@
.then(nanoDb => {
logger.info('init', 'trigger storage database details:', nanoDb);
- var providerUtils = new ProviderUtils (logger, app, retriesBeforeDelete, nanoDb, routerHost);
+ var providerUtils = new ProviderUtils (logger, app, nanoDb, routerHost);
var providerRAS = new ProviderRAS (logger);
var providerHealth = new ProviderHealth (logger, providerUtils);
var providerCreate = new ProviderCreate (logger, providerUtils);
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 5f8955c..62ac278 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -1,9 +1,11 @@
const TRIGGER_DB_SUFFIX = 'alarmservice';
-const DEFAULT_TRIGGER_COUNT = 1000000;
-const RETRIES_BEFORE_DELETE = 10;
+const DEFAULT_MAX_TRIGGERS = 1000000;
+const RETRY_ATTEMPTS = 10;
+const RETRY_DELAY = 1000; //in milliseconds
module.exports = {
TRIGGER_DB_SUFFIX: TRIGGER_DB_SUFFIX,
- DEFAULT_TRIGGER_COUNT: DEFAULT_TRIGGER_COUNT,
- RETRIES_BEFORE_DELETE: RETRIES_BEFORE_DELETE
+ DEFAULT_MAX_TRIGGERS: DEFAULT_MAX_TRIGGERS,
+ RETRY_ATTEMPTS: RETRY_ATTEMPTS,
+ RETRY_DELAY: RETRY_DELAY
};
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 46136f1..1947497 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -6,19 +6,19 @@
module.exports = function(
logger,
app,
- retriesBeforeDelete,
triggerDB,
routerHost
) {
this.logger = logger;
this.app = app;
- this.retriesBeforeDelete = retriesBeforeDelete;
this.triggerDB = triggerDB;
this.routerHost = routerHost;
// this is the default trigger fire limit (in the event that is was not set during trigger creation)
- this.defaultTriggerFireLimit = constants.DEFAULT_TRIGGER_COUNT;
+ this.defaultTriggerFireLimit = constants.DEFAULT_MAX_TRIGGERS;
+ this.retryDelay = constants.RETRY_DELAY;
+ this.retryAttempts = constants.RETRY_ATTEMPTS;
// Log HTTP Requests
app.use(function(req, res, next) {
@@ -50,14 +50,17 @@
function onTick() {
var triggerHandle = that.triggers[triggerIdentifier];
if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
- that.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
+ try {
+ that.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
+ } catch (e) {
+ logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
+ }
}
}
);
logger.info(method, triggerIdentifier, 'starting cron job');
cronHandle.start();
-
that.triggers[triggerIdentifier] = {
cron: newTrigger.cron,
cronHandle: cronHandle,
@@ -87,42 +90,76 @@
// https://github.com/openwhisk/openwhisk-alarms-trigger/issues/9
// resolved this in create.js by validating apikey and failing with a useful message if it is
// not present
- var keyParts = apikey.split(':');
- var triggerHandle = that.triggers[triggerIdentifier];
+ var auth = apikey.split(':');
+ var dataTrigger = that.triggers[triggerIdentifier];
+ var uri = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
- request({
- method: 'POST',
- uri: host + '/api/v1/namespaces/' + namespace + '/triggers/' + name,
- json: payload,
- auth: {
- user: keyParts[0],
- pass: keyParts[1]
- }
- }, function(err, res, body) {
- if (triggerHandle) {
- logger.info(method, triggerIdentifier, 'http post request, STATUS:', res ? res.statusCode : res);
- if (err || res.statusCode >= 400) {
- logger.error(method, 'there was an error invoking', triggerIdentifier, res ? res.statusCode : res, err, body);
- if (!err && [408, 429, 500, 503].indexOf(res.statusCode) === -1) {
- //delete dead triggers
- that.deleteTrigger(triggerHandle.namespace, triggerHandle.name, triggerHandle.apikey);
- }
- } else {
- // only manage trigger fires if they are not infinite
- if (triggerHandle.maxTriggers !== -1) {
- triggerHandle.triggersLeft--;
- }
- logger.info(method, 'fired', triggerIdentifier, 'with body', body, triggerHandle.triggersLeft, 'triggers left');
- }
+ that.postTrigger(dataTrigger, payload, uri, auth, that.retryAttempts)
+ .then(triggerId => {
+ logger.info(method, 'Trigger', triggerId, 'was successfully fired');
+ }).catch(err => {
+ logger.error(method, err);
+ });
+ };
- if (triggerHandle.triggersLeft === 0) {
- logger.info('onTick', 'no more triggers left, deleting');
- that.deleteTrigger(triggerHandle.namespace, triggerHandle.name, triggerHandle.apikey);
+ this.postTrigger = function (dataTrigger, payload, uri, auth, retryCount) {
+ var method = 'postTrigger';
+
+ return new Promise(function(resolve, reject) {
+
+ request({
+ method: 'post',
+ uri: uri,
+ auth: {
+ user: auth[0],
+ pass: auth[1]
+ },
+ json: payload
+ }, function(error, response) {
+ try {
+ var triggerIdentifier = that.getTriggerIdentifier(dataTrigger.apikey, dataTrigger.namespace, dataTrigger.name);
+ logger.info(method, triggerIdentifier, 'http post request, STATUS:', response ? response.statusCode : response);
+
+ if (error || response.statusCode >= 400) {
+ logger.error(method, 'there was an error invoking', triggerIdentifier, response ? response.statusCode : error);
+ if (!error && [408, 429, 500, 502, 503].indexOf(response.statusCode) === -1) {
+ //delete dead triggers
+ that.deleteTrigger(dataTrigger.namespace, dataTrigger.name, dataTrigger.apikey);
+ reject('Deleted dead trigger ' + triggerIdentifier);
+ }
+ else {
+ if (retryCount > 0) {
+ logger.info(method, 'attempting to fire trigger again', triggerIdentifier, 'Retry Count:', (retryCount - 1));
+ setTimeout(function () {
+ that.postTrigger(dataTrigger, payload, uri, auth, (retryCount - 1))
+ .then(triggerId => {
+ resolve(triggerId);
+ }).catch(err => {
+ reject(err);
+ });
+ }, that.retryDelay);
+ } else {
+ reject('Unable to reach server to fire trigger ' + triggerIdentifier);
+ }
+ }
+ } else {
+ // only manage trigger fires if they are not infinite
+ if (dataTrigger.maxTriggers !== -1) {
+ dataTrigger.triggersLeft--;
+ }
+ logger.info(method, 'fired', triggerIdentifier, dataTrigger.triggersLeft, 'triggers left');
+
+ if (dataTrigger.triggersLeft === 0) {
+ logger.info(method, 'no more triggers left, deleting', triggerIdentifier);
+ that.deleteTrigger(dataTrigger.namespace, dataTrigger.name, dataTrigger.apikey);
+ }
+ resolve(triggerIdentifier);
+ }
}
- }
- else {
- logger.info(method, 'trigger', triggerIdentifier, 'was deleted between invocations');
- }
+ catch(err) {
+ reject('Exception occurred while firing trigger ' + err);
+ }
+ });
});
};
@@ -197,7 +234,7 @@
user: auth[0],
pass: auth[1]
}
- }, function(error, response, body) {
+ }, function(error, response) {
//delete from database if trigger no longer exists (404)
if (!error && response.statusCode === 404) {
logger.info(method, 'trigger', triggerIdentifier, 'could not be found');