Support for worker assignment on trigger creation (#132)
* Support for multiple workers
* pin the version of nano to avoid breaking changes
diff --git a/actions/changesWebAction.js b/actions/changesWebAction.js
index c6c445e..5ee9e2e 100644
--- a/actions/changesWebAction.js
+++ b/actions/changesWebAction.js
@@ -9,13 +9,14 @@
return sendError(400, 'no trigger name parameter was provided');
}
- var triggerID = params.triggerName;
- var triggerParts = parseQName(triggerID);
+ var triggerParts = parseQName(params.triggerName);
+ var triggerID = `:${triggerParts.namespace}:${triggerParts.name}`;
var triggerURL = `https://${params.apihost}/api/v1/namespaces/${triggerParts.namespace}/triggers/${triggerParts.name}`;
var nano = require('nano')(params.DB_URL);
var db = nano.db.use(params.DB_NAME);
+ var workers = params.workers instanceof Array ? params.workers : [];
if (params.__ow_method === "put") {
@@ -67,23 +68,25 @@
query_params: query_params,
status: {
'active': true,
- 'dateChanged': new Date().toISOString(),
+ 'dateChanged': new Date().toISOString()
}
};
return new Promise(function (resolve, reject) {
verifyTriggerAuth(triggerURL, params.authKey, false)
.then(() => {
- return verifyUserDB(newTrigger);
+ return getWorkerID(db, workers);
})
- .then(() => {
- return createTrigger(db, triggerID, newTrigger);
+ .then((worker) => {
+ console.log('trigger will be assigned to worker ' + worker);
+ newTrigger.worker = worker;
+ return createTrigger(db, triggerID, newTrigger);
})
.then(() => {
resolve({
statusCode: 200,
headers: {'Content-Type': 'application/json'},
- body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'),
+ body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64')
});
})
.catch(err => {
@@ -106,7 +109,7 @@
resolve({
statusCode: 200,
headers: {'Content-Type': 'application/json'},
- body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'),
+ body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64')
});
})
.catch(err => {
@@ -119,6 +122,44 @@
}
}
+function getWorkerID(db, availabeWorkers) {
+
+ return new Promise((resolve, reject) => {
+ var workerID = availabeWorkers[0] || 'worker0';
+
+ if (availabeWorkers.length > 1) {
+ db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) {
+ if (!err) {
+ var triggersByWorker = {};
+
+ availabeWorkers.forEach(worker => {
+ triggersByWorker[worker] = 0;
+ });
+
+ body.rows.forEach(row => {
+ if (row.key in triggersByWorker) {
+ triggersByWorker[row.key] = row.value;
+ }
+ });
+
+ // find which worker has the least number of assigned triggers
+ for (var worker in triggersByWorker) {
+ if (triggersByWorker[worker] < triggersByWorker[workerID]) {
+ workerID = worker;
+ }
+ }
+ resolve(workerID);
+ } else {
+ reject(err);
+ }
+ });
+ }
+ else {
+ resolve(workerID);
+ }
+ });
+}
+
function createTrigger(triggerDB, triggerID, newTrigger) {
return new Promise(function(resolve, reject) {
@@ -150,7 +191,8 @@
updateTrigger(triggerDB, triggerID, (retryCount + 1))
.then(() => {
resolve();
- }).catch(err => {
+ })
+ .catch(err => {
reject(err);
});
}, 1000);
@@ -268,7 +310,7 @@
return {
statusCode: statusCode,
headers: { 'Content-Type': 'application/json' },
- body: new Buffer(JSON.stringify(params)).toString('base64'),
+ body: new Buffer(JSON.stringify(params)).toString('base64')
};
}
diff --git a/installCatalog.sh b/installCatalog.sh
index 2083cc2..69286df 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -4,7 +4,7 @@
# automatically
#
# To run this command
-# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>
+# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers>
set -e
set -x
@@ -14,7 +14,7 @@
if [ $# -eq 0 ]
then
-echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>"
+echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers>"
fi
AUTH="$1"
@@ -22,6 +22,7 @@
DB_URL="$3"
DB_NAME="${4}cloudanttrigger"
APIHOST="$5"
+WORKERS="$6"
# If the auth key file exists, read the key in the file. Otherwise, take the
# first argument as the key itself.
@@ -57,10 +58,19 @@
-p dbname '' \
-p apihost "$APIHOST"
-$WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no cloudantWeb \
- -p DB_URL "$DB_URL" \
- -p DB_NAME "$DB_NAME" \
- -p apihost "$APIHOST"
+if [ -n "$WORKERS" ];
+then
+ $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no cloudantWeb \
+ -p DB_URL "$DB_URL" \
+ -p DB_NAME "$DB_NAME" \
+ -p apihost "$APIHOST" \
+ -p workers "$WORKERS"
+else
+ $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no cloudantWeb \
+ -p DB_URL "$DB_URL" \
+ -p DB_NAME "$DB_NAME" \
+ -p apihost "$APIHOST"
+fi
# Cloudant feed action
diff --git a/package.json b/package.json
index e70f4a6..3f8286c 100644
--- a/package.json
+++ b/package.json
@@ -12,7 +12,7 @@
"moment": "^2.11.1",
"lodash": "^3.10.1",
"request": "2.69.0",
- "nano": "^6.2.0",
+ "nano": "6.3.0",
"json-stringify-safe": "^5.0.1",
"http-status-codes": "^1.0.5",
"request-promise": "^1.0.2",
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 9e8c2f4..ea6d906 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -127,36 +127,33 @@
this.disableTrigger = function(id, statusCode, message) {
var method = 'disableTrigger';
- //only active/master provider should update the database
- if (utils.activeHost === utils.host) {
- triggerDB.get(id, function (err, existing) {
- if (!err) {
- if (!existing.status || existing.status.active === true) {
- var updatedTrigger = existing;
- var status = {
- 'active': false,
- 'dateChanged': new Date().toISOString(),
- 'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message}
- };
- updatedTrigger.status = status;
+ triggerDB.get(id, function (err, existing) {
+ if (!err) {
+ if (!existing.status || existing.status.active === true) {
+ var updatedTrigger = existing;
+ var status = {
+ 'active': false,
+ 'dateChanged': new Date().toISOString(),
+ 'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message}
+ };
+ updatedTrigger.status = status;
- triggerDB.insert(updatedTrigger, id, function (err) {
- if (err) {
- logger.error(method, 'there was an error while disabling', id, 'in database. ' + err);
- }
- else {
- logger.info(method, 'trigger', id, 'successfully disabled in database');
- }
- });
- }
+ triggerDB.insert(updatedTrigger, id, function (err) {
+ if (err) {
+ logger.error(method, 'there was an error while disabling', id, 'in database. ' + err);
+ }
+ else {
+ logger.info(method, 'trigger', id, 'successfully disabled in database');
+ }
+ });
}
- else {
- logger.info(method, 'could not find', id, 'in database');
- //make sure it is removed from memory as well
- utils.deleteTrigger(id);
- }
- });
- }
+ }
+ else {
+ logger.info(method, 'could not find', id, 'in database');
+ //make sure it is removed from memory as well
+ utils.deleteTrigger(id);
+ }
+ });
};
// Delete a trigger: stop listening for changes and remove it.
diff --git a/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala b/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
index 0a5cb45..effc91b 100644
--- a/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
+++ b/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
@@ -88,7 +88,7 @@
println(s"created a test doc at $now")
// get activation list of the trigger, expecting exactly 1
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 30).length
+ val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 60).length
val nowPoll = Instant.now(Clock.systemUTC())
println(s"Found activation size ($nowPoll): $activations")
withClue("Change feed trigger count: ") { activations should be(1) }
diff --git a/tests/src/test/scala/system/packages/CloudantMultiWorkersTests.scala b/tests/src/test/scala/system/packages/CloudantMultiWorkersTests.scala
new file mode 100644
index 0000000..725f2fb
--- /dev/null
+++ b/tests/src/test/scala/system/packages/CloudantMultiWorkersTests.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.packages
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+import com.jayway.restassured.http.ContentType
+import common._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol.{StringJsonFormat, _}
+import spray.json.{pimpAny, _}
+import system.CloudantUtil
+import whisk.core.database.test.DatabaseScriptTestUtils
+import whisk.utils.JsHelpers
+
+
+@RunWith(classOf[JUnitRunner])
+class CloudantMultiWorkersTests extends FlatSpec
+ with Matchers
+ with WskActorSystem
+ with WskTestHelpers
+ with StreamLogging
+ with DatabaseScriptTestUtils {
+
+ val wskprops = WskProps()
+ val wsk = new Wsk
+ val auth = WhiskProperties.getBasicAuth
+ val user = auth.fst
+ val password = auth.snd
+
+ val webAction = "/whisk.system/cloudantWeb/changesWebAction"
+ val webActionURL = s"https://${wskprops.apihost}/api/v1/web${webAction}.http"
+
+ val myCloudantCreds = CloudantUtil.Credential.makeFromVCAPFile("cloudantNoSQLDB", this.getClass.getSimpleName)
+
+ behavior of "Cloudant multi workers feed tests"
+
+ it should "create triggers assigned to worker0 and worker1" in withAssetCleaner(WskProps()) {
+ (wp, assetHelper) =>
+ implicit val wskprops = wp // shadow global props and make implicit
+
+ val worker0Trigger = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val worker0Params = JsObject(
+ "triggerName" -> JsString(worker0Trigger),
+ "authKey" -> JsString(s"$user:$password"),
+ "username" -> myCloudantCreds.user.toJson,
+ "password" -> myCloudantCreds.password.toJson,
+ "host" -> myCloudantCreds.host().toJson,
+ "dbname" -> myCloudantCreds.dbname.toJson,
+ "workers" -> JsArray(JsString("worker0")))
+
+ val worker1Trigger = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+ val worker1Params = JsObject(
+ "triggerName" -> JsString(worker1Trigger),
+ "authKey" -> JsString(s"$user:$password"),
+ "username" -> myCloudantCreds.user.toJson,
+ "password" -> myCloudantCreds.password.toJson,
+ "host" -> myCloudantCreds.host().toJson,
+ "dbname" -> myCloudantCreds.dbname.toJson,
+ "workers" -> JsArray(JsString("worker0"), JsString("worker1")))
+
+ try {
+ CloudantUtil.setUp(myCloudantCreds)
+
+ wsk.trigger.create(worker0Trigger)
+
+ //create trigger feed and assign to worker0
+ makePutCallWithExpectedResult(worker0Params, 200)
+
+ wsk.trigger.create(worker1Trigger)
+
+ //create trigger feed and assign to worker0 or worker1
+ //the one with the least assigned triggers will be chosen
+ makePutCallWithExpectedResult(worker1Params, 200)
+
+ val dbName = s"${dbPrefix}cloudanttrigger"
+ val documents = getAllDocs(dbName)
+
+ val worker1Doc = documents
+ .fields("rows")
+ .convertTo[List[JsObject]]
+ .filter(_.fields("id").convertTo[String].equals(s":_:$worker1Trigger"))
+
+ JsHelpers.getFieldPath(worker1Doc(0), "doc", "worker") shouldBe Some(JsString("worker1"))
+ } finally {
+ //delete triggers
+ wsk.trigger.delete(worker0Trigger)
+ wsk.trigger.delete(worker1Trigger)
+
+ makeDeleteCallWithExpectedResult(worker0Params, 200)
+ makeDeleteCallWithExpectedResult(worker1Params, 200)
+
+ CloudantUtil.unsetUp(myCloudantCreds)
+ }
+ }
+
+ def makePutCallWithExpectedResult(params: JsObject, expectedCode: Int) = {
+ val response = RestAssured.given()
+ .contentType(ContentType.JSON)
+ .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+ .body(params.toString())
+ .put(webActionURL)
+ assert(response.statusCode() == expectedCode)
+ }
+
+ def makeDeleteCallWithExpectedResult(params: JsObject, expectedCode: Int) = {
+ val response = RestAssured.given()
+ .contentType(ContentType.JSON)
+ .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+ .body(params.toString())
+ .delete(webActionURL)
+ assert(response.statusCode() == expectedCode)
+ }
+
+
+}