Feed trigger update (#145)
* support updating feed parameters
* update tests
* move positive trigger update test to health class
* remove trailing whitespace
* add updatable boolean field to annotation
* write parameters to database after successfully disabling the trigger
* remove trailing whitespace
* missing var declaration
diff --git a/actions/changes.js b/actions/changes.js
index 2bb9ea1..012bf90 100644
--- a/actions/changes.js
+++ b/actions/changes.js
@@ -3,13 +3,14 @@
function main(msg) {
let eventMap = {
- CREATE: 'put',
+ CREATE: 'post',
READ: 'get',
- // UPDATE: 'put',
+ UPDATE: 'put',
DELETE: 'delete'
};
// for creation -> CREATE
// for reading -> READ
+ // for updating -> UPDATE
// for deletion -> DELETE
var lifecycleEvent = msg.lifecycleEvent;
diff --git a/actions/changesWebAction.js b/actions/changesWebAction.js
index 8a405e2..73186d7 100644
--- a/actions/changesWebAction.js
+++ b/actions/changesWebAction.js
@@ -19,7 +19,7 @@
var db = nano.db.use(params.DB_NAME);
var workers = params.workers instanceof Array ? params.workers : [];
- if (params.__ow_method === "put") {
+ if (params.__ow_method === "post") {
// check for parameter errors
if (!params.dbname) {
@@ -138,12 +138,62 @@
});
});
}
+    else if (params.__ow_method === "put") {
+        // UPDATE lifecycle event: replace the filter and/or query_params of an existing trigger.
+        // Resolves with a 200 response on success; rejects with the value produced by sendError(...)
+        // (or whatever verifyTriggerAuth / getTrigger / updateTrigger reject with) on failure.
+        return new Promise(function (resolve, reject) {
+            verifyTriggerAuth(triggerURL, params.authKey, true)
+            .then(() => {
+                return getTrigger(db, triggerID);
+            })
+            .then(trigger => {
+                // Validation failures abort this callback by throwing: calling reject() alone would
+                // not stop execution, and updateTrigger() below would still rewrite the trigger
+                // document. The thrown value reaches the .catch() at the end of the chain and is
+                // handed to reject() unchanged.
+                if (!params.filter && !params.query_params) {
+                    throw sendError(400, 'At least one of filter or query_params parameters must be supplied');
+                }
+
+                // start from the stored values so a parameter that was not supplied is kept as is
+                var updatedParams = {
+                    filter: trigger.filter,
+                    query_params: trigger.query_params
+                };
+
+                if (params.filter) {
+                    updatedParams.filter = params.filter;
+                }
+                if (params.query_params) {
+                    // query_params require a filter, either newly supplied or already stored
+                    if (!updatedParams.filter) {
+                        throw sendError(400, 'The query_params parameter is only allowed if the filter parameter is defined');
+                    }
+                    if (typeof params.query_params === 'object') {
+                        updatedParams.query_params = params.query_params;
+                    }
+                    else if (typeof params.query_params === 'string') {
+                        try {
+                            updatedParams.query_params = JSON.parse(params.query_params);
+                        }
+                        catch (e) {
+                            throw sendError(400, 'The query_params parameter cannot be parsed. Ensure it is valid JSON.');
+                        }
+                    }
+                }
+                return updateTrigger(db, triggerID, trigger, updatedParams);
+            })
+            .then(() => {
+                resolve({
+                    statusCode: 200,
+                    headers: {'Content-Type': 'application/json'},
+                    body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64')
+                });
+            })
+            .catch(err => {
+                reject(err);
+            });
+        });
+    }
else if (params.__ow_method === "delete") {
return new Promise(function (resolve, reject) {
verifyTriggerAuth(triggerURL, params.authKey, true)
.then(() => {
- return updateTrigger(db, triggerID, 0);
+ return disableTrigger(db, triggerID, 0);
})
.then(() => {
return deleteTrigger(db, triggerID, 0);
@@ -161,7 +211,7 @@
});
}
else {
- return sendError(400, 'lifecycleEvent must be CREATE or DELETE');
+ return sendError(400, 'unsupported lifecycle event');
}
}
@@ -232,7 +282,7 @@
});
}
-function updateTrigger(triggerDB, triggerID, retryCount) {
+function disableTrigger(triggerDB, triggerID, retryCount) {
return new Promise(function(resolve, reject) {
@@ -245,7 +295,7 @@
if (err) {
if (err.statusCode === 409 && retryCount < 5) {
setTimeout(function () {
- updateTrigger(triggerDB, triggerID, (retryCount + 1))
+ disableTrigger(triggerDB, triggerID, (retryCount + 1))
.then(() => {
resolve();
})
@@ -303,6 +353,53 @@
});
}
+/**
+ * Writes updated parameters into a stored trigger document using two inserts:
+ * first the existing document is saved with an inactive status, then the
+ * document is fetched again, the truthy entries of `params` are copied onto it,
+ * and it is saved with an active status.
+ *
+ * @param triggerDB  database handle exposing insert(doc, id, callback)
+ * @param triggerID  id of the trigger document
+ * @param existing   current trigger document; its status property is overwritten
+ * @param params     map of values to copy onto the document (falsy values are skipped)
+ * @returns a Promise that resolves with no value once the second insert succeeded
+ *          and rejects with the result of sendError(...) if either insert fails
+ */
+function updateTrigger(triggerDB, triggerID, existing, params) {
+
+    // Promise wrapper around triggerDB.insert. The document is produced by
+    // buildDoc inside the executor, right before the insert is issued.
+    var store = function (buildDoc, failureText) {
+        return new Promise(function (resolve, reject) {
+            triggerDB.insert(buildDoc(), triggerID, function (err) {
+                if (err) {
+                    reject(sendError(err.statusCode, failureText, err.message));
+                    return;
+                }
+                resolve();
+            });
+        });
+    };
+
+    // step 1: mark the trigger as inactive while the update is in progress
+    var disabling = store(function () {
+        existing.status = {
+            'active': false,
+            'dateChanged': Date.now(),
+            'reason': {
+                'kind': 'AUTO',
+                'statusCode': undefined,
+                'message': 'Automatically disabled trigger while updating'
+            }
+        };
+        return existing;
+    }, 'there was an error while disabling the trigger in the database.');
+
+    return disabling
+    .then(() => getTrigger(triggerDB, triggerID))
+    .then(trigger => {
+        // step 2: apply the new values to the freshly fetched document and re-enable it
+        return store(function () {
+            for (var name in params) {
+                if (!params[name]) {
+                    continue;
+                }
+                trigger[name] = params[name];
+            }
+            trigger.status = {
+                'active': true,
+                'dateChanged': Date.now()
+            };
+            return trigger;
+        }, 'there was an error while updating and re-enabling the trigger in the database.');
+    });
+}
+
function verifyTriggerAuth(triggerURL, authKey, isDelete) {
var auth = authKey.split(':');
diff --git a/installCatalog.sh b/installCatalog.sh
index 69286df..0aef5a6 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -78,7 +78,7 @@
-t 90000 \
-a feed true \
-a description 'Database change feed' \
- -a parameters '[ {"name":"dbname", "required":true}, {"name": "filter", "required":false, "type": "string", "description": "The name of your Cloudant database filter"}, {"name": "query_params", "required":false, "description": "JSON Object containing query parameters that are passed to the filter"} ]' \
+ -a parameters '[ {"name":"dbname", "required":true, "updatable":false}, {"name": "filter", "required":false, "updatable":true, "type": "string", "description": "The name of your Cloudant database filter"}, {"name": "query_params", "required":false, "updatable":true, "description": "JSON Object containing query parameters that are passed to the filter"} ]' \
-a sampleInput '{ "dbname": "mydb", "filter": "mailbox/by_status", "query_params": {"status": "new"} }'
# Cloudant web feed action
diff --git a/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala b/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
index 48b2f16..b75c1dc 100644
--- a/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
+++ b/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
@@ -193,7 +193,6 @@
activations should be(1)
}
-
it should "return correct status and configuration" in withAssetCleaner(wskprops) {
val currentTime = s"${System.currentTimeMillis}"
@@ -280,4 +279,106 @@
CloudantUtil.unsetUp(myCloudantCreds)
}
}
+
+    /*
+     * Creates a trigger with a filter and query_params, reads its configuration
+     * through a READ lifecycle event, changes both values through an UPDATE
+     * lifecycle event and reads the configuration again to check that the new
+     * values are reported.
+     */
+    it should "update filter and query_params parameters" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            implicit val wskProps = wp
+            val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val packageName = "dummyCloudantPackage"
+            val feed = "changes"
+
+            try {
+                CloudantUtil.setUp(myCloudantCreds)
+
+                // the cloudant package should be there
+                val packageGetResult = wsk.pkg.get("/whisk.system/cloudant")
+                println("fetched package cloudant")
+                packageGetResult.stdout should include("ok")
+
+                // create package binding
+                assetHelper.withCleaner(wsk.pkg, packageName) {
+                    (pkg, name) => pkg.bind("/whisk.system/cloudant", name)
+                }
+
+                val username = myCloudantCreds.user
+                val password = myCloudantCreds.password
+                val host = myCloudantCreds.host()
+                val dbName = myCloudantCreds.dbname
+                val filter = "test_filter/fruit"
+                val queryParams = JsObject("type" -> JsString("tomato"))
+
+                // create the trigger with the initial filter and query_params
+                val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+                    (trigger, name) =>
+                        trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+                            "username" -> username.toJson,
+                            "password" -> password.toJson,
+                            "host" -> host.toJson,
+                            "dbname" -> dbName.toJson,
+                            "filter" -> filter.toJson,
+                            "query_params" -> queryParams
+                        ))
+                }
+                feedCreationResult.stdout should include("ok")
+
+                // READ: the stored configuration must show the initial values
+                val actionName = s"$packageName/$feed"
+                val readRunResult = wsk.action.invoke(actionName, parameters = Map(
+                    "triggerName" -> triggerName.toJson,
+                    "lifecycleEvent" -> "READ".toJson,
+                    "authKey" -> wskProps.authKey.toJson
+                ))
+
+                withActivation(wsk.activation, readRunResult) {
+                    activation =>
+                        activation.response.success shouldBe true
+
+                        inside(activation.response.result) {
+                            case Some(result) =>
+                                val config = result.getFields("config").head.asInstanceOf[JsObject].fields
+
+                                config should contain("filter" -> filter.toJson)
+                                config should contain("query_params" -> queryParams)
+                        }
+                }
+
+                val updatedFilter = "test_filter/vegetable"
+                val updatedQueryParams = JsObject("type" -> JsString("celery"))
+
+                // UPDATE: replace both the filter and the query_params
+                val updateRunAction = wsk.action.invoke(actionName, parameters = Map(
+                    "triggerName" -> triggerName.toJson,
+                    "lifecycleEvent" -> "UPDATE".toJson,
+                    "authKey" -> wskProps.authKey.toJson,
+                    "filter" -> updatedFilter.toJson,
+                    "query_params" -> updatedQueryParams
+                ))
+
+                withActivation(wsk.activation, updateRunAction) {
+                    activation =>
+                        activation.response.success shouldBe true
+                }
+
+                // READ again: the configuration must now show the updated values
+                val runResult = wsk.action.invoke(actionName, parameters = Map(
+                    "triggerName" -> triggerName.toJson,
+                    "lifecycleEvent" -> "READ".toJson,
+                    "authKey" -> wskProps.authKey.toJson
+                ))
+
+                withActivation(wsk.activation, runResult) {
+                    activation =>
+                        activation.response.success shouldBe true
+
+                        inside(activation.response.result) {
+                            case Some(result) =>
+                                val config = result.getFields("config").head.asInstanceOf[JsObject].fields
+
+                                config should contain("filter" -> updatedFilter.toJson)
+                                config should contain("query_params" -> updatedQueryParams)
+                        }
+                }
+            } finally {
+                CloudantUtil.unsetUp(myCloudantCreds)
+            }
+    }
}
diff --git a/tests/src/test/scala/system/packages/CloudantFeedTests.scala b/tests/src/test/scala/system/packages/CloudantFeedTests.scala
index 49192bc..f6390f3 100644
--- a/tests/src/test/scala/system/packages/CloudantFeedTests.scala
+++ b/tests/src/test/scala/system/packages/CloudantFeedTests.scala
@@ -396,7 +396,216 @@
} finally {
CloudantUtil.unsetUp(myCloudantCreds)
}
+ }
- }
+    /*
+     * Creates a trigger without filter or query_params and invokes the feed
+     * action with an UPDATE lifecycle event that carries neither parameter;
+     * the activation is expected to fail.
+     */
+    it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            implicit val wskProps = wp
+            val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val packageName = "dummyCloudantPackage"
+            val feed = "changes"
+
+            try {
+                CloudantUtil.setUp(myCloudantCreds)
+
+                // the cloudant package should be there
+                val packageGetResult = wsk.pkg.get("/whisk.system/cloudant")
+                println("fetched package cloudant")
+                packageGetResult.stdout should include("ok")
+
+                // create package binding
+                assetHelper.withCleaner(wsk.pkg, packageName) {
+                    (pkg, name) => pkg.bind("/whisk.system/cloudant", name)
+                }
+
+                val username = myCloudantCreds.user
+                val password = myCloudantCreds.password
+                val host = myCloudantCreds.host()
+                val dbName = myCloudantCreds.dbname
+
+                // create the trigger without any updatable parameter
+                val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+                    (trigger, name) =>
+                        trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+                            "username" -> username.toJson,
+                            "password" -> password.toJson,
+                            "host" -> host.toJson,
+                            "dbname" -> dbName.toJson
+                        ))
+                }
+                feedCreationResult.stdout should include("ok")
+
+                // UPDATE with neither filter nor query_params must be rejected
+                val actionName = s"$packageName/$feed"
+                val run = wsk.action.invoke(actionName, parameters = Map(
+                    "triggerName" -> triggerName.toJson,
+                    "lifecycleEvent" -> "UPDATE".toJson,
+                    "authKey" -> wskProps.authKey.toJson
+                ))
+
+                withActivation(wsk.activation, run) {
+                    activation =>
+                        activation.response.success shouldBe false
+                }
+            } finally {
+                CloudantUtil.unsetUp(myCloudantCreds)
+            }
+    }
+
+    /*
+     * Creates a trigger without a filter and invokes the feed action with an
+     * UPDATE lifecycle event that carries only query_params; the activation is
+     * expected to fail.
+     */
+    it should "reject trigger update when query_params is passed in and no filter is defined" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            implicit val wskProps = wp
+            val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val packageName = "dummyCloudantPackage"
+            val feed = "changes"
+
+            try {
+                CloudantUtil.setUp(myCloudantCreds)
+
+                // the cloudant package should be there
+                val packageGetResult = wsk.pkg.get("/whisk.system/cloudant")
+                println("fetched package cloudant")
+                packageGetResult.stdout should include("ok")
+
+                // create package binding
+                assetHelper.withCleaner(wsk.pkg, packageName) {
+                    (pkg, name) => pkg.bind("/whisk.system/cloudant", name)
+                }
+
+                val username = myCloudantCreds.user
+                val password = myCloudantCreds.password
+                val host = myCloudantCreds.host()
+                val dbName = myCloudantCreds.dbname
+
+                // create the trigger without a filter
+                val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+                    (trigger, name) =>
+                        trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+                            "username" -> username.toJson,
+                            "password" -> password.toJson,
+                            "host" -> host.toJson,
+                            "dbname" -> dbName.toJson
+                        ))
+                }
+                feedCreationResult.stdout should include("ok")
+
+                // UPDATE with query_params but no filter (stored or supplied) must be rejected
+                val actionName = s"$packageName/$feed"
+                val run = wsk.action.invoke(actionName, parameters = Map(
+                    "triggerName" -> triggerName.toJson,
+                    "lifecycleEvent" -> "UPDATE".toJson,
+                    "authKey" -> wskProps.authKey.toJson,
+                    "query_params" -> JsObject("type" -> JsString("tomato"))
+                ))
+
+                withActivation(wsk.activation, run) {
+                    activation =>
+                        activation.response.success shouldBe false
+                }
+            } finally {
+                CloudantUtil.unsetUp(myCloudantCreds)
+            }
+    }
+
+ // End-to-end check of filtering around an UPDATE: creates a trigger with the
+ // filter test_filter/fruit and query_params type=tomato, inserts documents and
+ // counts activations, switches query_params to type=avocado through an UPDATE
+ // lifecycle event, verifies the stored configuration through READ, then inserts
+ // another document and counts activations again.
+ // NOTE(review): which documents pass depends on the filter design document
+ // loaded from CloudantUtil.FILTER_DDOC_PATH, which is not visible here - confirm
+ // the expected activation counts against it.
+ it should "filter out triggers that do not meet the filter criteria before and after updating query_params" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ implicit val wskProps = wp // shadow global props and make implicit
+ val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val packageName = "dummyCloudantPackage"
+ val feed = "changes"
+ val actionName = s"$packageName/$feed"
+
+ try {
+ CloudantUtil.setUp(myCloudantCreds)
+
+ val packageGetResult = wsk.pkg.get("/whisk.system/cloudant")
+ println("Fetching cloudant package.")
+ packageGetResult.stdout should include("ok")
+
+ println("Creating cloudant package binding.")
+ assetHelper.withCleaner(wsk.pkg, packageName) {
+ (pkg, name) => pkg.bind("/whisk.system/cloudant", name)
+ }
+
+ // Create the filter design doc in the test database
+ val filterDesignDoc = CloudantUtil.createDesignFromFile(CloudantUtil.FILTER_DDOC_PATH).toString
+ val getResponse = CloudantUtil.createDocument(myCloudantCreds, filterDesignDoc)
+ getResponse.get("ok").getAsString shouldBe "true"
+
+ println("Creating cloudant trigger feed.")
+ val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+ (trigger, name) =>
+ trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+ "username" -> myCloudantCreds.user.toJson,
+ "password" -> myCloudantCreds.password.toJson,
+ "host" -> myCloudantCreds.host().toJson,
+ "dbname" -> myCloudantCreds.dbname.toJson,
+ "filter" -> "test_filter/fruit".toJson,
+ "query_params" -> JsObject("type" -> JsString("tomato"))))
+ }
+ feedCreationResult.stdout should include("ok")
+
+ // Create test docs in cloudant and assert that document was inserted successfully
+ println("Creating a test doc-1 in the cloudant")
+ val response1 = CloudantUtil.createDocument(myCloudantCreds, "{\"kind\":\"fruit\", \"type\":\"apple\"}")
+ response1.get("ok").getAsString() should be("true")
+
+ // doc-1 is expected to fire the trigger exactly once
+ println("Checking for activations")
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 30).length
+ println(s"Found activation size (should be exactly 1): $activations")
+ activations should be(1)
+
+ println("Creating a test doc-2 in the cloudant")
+ val response2 = CloudantUtil.createDocument(myCloudantCreds, "{\"kind\":\"dairy\",\"type\":\"butter\"}")
+ response2.get("ok").getAsString() should be("true")
+
+ // polls for 2 activations but expects the count to stay at 1 (doc-2 filtered out)
+ println("checking for new activations (not expected since it should be filtered out)")
+ val noNewActivations = wsk.activation.pollFor(N = 2, Some(triggerName)).length
+ println(s"Found activation size (should still be 1): $noNewActivations")
+ noNewActivations should be(1)
+
+ // UPDATE carries only query_params; the filter supplied at creation is expected to be kept
+ println("Updating trigger query_params.")
+ val feedUpdateResult = wsk.action.invoke(actionName, parameters = Map(
+ "triggerName" -> triggerName.toJson,
+ "lifecycleEvent" -> "UPDATE".toJson,
+ "authKey" -> wskProps.authKey.toJson,
+ "query_params" -> JsObject("type" -> JsString("avocado"))
+ ))
+ feedUpdateResult.stdout should include("ok")
+
+ // READ back the configuration to confirm the update was stored
+ val runResult = wsk.action.invoke(actionName, parameters = Map(
+ "triggerName" -> triggerName.toJson,
+ "lifecycleEvent" -> "READ".toJson,
+ "authKey" -> wskProps.authKey.toJson
+ ))
+
+ withActivation(wsk.activation, runResult) {
+ activation =>
+ activation.response.success shouldBe true
+
+ inside(activation.response.result) {
+ case Some(result) =>
+ val config = result.getFields("config").head.asInstanceOf[JsObject].fields
+
+ config should contain("filter" -> "test_filter/fruit".toJson)
+ config should contain("query_params" -> JsObject("type" -> JsString("avocado")))
+ }
+ }
+
+ println("Creating a test doc-3 in the cloudant")
+ val response3 = CloudantUtil.createDocument(myCloudantCreds, "{\"kind\":\"berry\", \"type\":\"avocado\"}")
+ response3.get("ok").getAsString() should be("true")
+
+ // polls for 3 activations but expects exactly 2 in total (doc-1 and doc-3)
+ println("Checking for new activations (should now have 2)")
+ val newActivations = wsk.activation.pollFor(N = 3, Some(triggerName), retries = 30).length
+ println(s"Found activation size (should be 2): $newActivations")
+ newActivations should be(2)
+
+ }
+ finally {
+ CloudantUtil.unsetUp(myCloudantCreds)
+ }
+ }
}