Support for worker assignment on trigger creation (#132)

* Support for multiple workers

* pin the version of nano to avoid breaking changes
diff --git a/actions/changesWebAction.js b/actions/changesWebAction.js
index c6c445e..5ee9e2e 100644
--- a/actions/changesWebAction.js
+++ b/actions/changesWebAction.js
@@ -9,13 +9,14 @@
         return sendError(400, 'no trigger name parameter was provided');
     }
 
-    var triggerID = params.triggerName;
-    var triggerParts = parseQName(triggerID);
+    var triggerParts = parseQName(params.triggerName);
+    var triggerID = `:${triggerParts.namespace}:${triggerParts.name}`;
 
     var triggerURL = `https://${params.apihost}/api/v1/namespaces/${triggerParts.namespace}/triggers/${triggerParts.name}`;
 
     var nano = require('nano')(params.DB_URL);
     var db = nano.db.use(params.DB_NAME);
+    var workers = params.workers instanceof Array ? params.workers : [];
 
     if (params.__ow_method === "put") {
 
@@ -67,23 +68,25 @@
             query_params: query_params,
             status: {
                 'active': true,
-                'dateChanged': new Date().toISOString(),
+                'dateChanged': new Date().toISOString()
             }
         };
 
         return new Promise(function (resolve, reject) {
             verifyTriggerAuth(triggerURL, params.authKey, false)
             .then(() => {
-                 return verifyUserDB(newTrigger);
+                return getWorkerID(db, workers);
             })
-            .then(() => {
-                 return createTrigger(db, triggerID, newTrigger);
+            .then((worker) => {
+                console.log('trigger will be assigned to worker ' + worker);
+                newTrigger.worker = worker;
+                return createTrigger(db, triggerID, newTrigger);
             })
             .then(() => {
                  resolve({
                      statusCode: 200,
                      headers: {'Content-Type': 'application/json'},
-                     body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'),
+                     body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64')
                  });
             })
             .catch(err => {
@@ -106,7 +109,7 @@
                 resolve({
                     statusCode: 200,
                     headers: {'Content-Type': 'application/json'},
-                    body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'),
+                    body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64')
                 });
             })
             .catch(err => {
@@ -119,6 +122,44 @@
     }
 }
 
+function getWorkerID(db, availabeWorkers) {
+
+    return new Promise((resolve, reject) => {
+        var workerID = availabeWorkers[0] || 'worker0';
+
+        if (availabeWorkers.length > 1) {
+            db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) {
+                if (!err) {
+                    var triggersByWorker = {};
+
+                    availabeWorkers.forEach(worker => {
+                        triggersByWorker[worker] = 0;
+                    });
+
+                    body.rows.forEach(row => {
+                        if (row.key in triggersByWorker) {
+                            triggersByWorker[row.key] = row.value;
+                        }
+                    });
+
+                    // find which worker has the least number of assigned triggers
+                    for (var worker in triggersByWorker) {
+                        if (triggersByWorker[worker] < triggersByWorker[workerID]) {
+                            workerID = worker;
+                        }
+                    }
+                    resolve(workerID);
+                } else {
+                    reject(err);
+                }
+            });
+        }
+        else {
+            resolve(workerID);
+        }
+    });
+}
+
 function createTrigger(triggerDB, triggerID, newTrigger) {
 
     return new Promise(function(resolve, reject) {
@@ -150,7 +191,8 @@
                                 updateTrigger(triggerDB, triggerID, (retryCount + 1))
                                 .then(() => {
                                     resolve();
-                                }).catch(err => {
+                                })
+                                .catch(err => {
                                     reject(err);
                                 });
                             }, 1000);
@@ -268,7 +310,7 @@
     return {
         statusCode: statusCode,
         headers: { 'Content-Type': 'application/json' },
-        body: new Buffer(JSON.stringify(params)).toString('base64'),
+        body: new Buffer(JSON.stringify(params)).toString('base64')
     };
 }
 
diff --git a/installCatalog.sh b/installCatalog.sh
index 2083cc2..69286df 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -4,7 +4,7 @@
 # automatically
 #
 # To run this command
-# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>
+# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers>
 
 set -e
 set -x
@@ -14,7 +14,7 @@
 
 if [ $# -eq 0 ]
 then
-echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>"
+echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers>"
 fi
 
 AUTH="$1"
@@ -22,6 +22,7 @@
 DB_URL="$3"
 DB_NAME="${4}cloudanttrigger"
 APIHOST="$5"
+WORKERS="$6"
 
 # If the auth key file exists, read the key in the file. Otherwise, take the
 # first argument as the key itself.
@@ -57,10 +58,19 @@
     -p dbname '' \
     -p apihost "$APIHOST"
 
-$WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no cloudantWeb \
-     -p DB_URL "$DB_URL" \
-     -p DB_NAME "$DB_NAME" \
-     -p apihost "$APIHOST"
+if [ -n "$WORKERS" ];
+then
+    $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no cloudantWeb \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME" \
+        -p apihost "$APIHOST" \
+        -p workers "$WORKERS"
+else
+    $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no cloudantWeb \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME" \
+        -p apihost "$APIHOST"
+fi
 
 # Cloudant feed action
 
diff --git a/package.json b/package.json
index e70f4a6..3f8286c 100644
--- a/package.json
+++ b/package.json
@@ -12,7 +12,7 @@
     "moment": "^2.11.1",
     "lodash": "^3.10.1",
     "request": "2.69.0",
-    "nano": "^6.2.0",
+    "nano": "6.3.0",
     "json-stringify-safe": "^5.0.1",
     "http-status-codes": "^1.0.5",
     "request-promise": "^1.0.2",
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 9e8c2f4..ea6d906 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -127,36 +127,33 @@
     this.disableTrigger = function(id, statusCode, message) {
         var method = 'disableTrigger';
 
-        //only active/master provider should update the database
-        if (utils.activeHost === utils.host) {
-            triggerDB.get(id, function (err, existing) {
-                if (!err) {
-                    if (!existing.status || existing.status.active === true) {
-                        var updatedTrigger = existing;
-                        var status = {
-                            'active': false,
-                            'dateChanged': new Date().toISOString(),
-                            'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message}
-                        };
-                        updatedTrigger.status = status;
+        triggerDB.get(id, function (err, existing) {
+            if (!err) {
+                if (!existing.status || existing.status.active === true) {
+                    var updatedTrigger = existing;
+                    var status = {
+                        'active': false,
+                        'dateChanged': new Date().toISOString(),
+                        'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message}
+                    };
+                    updatedTrigger.status = status;
 
-                        triggerDB.insert(updatedTrigger, id, function (err) {
-                            if (err) {
-                                logger.error(method, 'there was an error while disabling', id, 'in database. ' + err);
-                            }
-                            else {
-                                logger.info(method, 'trigger', id, 'successfully disabled in database');
-                            }
-                        });
-                    }
+                    triggerDB.insert(updatedTrigger, id, function (err) {
+                        if (err) {
+                            logger.error(method, 'there was an error while disabling', id, 'in database. ' + err);
+                        }
+                        else {
+                            logger.info(method, 'trigger', id, 'successfully disabled in database');
+                        }
+                    });
                 }
-                else {
-                    logger.info(method, 'could not find', id, 'in database');
-                    //make sure it is removed from memory as well
-                    utils.deleteTrigger(id);
-                }
-            });
-        }
+            }
+            else {
+                logger.info(method, 'could not find', id, 'in database');
+                //make sure it is removed from memory as well
+                utils.deleteTrigger(id);
+            }
+        });
     };
 
     // Delete a trigger: stop listening for changes and remove it.
diff --git a/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala b/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
index 0a5cb45..effc91b 100644
--- a/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
+++ b/tests/src/test/scala/system/health/CloudantHealthFeedTests.scala
@@ -88,7 +88,7 @@
                 println(s"created a test doc at $now")
 
                 // get activation list of the trigger, expecting exactly 1
-                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 30).length
+                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 60).length
                 val nowPoll = Instant.now(Clock.systemUTC())
                 println(s"Found activation size ($nowPoll): $activations")
                 withClue("Change feed trigger count: ") { activations should be(1) }
diff --git a/tests/src/test/scala/system/packages/CloudantMultiWorkersTests.scala b/tests/src/test/scala/system/packages/CloudantMultiWorkersTests.scala
new file mode 100644
index 0000000..725f2fb
--- /dev/null
+++ b/tests/src/test/scala/system/packages/CloudantMultiWorkersTests.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.packages
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+import com.jayway.restassured.http.ContentType
+import common._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol.{StringJsonFormat, _}
+import spray.json.{pimpAny, _}
+import system.CloudantUtil
+import whisk.core.database.test.DatabaseScriptTestUtils
+import whisk.utils.JsHelpers
+
+
+@RunWith(classOf[JUnitRunner])
+class CloudantMultiWorkersTests extends FlatSpec
+    with Matchers
+    with WskActorSystem
+    with WskTestHelpers
+    with StreamLogging
+    with DatabaseScriptTestUtils {
+
+    val wskprops = WskProps()
+    val wsk = new Wsk
+    val auth = WhiskProperties.getBasicAuth
+    val user = auth.fst
+    val password = auth.snd
+
+    val webAction = "/whisk.system/cloudantWeb/changesWebAction"
+    val webActionURL = s"https://${wskprops.apihost}/api/v1/web${webAction}.http"
+
+    val myCloudantCreds = CloudantUtil.Credential.makeFromVCAPFile("cloudantNoSQLDB", this.getClass.getSimpleName)
+
+    behavior of "Cloudant multi workers feed tests"
+
+    it should "create triggers assigned to worker0 and worker1" in withAssetCleaner(WskProps()) {
+        (wp, assetHelper) =>
+            implicit val wskprops = wp // shadow global props and make implicit
+
+            val worker0Trigger = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val worker0Params = JsObject(
+                "triggerName" -> JsString(worker0Trigger),
+                "authKey" -> JsString(s"$user:$password"),
+                "username" -> myCloudantCreds.user.toJson,
+                "password" -> myCloudantCreds.password.toJson,
+                "host" -> myCloudantCreds.host().toJson,
+                "dbname" -> myCloudantCreds.dbname.toJson,
+                "workers" -> JsArray(JsString("worker0")))
+
+            val worker1Trigger = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val worker1Params = JsObject(
+                "triggerName" -> JsString(worker1Trigger),
+                "authKey" -> JsString(s"$user:$password"),
+                "username" -> myCloudantCreds.user.toJson,
+                "password" -> myCloudantCreds.password.toJson,
+                "host" -> myCloudantCreds.host().toJson,
+                "dbname" -> myCloudantCreds.dbname.toJson,
+                "workers" -> JsArray(JsString("worker0"), JsString("worker1")))
+
+            try {
+                CloudantUtil.setUp(myCloudantCreds)
+
+                wsk.trigger.create(worker0Trigger)
+
+                //create trigger feed and assign to worker0
+                makePutCallWithExpectedResult(worker0Params, 200)
+
+                wsk.trigger.create(worker1Trigger)
+
+                //create trigger feed and assign to worker0 or worker1
+                //the one with the least assigned triggers will be chosen
+                makePutCallWithExpectedResult(worker1Params, 200)
+
+                val dbName = s"${dbPrefix}cloudanttrigger"
+                val documents = getAllDocs(dbName)
+
+                val worker1Doc = documents
+                        .fields("rows")
+                        .convertTo[List[JsObject]]
+                        .filter(_.fields("id").convertTo[String].equals(s":_:$worker1Trigger"))
+
+                JsHelpers.getFieldPath(worker1Doc(0), "doc", "worker") shouldBe Some(JsString("worker1"))
+            } finally {
+                //delete triggers
+                wsk.trigger.delete(worker0Trigger)
+                wsk.trigger.delete(worker1Trigger)
+
+                makeDeleteCallWithExpectedResult(worker0Params, 200)
+                makeDeleteCallWithExpectedResult(worker1Params, 200)
+
+                CloudantUtil.unsetUp(myCloudantCreds)
+            }
+    }
+
+    def makePutCallWithExpectedResult(params: JsObject, expectedCode: Int) = {
+        val response = RestAssured.given()
+                .contentType(ContentType.JSON)
+                .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+                .body(params.toString())
+                .put(webActionURL)
+        assert(response.statusCode() == expectedCode)
+    }
+
+    def makeDeleteCallWithExpectedResult(params: JsObject, expectedCode: Int) = {
+        val response = RestAssured.given()
+                .contentType(ContentType.JSON)
+                .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation()))
+                .body(params.toString())
+                .delete(webActionURL)
+        assert(response.statusCode() == expectedCode)
+    }
+
+
+}