Applying filter on all db changes since 0 takes too long (#82)
diff --git a/provider/app.js b/provider/app.js
index 63d2079..2746a54 100755
--- a/provider/app.js
+++ b/provider/app.js
@@ -23,7 +23,7 @@
app.set('port', process.env.PORT || 8080);
// Allow invoking servers with self-signed certificates.
-process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
+process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
// If it does not already exist, create the triggers database. This is the database that will
// store the managed triggers.
@@ -34,7 +34,8 @@
var dbPrefix = process.env.DB_PREFIX;
var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
var redisUrl = process.env.REDIS_URL;
-var ddname = '_design/' + constants.DESIGN_DOC_NAME;
+var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
+var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
// Create the Provider Server
var server = http.createServer(app);
@@ -42,56 +43,88 @@
logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
});
-function createDatabase(nanop) {
+function createDatabase() {
var method = 'createDatabase';
logger.info(method, 'creating the trigger database');
- return new Promise(function(resolve, reject) {
- nanop.db.create(databaseName, function (err, body) {
- if (!err) {
- logger.info(method, 'created trigger database:', databaseName);
- }
- else if (err.statusCode !== 412) {
- logger.info(method, 'failed to create trigger database:', databaseName, err);
- }
- var db = nanop.db.use(databaseName);
+ var nano = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
- var only_triggers_by_worker = function(doc, req) {
- return doc.maxTriggers && ((!doc.worker && req.query.worker === 'worker0') || (doc.worker === req.query.worker));
- }.toString();
+ if (nano !== null) {
+ return new Promise(function (resolve, reject) {
+ nano.db.create(databaseName, function (err, body) {
+ if (!err) {
+ logger.info(method, 'created trigger database:', databaseName);
+ }
+ else if (err.statusCode !== 412) {
+ logger.info(method, 'failed to create trigger database:', databaseName, err);
+ }
- db.get(ddname, function (error, body) {
- if (error) {
- //new design doc
- db.insert({
- filters: {
- only_triggers_by_worker: only_triggers_by_worker
- },
- }, ddname, function (error, body) {
- if (error && error.statusCode !== 409) {
- reject("filter could not be created: " + error);
+ var viewDD = {
+ views: {
+ triggers_by_worker: {
+ map: function (doc) {
+ if (doc.maxTriggers) {
+ emit(doc.worker || 'worker0', 1);
+ }
+ }.toString(),
+ reduce: '_count'
}
- resolve(db);
- });
- }
- else {
+ }
+ };
+
+ createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
+ .then((db) => {
+ var filterDD = {
+ filters: {
+ triggers_by_worker:
+ function (doc, req) {
+ return doc.maxTriggers && ((!doc.worker && req.query.worker === 'worker0') ||
+ (doc.worker === req.query.worker));
+ }.toString()
+ }
+ };
+ return createDesignDoc(db, filterDDName, filterDD);
+ })
+ .then((db) => {
resolve(db);
- }
+ })
+ .catch(err => {
+ reject(err);
+ });
+
});
});
- });
-}
-
-function createTriggerDb() {
- var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
- if (nanop !== null) {
- return createDatabase(nanop);
}
else {
Promise.reject('nano provider did not get created. check db URL: ' + dbHost);
}
}
+function createDesignDoc(db, ddName, designDoc) {
+ var method = 'createDesignDoc';
+
+ return new Promise(function(resolve, reject) {
+
+ db.get(ddName, function (error, body) {
+ if (error) {
+ //new design doc
+ db.insert(designDoc, ddName, function (error, body) {
+ if (error && error.statusCode !== 409) {
+ logger.error(method, error);
+ reject('design doc could not be created: ' + error);
+ }
+ else {
+ resolve(db);
+ }
+ });
+ }
+ else {
+ resolve(db);
+ }
+ });
+ });
+}
+
function createRedisClient() {
var method = 'createRedisClient';
@@ -101,11 +134,11 @@
bluebird.promisifyAll(redis.RedisClient.prototype);
var client = redis.createClient(redisUrl);
- client.on("connect", function () {
+ client.on('connect', function () {
resolve(client);
});
- client.on("error", function (err) {
+ client.on('error', function (err) {
logger.error(method, 'Error connecting to redis', err);
reject(err);
});
@@ -130,7 +163,7 @@
}
}
- createTriggerDb()
+ createDatabase()
.then(db => {
nanoDb = db;
return createRedisClient();
@@ -154,7 +187,8 @@
app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
providerUtils.initAllTriggers();
- }).catch(err => {
+ })
+ .catch(err => {
logger.error(method, 'an error occurred creating database:', err);
});
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index e45e204..f5d7915 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -3,8 +3,9 @@
const RETRY_ATTEMPTS = 10;
const RETRY_DELAY = 1000; //in milliseconds
const REDIS_KEY = 'active';
-const DESIGN_DOC_NAME = 'triggers';
-const FILTER_FUNCTION = 'only_triggers_by_worker';
+const FILTERS_DESIGN_DOC = 'triggerFilters';
+const VIEWS_DESIGN_DOC = 'triggerViews';
+const TRIGGERS_BY_WORKER = 'triggers_by_worker';
module.exports = {
@@ -13,6 +14,7 @@
RETRY_ATTEMPTS: RETRY_ATTEMPTS,
RETRY_DELAY: RETRY_DELAY,
REDIS_KEY: REDIS_KEY,
- DESIGN_DOC_NAME: DESIGN_DOC_NAME,
- FILTER_FUNCTION: FILTER_FUNCTION
+ FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
+ VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
+ TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
};
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 359126e..847b4cf 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -14,7 +14,7 @@
this.triggers = {};
this.endpointAuth = process.env.ENDPOINT_AUTH;
this.routerHost = process.env.ROUTER_HOST || 'localhost';
- this.worker = process.env.WORKER || "worker0";
+ this.worker = process.env.WORKER || 'worker0';
this.host = process.env.HOST_INDEX || 'host0';
this.hostMachine = process.env.HOST_MACHINE;
this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
@@ -24,55 +24,47 @@
var retryDelay = constants.RETRY_DELAY;
var retryAttempts = constants.RETRY_ATTEMPTS;
- var ddname = constants.DESIGN_DOC_NAME;
- var filter = constants.FILTER_FUNCTION;
+ var filterDDName = constants.FILTERS_DESIGN_DOC;
+ var viewDDName = constants.VIEWS_DESIGN_DOC;
+ var triggersByWorker = constants.TRIGGERS_BY_WORKER;
var utils = this;
this.createTrigger = function(triggerIdentifier, newTrigger) {
var method = 'createTrigger';
try {
- var cronHandle;
return new Promise(function(resolve, reject) {
- // to avoid multiple cron jobs for the same trigger we will only create a cron job if
- // the trigger is not already in the list of identified triggers
- if (!(triggerIdentifier in utils.triggers)) {
- cronHandle = new CronJob(newTrigger.cron,
- function onTick() {
- if (utils.activeHost === utils.host) {
- var triggerHandle = utils.triggers[triggerIdentifier];
- if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
- try {
- utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
- } catch (e) {
- logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
- }
+ var cronHandle = new CronJob(newTrigger.cron,
+ function onTick() {
+ if (utils.activeHost === utils.host) {
+ var triggerHandle = utils.triggers[triggerIdentifier];
+ if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
+ try {
+ utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey);
+ } catch (e) {
+ logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e);
}
}
}
- );
- logger.info(method, triggerIdentifier, 'starting cron job');
- cronHandle.start();
+ }
+ );
+ logger.info(method, triggerIdentifier, 'starting cron job');
+ cronHandle.start();
- var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
+ var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
- utils.triggers[triggerIdentifier] = {
- cron: newTrigger.cron,
- cronHandle: cronHandle,
- triggersLeft: maxTriggers,
- maxTriggers: maxTriggers,
- apikey: newTrigger.apikey,
- name: newTrigger.name,
- namespace: newTrigger.namespace
- };
- }
- else {
- logger.info(method, triggerIdentifier, 'already exists');
- }
+ utils.triggers[triggerIdentifier] = {
+ cron: newTrigger.cron,
+ cronHandle: cronHandle,
+ triggersLeft: maxTriggers,
+ maxTriggers: maxTriggers,
+ apikey: newTrigger.apikey,
+ name: newTrigger.name,
+ namespace: newTrigger.namespace
+ };
resolve(triggerIdentifier);
});
-
} catch (err) {
return Promise.reject(err);
}
@@ -82,7 +74,7 @@
var method = 'fireTrigger';
var triggerIdentifier = utils.getTriggerIdentifier(apikey, namespace, name);
- var host = "https://" + utils.routerHost + ":443";
+ var host = 'https://' + utils.routerHost + ':443';
var auth = apikey.split(':');
var dataTrigger = utils.triggers[triggerIdentifier];
var uri = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
@@ -95,7 +87,8 @@
utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers');
logger.error(method, 'no more triggers left, disabled', triggerIdentifier);
}
- }).catch(err => {
+ })
+ .catch(err => {
logger.error(method, err);
});
};
@@ -133,7 +126,8 @@
utils.postTrigger(dataTrigger, payload, uri, auth, (retryCount + 1))
.then(triggerId => {
resolve(triggerId);
- }).catch(err => {
+ })
+ .catch(err => {
reject(err);
});
}, retryDelay);
@@ -216,25 +210,26 @@
this.initAllTriggers = function() {
var method = 'initAllTriggers';
+ //follow the trigger DB
+ utils.setupFollow('now');
+
logger.info(method, 'resetting system from last state');
-
- triggerDB.changes({ since: 0, include_docs: true, filter: ddname + '/' + filter, worker: utils.worker }, (err, changes) => {
+ triggerDB.view(viewDDName, triggersByWorker, {reduce: false, include_docs: true, key: utils.worker}, function(err, body) {
if (!err) {
- changes.results.forEach(function (change) {
- var triggerIdentifier = change.id;
- var doc = change.doc;
+ body.rows.forEach(function (trigger) {
+ var triggerIdentifier = trigger.id;
+ var doc = trigger.doc;
- if (!doc.status || doc.status.active === true) {
+ if ((!doc.status || doc.status.active === true) && !(triggerIdentifier in utils.triggers)) {
//check if trigger still exists in whisk db
var namespace = doc.namespace;
var name = doc.name;
var apikey = doc.apikey;
- logger.info(method, 'Checking if trigger', triggerIdentifier, 'still exists');
-
var host = 'https://' + utils.routerHost + ':' + 443;
var triggerURL = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
var auth = apikey.split(':');
+ logger.info(method, 'Checking if trigger', triggerIdentifier, 'still exists');
request({
method: 'get',
url: triggerURL,
@@ -247,13 +242,14 @@
if (!error && utils.shouldDisableTrigger(response.statusCode)) {
var message = 'Automatically disabled after receiving a ' + response.statusCode + ' status code on init trigger';
utils.disableTrigger(triggerIdentifier, response.statusCode, message);
- logger.error(method, 'trigger', triggerIdentifier, 'has been disabled due to status code', response.statusCode);
+ logger.error(method, 'trigger', triggerIdentifier, 'has been disabled due to status code:', response.statusCode);
}
else {
utils.createTrigger(triggerIdentifier, doc)
.then(triggerIdentifier => {
logger.info(method, triggerIdentifier, 'created successfully');
- }).catch(err => {
+ })
+ .catch(err => {
var message = 'Automatically disabled after receiving exception on init trigger: ' + err;
utils.disableTrigger(triggerIdentifier, undefined, message);
logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err);
@@ -261,11 +257,7 @@
}
});
}
- else {
- logger.info(method, 'ignoring trigger', triggerIdentifier, 'since it is disabled.');
- }
});
- utils.setupFollow(changes.last_seq);
} else {
logger.error(method, 'could not get latest state from database', err);
}
@@ -279,7 +271,7 @@
var feed = triggerDB.follow({
since: seq,
include_docs: true,
- filter: ddname + '/' + filter,
+ filter: filterDDName + '/' + triggersByWorker,
query_params: {worker: utils.worker}
});
@@ -300,7 +292,8 @@
utils.createTrigger(triggerIdentifier, doc)
.then(triggerIdentifier => {
logger.info(method, triggerIdentifier, 'created successfully');
- }).catch(err => {
+ })
+ .catch(err => {
var message = 'Automatically disabled after receiving exception on create trigger: ' + err;
utils.disableTrigger(triggerIdentifier, undefined, message);
logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err);
@@ -372,14 +365,14 @@
var subscriber = redisClient.duplicate();
//create a subscriber client that listens for requests to perform swap
- subscriber.on("message", function (channel, message) {
+ subscriber.on('message', function (channel, message) {
if (message === 'host0' || message === 'host1') {
- logger.info(method, message, "set to active host in channel", channel);
+ logger.info(method, message, 'set to active host in channel', channel);
utils.activeHost = message;
}
});
- subscriber.on("error", function (err) {
+ subscriber.on('error', function (err) {
logger.error(method, 'Error connecting to redis', err);
reject(err);
});