Fixing numerous issues discovered whilst developing new event source (#3)
* Fixing issues discovered whilst developing new feed provider plugin.
- Firing triggers should return with status of request
- Removing dead code commented out
- Replace var with const/let to catch more errors
- Allow setting log level dynamically
* Errors incorrectly handled when validation fails
diff --git a/README.md b/README.md
index dc2dd75..472ca3d 100644
--- a/README.md
+++ b/README.md
@@ -73,8 +73,9 @@
- `EVENT_PROVIDER` - NPM module name for event provider plugin.
- `DB_URL` - Trigger DB Cloudant URL.
-- `DB_PREFIX` - Trigger DB table prefix.
+- `TRIGGERS_DB` - Trigger DB table name.
- `ROUTER_HOST` - OpenWhisk platform hostname.
+- (Optional) `LOG_LEVEL` - Set logging level (defaults to `info`)
### running feed provider
@@ -98,6 +99,12 @@
- `<apihost>` - OpenWhisk hostname for firing triggers.
- `<namespace>` - OpenWhisk namespace to install provider action packages
+*The `WSK_CLI` environment variable must refer to the compiled instance of the [Apache OpenWhisk CLI](https://github.com/apache/incubator-openwhisk-cli).*
+
+### optional parameters
+
+If the `EVENT_PROVIDER_LIB` environment variable is set, this will be used as the explicit location to install the event provider library from. This can be used to install from a non-published version of the library, i.e. on the filesystem or a Github repository. If this value is not set, the library will be installed from NPM using the `EVENT_PROVIDER` library name.
+
### actions
Running the script will result in the following actions being installed.
diff --git a/actions/event-actions/changesWebAction.js b/actions/event-actions/changesWebAction.js
index 856e068..5982681 100644
--- a/actions/event-actions/changesWebAction.js
+++ b/actions/event-actions/changesWebAction.js
@@ -36,7 +36,7 @@
common.verifyTriggerAuth(triggerData, false)
.then(() => EventProvider.validate(params))
.catch(err => {
- return reject(common.sendError(400, `Feed parameter validation failed`, err.message));
+ throw common.sendError(400, `Feed parameter validation failed`, err.message);
})
.then(validParams => {
Object.assign(newTrigger, validParams)
@@ -79,7 +79,6 @@
.catch(reject)
});
}
- // HOW TO UPDATE?
else if (params.__ow_method === "put") {
return new Promise(function (resolve, reject) {
@@ -88,7 +87,7 @@
common.verifyTriggerAuth(triggerData, false)
.then(() => EventProvider.validate(params))
.catch(err => {
- return reject(common.sendError(400, `Feed parameter validation failed`, err.message));
+ throw common.sendError(400, `Feed parameter validation failed`, err.message);
})
.then(validParams => {
Object.assign(updatedParams, validParams)
@@ -96,7 +95,7 @@
.then(() => db.getTrigger(triggerID))
.then(trigger => {
if (trigger.status && trigger.status.active === false) {
- return reject(common.sendError(400, `${params.triggerName} cannot be updated because it is disabled`));
+ throw common.sendError(400, `${params.triggerName} cannot be updated because it is disabled`);
}
return db.disableTrigger(triggerID, trigger, 0, 'updating');
})
diff --git a/installCatalog.sh b/installCatalog.sh
index 5fd8fda..8d39d5b 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -9,8 +9,7 @@
set -e
set -x
-: ${OPENWHISK_HOME:?"OPENWHISK_HOME must be set and non-empty"}
-WSK_CLI="$OPENWHISK_HOME/bin/wsk"
+: ${WSK_CLI:?"WSK_CLI must be set and non-empty"}
if [ $# -eq 0 ]; then
echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbtable> <apihost> <workers>"
@@ -24,6 +23,7 @@
NAMESPACE="$6"
WORKERS="$7"
ACTION_RUNTIME_VERSION=${ACTION_RUNTIME_VERSION:="nodejs:10"}
+EVENT_PROVIDER_LIB=${EVENT_PROVIDER_LIB:=$EVENT_PROVIDER}
# If the auth key file exists, read the key in the file. Otherwise, take the
# first argument as the key itself.
@@ -95,7 +95,7 @@
# make changesWebAction.zip
cp -f changesWeb_package.json package.json
npm install
-npm install $EVENT_PROVIDER
+npm install $EVENT_PROVIDER_LIB
if [ -e changesWebAction.zip ]; then
rm -rf changesWebAction.zip
diff --git a/provider/Logger.js b/provider/Logger.js
index eba1d09..c4b5261 100644
--- a/provider/Logger.js
+++ b/provider/Logger.js
@@ -6,6 +6,7 @@
var logger = new winston.Logger({
transports: [
new winston.transports.Console({
+ level: process.env.LOG_LEVEL || 'info',
timestamp: function() {
return moment.utc().format("YYYY-MM-DDTHH:mm:ss.SSS") + 'Z';
},
diff --git a/provider/app.js b/provider/app.js
index e8ba100..4cf7376 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -1,64 +1,66 @@
'use strict';
+
/**
* Service which can be configured to listen for triggers from a provider.
* The Provider will store, invoke, and POST whisk events appropriately.
*/
-var URL = require('url').URL;
-var http = require('http');
-var express = require('express');
-var bodyParser = require('body-parser');
-var bluebird = require('bluebird');
-var logger = require('./Logger');
+const URL = require('url').URL;
+const http = require('http');
+const express = require('express');
+const bodyParser = require('body-parser');
+const bluebird = require('bluebird');
+const Cloudant = require('@cloudant/cloudant')
+const redis = require('redis')
+bluebird.promisifyAll(redis.RedisClient.prototype);
+const logger = require('./Logger');
-//var ProviderUtils = require('./lib/utils.js');
-var ProviderTriggersManager = require('./lib/triggers_manager.js');
-var ProviderHealth = require('./lib/health.js');
-var ProviderRAS = require('./lib/ras.js');
-var ProviderActivation = require('./lib/active.js');
-var constants = require('./lib/constants.js');
+const ProviderTriggersManager = require('./lib/triggers_manager.js');
+const ProviderHealth = require('./lib/health.js');
+const ProviderRAS = require('./lib/ras.js');
+const ProviderActivation = require('./lib/active.js');
+const constants = require('./lib/constants.js');
// Initialize the Express Application
-var app = express();
+const app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
app.set('port', process.env.PORT || 8080);
-// Allow invoking servers with self-signed certificates.
-process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
+if (!process.env.DB_URL) {
+ throw new Error('Missing DB_URL environment parameter.')
+}
-// If it does not already exist, create the triggers database. This is the database that will
-// store the managed triggers.
-var dbUrl = process.env.DB_URL;
-var dbPrefix = process.env.DB_PREFIX;
-var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
-// OPTIONAL
-var redisUrl = process.env.REDIS_URL;
+const dbUrl = process.env.DB_URL;
+// This is the database that will store the managed triggers.
+const databaseName = process.env.TRIGGERS_DB || constants.DEFAULT_TRIGGERS_DB;
-// OPTIONAL
-var monitoringAuth = process.env.MONITORING_AUTH;
-var monitoringInterval = process.env.MONITORING_INTERVAL || constants.MONITOR_INTERVAL;
+// Optional Configuration Parameters
+const redisUrl = process.env.REDIS_URL;
-var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
-var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
+// Optional Configuration Parameters
+const monitoringAuth = process.env.MONITORING_AUTH;
+const monitoringInterval = process.env.MONITORING_INTERVAL || constants.MONITOR_INTERVAL;
+
+const filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
+const viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
if (!process.env.EVENT_PROVIDER) {
- throw new Exception('Missing EVENT_PROVIDER environment parameter.')
+ throw new Error('Missing EVENT_PROVIDER environment parameter.')
}
const EventProvider = require(process.env.EVENT_PROVIDER)
// Create the Provider Server
-var server = http.createServer(app);
+const server = http.createServer(app);
server.listen(app.get('port'), function() {
logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
});
function createDatabase() {
- var method = 'createDatabase';
- logger.info(method, 'creating the trigger database');
+ const method = 'createDatabase';
+ logger.info(method, 'creating the trigger database', dbUrl);
- console.log(dbUrl);
- var cloudant = require('@cloudant/cloudant')(dbUrl);
+ const cloudant = Cloudant(dbUrl);
if (cloudant !== null) {
return new Promise(function (resolve, reject) {
@@ -70,7 +72,7 @@
logger.info(method, 'failed to create trigger database:', databaseName, err);
}
- var viewDD = {
+ const viewDD = {
views: {
triggers_by_worker: {
map: function (doc) {
@@ -85,7 +87,7 @@
createDesignDoc(cloudant.db.use(databaseName), viewDDName, viewDD)
.then(db => {
- var filterDD = {
+ const filterDD = {
filters: {
triggers_by_worker:
function (doc, req) {
@@ -98,7 +100,7 @@
})
.then(db => {
if (monitoringAuth) {
- var filterDD = {
+ const filterDD = {
filters: {
canary_docs:
function (doc, req) {
@@ -128,7 +130,7 @@
}
function createDesignDoc(db, ddName, designDoc) {
- var method = 'createDesignDoc';
+ const method = 'createDesignDoc';
return new Promise(function(resolve, reject) {
@@ -153,13 +155,11 @@
}
function createRedisClient() {
- var method = 'createRedisClient';
+ const method = 'createRedisClient';
return new Promise(function(resolve, reject) {
if (redisUrl) {
- var client;
- var redis = require('redis');
- bluebird.promisifyAll(redis.RedisClient.prototype);
+ let client;
if (redisUrl.startsWith('rediss://')) {
// If this is a rediss: connection, we have some other steps.
client = redis.createClient(redisUrl, {
@@ -189,12 +189,12 @@
// Initialize the Provider Server
function init(server, EventProvider) {
- var method = 'init';
- var cloudantDb;
- var providerTriggersManager;
+ const method = 'init';
+ let cloudantDb;
+ let providerTriggersManager;
if (server !== null) {
- var address = server.address();
+ const address = server.address();
if (address === null) {
logger.error(method, 'Error initializing server. Perhaps port is already in use.');
process.exit(-1);
@@ -211,9 +211,9 @@
return providerTriggersManager.initRedis();
})
.then(() => {
- var providerRAS = new ProviderRAS();
- var providerHealth = new ProviderHealth(logger, providerTriggersManager);
- var providerActivation = new ProviderActivation(logger, providerTriggersManager);
+ const providerRAS = new ProviderRAS();
+ const providerHealth = new ProviderHealth(logger, providerTriggersManager);
+ const providerActivation = new ProviderActivation(logger, providerTriggersManager);
// RAS Endpoint
app.get(providerRAS.endPoint, providerRAS.ras);
@@ -235,7 +235,6 @@
.catch(err => {
logger.error(method, 'an error occurred creating database:', err);
});
-
}
init(server, EventProvider);
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 305f1c7..3950bb5 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -1,4 +1,4 @@
-const TRIGGER_DB_SUFFIX = 'cloudanttrigger';
+const DEFAULT_TRIGGERS_DB = 'triggers_db';
const DEFAULT_MAX_TRIGGERS = -1;
const RETRY_ATTEMPTS = 12;
const RETRY_DELAY = 1000; //in milliseconds
@@ -12,7 +12,7 @@
module.exports = {
- TRIGGER_DB_SUFFIX: TRIGGER_DB_SUFFIX,
+ DEFAULT_TRIGGERS_DB: DEFAULT_TRIGGERS_DB,
DEFAULT_MAX_TRIGGERS: DEFAULT_MAX_TRIGGERS,
RETRY_ATTEMPTS: RETRY_ATTEMPTS,
RETRY_DELAY: RETRY_DELAY,
diff --git a/provider/lib/triggers_manager.js b/provider/lib/triggers_manager.js
index 3feacf2..ee162c5 100644
--- a/provider/lib/triggers_manager.js
+++ b/provider/lib/triggers_manager.js
@@ -47,81 +47,6 @@
});
};
- // Add a trigger: listen for changes and dispatch.
- /**
- this.createTrigger = function(triggerData) {
- var method = 'createTrigger';
-
- var Cloudant = require('@cloudant/cloudant');
- var cloudantConnection;
-
- if (triggerData.iamApiKey) {
- var dbURL = `${triggerData.protocol}://${triggerData.host}`;
- if (triggerData.port) {
- dbURL += ':' + triggerData.port;
- }
- cloudantConnection = new Cloudant({ url: dbURL, plugins: { iamauth: { iamApiKey: triggerData.iamApiKey, iamTokenUrl: triggerData.iamUrl } } });
- }
- else {
- var url = `${triggerData.protocol}://${triggerData.user}:${triggerData.pass}@${triggerData.host}`;
- if (triggerData.port) {
- url += ':' + triggerData.port;
- }
- cloudantConnection = Cloudant(url);
- }
-
- try {
- var triggeredDB = cloudantConnection.use(triggerData.dbname);
-
- // Listen for changes on this database.
- var feed = triggeredDB.follow({since: triggerData.since, include_docs: false});
- if (triggerData.filter) {
- feed.filter = triggerData.filter;
- }
- if (triggerData.query_params) {
- feed.query_params = triggerData.query_params;
- }
-
- triggerData.feed = feed;
- self.triggers[triggerData.id] = triggerData;
-
- feed.on('change', function (change) {
- var triggerHandle = self.triggers[triggerData.id];
- if (triggerHandle && shouldFireTrigger(triggerHandle) && hasTriggersRemaining(triggerHandle)) {
- logger.info(method, 'Trigger', triggerData.id, 'got change from', triggerData.dbname);
- try {
- fireTrigger(triggerData.id, change);
- } catch (e) {
- logger.error(method, 'Exception occurred while firing trigger', triggerData.id, e);
- }
- }
- });
-
- feed.follow();
-
- return new Promise(function(resolve, reject) {
- feed.on('error', function (err) {
- logger.error(method,'Error occurred for trigger', triggerData.id, '(db ' + triggerData.dbname + '):', err);
- reject(err);
- });
-
- feed.on('confirm', function () {
- logger.info(method, 'Added cloudant data trigger', triggerData.id, 'listening for changes in database', triggerData.dbname);
- if (isMonitoringTrigger(triggerData.monitor, triggerData.id)) {
- self.monitorStatus.triggerStarted = "success";
- }
- resolve(triggerData.id);
- });
- });
-
- } catch (err) {
- logger.info(method, 'caught an exception for trigger', triggerData.id, err);
- return Promise.reject(err);
- }
-
- };
-*/
-
function initTrigger(newTrigger) {
const maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
@@ -218,7 +143,7 @@
var host = 'https://' + self.routerHost;
var uri = host + '/api/v1/namespaces/' + triggerObj.namespace + '/triggers/' + triggerObj.name;
- postTrigger(triggerData, form, uri, 0)
+ return postTrigger(triggerData, form, uri, 0)
.then(triggerId => {
logger.info(method, 'Trigger', triggerId, 'was successfully fired');
if (isMonitoringTrigger(triggerData.monitor, triggerId)) {
@@ -236,6 +161,7 @@
})
.catch(err => {
logger.error(method, err);
+ throw err
});
}