blob: 1947497a6f484587b4e385681fe175456df0f53c [file] [log] [blame]
var _ = require('lodash');
var request = require('request');
var CronJob = require('cron').CronJob;
var constants = require('./constants.js');
module.exports = function(
logger,
app,
triggerDB,
routerHost
) {
this.logger = logger;
this.app = app;
this.triggerDB = triggerDB;
this.routerHost = routerHost;
// this is the default trigger fire limit (in the event that is was not set during trigger creation)
this.defaultTriggerFireLimit = constants.DEFAULT_MAX_TRIGGERS;
this.retryDelay = constants.RETRY_DELAY;
this.retryAttempts = constants.RETRY_ATTEMPTS;
// Log HTTP Requests
app.use(function(req, res, next) {
if (req.url.indexOf('/alarmtriggers') === 0) {
logger.info('HttpRequest', req.method, req.url);
}
next();
});
this.module = 'utils';
this.triggers = {};
var that = this;
this.createTrigger = function(newTrigger) {
var method = 'createTrigger';
try {
var triggerIdentifier = that.getTriggerIdentifier(newTrigger.apikey, newTrigger.namespace, newTrigger.name);
var cronHandle;
return new Promise(function(resolve, reject) {
// to avoid multiple cron jobs for the same trigger we will only create a cron job if
// the trigger is not already in the list of identified triggers
if (!(triggerIdentifier in that.triggers)) {
cronHandle = new CronJob(newTrigger.cron,
function onTick() {
var triggerHandle = that.triggers[triggerIdentifier];
if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
try {
that.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
} catch (e) {
logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
}
}
}
);
logger.info(method, triggerIdentifier, 'starting cron job');
cronHandle.start();
that.triggers[triggerIdentifier] = {
cron: newTrigger.cron,
cronHandle: cronHandle,
triggersLeft: newTrigger.maxTriggers,
maxTriggers: newTrigger.maxTriggers,
apikey: newTrigger.apikey,
name: newTrigger.name,
namespace: newTrigger.namespace
};
}
else {
logger.info(method, triggerIdentifier, 'already exists');
}
resolve(triggerIdentifier);
});
} catch (err) {
return Promise.reject(err);
}
};
this.fireTrigger = function (namespace, name, payload, apikey) {
var method = 'fireTrigger';
var triggerIdentifier = that.getTriggerIdentifier(apikey, namespace, name);
var routerHost = process.env.ROUTER_HOST;
var host = "https://" + routerHost + ":443";
// https://github.com/openwhisk/openwhisk-alarms-trigger/issues/9
// resolved this in create.js by validating apikey and failing with a useful message if it is
// not present
var auth = apikey.split(':');
var dataTrigger = that.triggers[triggerIdentifier];
var uri = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
that.postTrigger(dataTrigger, payload, uri, auth, that.retryAttempts)
.then(triggerId => {
logger.info(method, 'Trigger', triggerId, 'was successfully fired');
}).catch(err => {
logger.error(method, err);
});
};
this.postTrigger = function (dataTrigger, payload, uri, auth, retryCount) {
var method = 'postTrigger';
return new Promise(function(resolve, reject) {
request({
method: 'post',
uri: uri,
auth: {
user: auth[0],
pass: auth[1]
},
json: payload
}, function(error, response) {
try {
var triggerIdentifier = that.getTriggerIdentifier(dataTrigger.apikey, dataTrigger.namespace, dataTrigger.name);
logger.info(method, triggerIdentifier, 'http post request, STATUS:', response ? response.statusCode : response);
if (error || response.statusCode >= 400) {
logger.error(method, 'there was an error invoking', triggerIdentifier, response ? response.statusCode : error);
if (!error && [408, 429, 500, 502, 503].indexOf(response.statusCode) === -1) {
//delete dead triggers
that.deleteTrigger(dataTrigger.namespace, dataTrigger.name, dataTrigger.apikey);
reject('Deleted dead trigger ' + triggerIdentifier);
}
else {
if (retryCount > 0) {
logger.info(method, 'attempting to fire trigger again', triggerIdentifier, 'Retry Count:', (retryCount - 1));
setTimeout(function () {
that.postTrigger(dataTrigger, payload, uri, auth, (retryCount - 1))
.then(triggerId => {
resolve(triggerId);
}).catch(err => {
reject(err);
});
}, that.retryDelay);
} else {
reject('Unable to reach server to fire trigger ' + triggerIdentifier);
}
}
} else {
// only manage trigger fires if they are not infinite
if (dataTrigger.maxTriggers !== -1) {
dataTrigger.triggersLeft--;
}
logger.info(method, 'fired', triggerIdentifier, dataTrigger.triggersLeft, 'triggers left');
if (dataTrigger.triggersLeft === 0) {
logger.info(method, 'no more triggers left, deleting', triggerIdentifier);
that.deleteTrigger(dataTrigger.namespace, dataTrigger.name, dataTrigger.apikey);
}
resolve(triggerIdentifier);
}
}
catch(err) {
reject('Exception occurred while firing trigger ' + err);
}
});
});
};
this.deleteTrigger = function (namespace, name, apikey) {
var method = 'deleteTrigger';
var triggerIdentifier = that.getTriggerIdentifier(apikey, namespace, name);
if (that.triggers[triggerIdentifier]) {
if (that.triggers[triggerIdentifier].cronHandle) {
that.triggers[triggerIdentifier].cronHandle.stop();
}
delete that.triggers[triggerIdentifier];
logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted');
that.deleteTriggerFromDB(triggerIdentifier);
return true;
}
else {
logger.info(method, 'trigger', triggerIdentifier, 'could not be found');
return false;
}
};
this.deleteTriggerFromDB = function (triggerIdentifier) {
var method = 'deleteTriggerFromDB';
that.triggerDB.get(triggerIdentifier, function (err, body) {
if (!err) {
that.triggerDB.destroy(body._id, body._rev, function (err) {
if (err) {
logger.error(method, 'there was an error while deleting', triggerIdentifier, 'from database');
}
else {
logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted from database');
}
});
}
else {
logger.error(method, 'there was an error while deleting', triggerIdentifier, 'from database');
}
});
};
this.getTriggerIdentifier = function (apikey, namespace, name) {
return apikey + '/' + namespace + '/' + name;
};
this.initAllTriggers = function () {
var method = 'initAllTriggers';
logger.info(method, 'resetting system from last state');
that.triggerDB.view('filters', 'only_triggers', {include_docs: true}, function(err, body) {
if (!err) {
body.rows.forEach(function(trigger) {
//check if trigger still exists in whisk db
var namespace = trigger.doc.namespace;
var name = trigger.doc.name;
var apikey = trigger.doc.apikey;
var triggerIdentifier = that.getTriggerIdentifier(apikey, namespace, name);
logger.info(method, 'Checking if trigger', triggerIdentifier, 'still exists');
var host = 'https://' + routerHost +':'+ 443;
var triggerURL = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
var auth = apikey.split(':');
request({
method: 'get',
url: triggerURL,
auth: {
user: auth[0],
pass: auth[1]
}
}, function(error, response) {
//delete from database if trigger no longer exists (404)
if (!error && response.statusCode === 404) {
logger.info(method, 'trigger', triggerIdentifier, 'could not be found');
that.deleteTriggerFromDB(triggerIdentifier);
}
else {
that.createTrigger(trigger.doc)
.then(triggerIdentifier => {
logger.info(method, triggerIdentifier, 'created successfully');
})
.catch(err => {
logger.error(method, err);
that.deleteTriggerFromDB(triggerIdentifier);
});
}
});
});
}
else {
logger.error(method, 'could not get latest state from database');
}
});
};
this.sendError = function (method, code, message, res) {
logger.warn(method, message);
res.status(code).json({error: message});
};
this.authorize = function(req, res, next) {
if (!req.headers.authorization) {
return that.sendError(400, 'Malformed request, authentication header expected', res);
}
var parts = req.headers.authorization.split(' ');
if (parts[0].toLowerCase() !== 'basic' || !parts[1]) {
return that.sendError(400, 'Malformed request, basic authentication expected', res);
}
var auth = new Buffer(parts[1], 'base64').toString();
auth = auth.match(/^([^:]*):(.*)$/);
if (!auth) {
return that.sendError(400, 'Malformed request, authentication invalid', res);
}
req.user = {
uuid: auth[1],
key: auth[2]
};
next();
};
};