Support multiple workers (#214)

* phase 3 HA

* clean up logic

* fix scancode issue

* remove workerId from canary doc id

* remove misleading logging statement. modify devGuid for new env var and parameter for install script

* addition of integration tests

* fix travis build failure due to missing EOL

* do not go through trouble of consulting view when only one available worker
diff --git a/action/kafkaFeedWeb.js b/action/kafkaFeedWeb.js
index 7a57eb2..de39edb 100644
--- a/action/kafkaFeedWeb.js
+++ b/action/kafkaFeedWeb.js
@@ -32,7 +32,14 @@
                         common.verifyTriggerAuth(validatedParams.triggerURL)
                     ]);
                 })
-                .then(() => db.recordTrigger(validatedParams))
+                .then(() => {
+                    var workers = (params.workers || []);
+                    return db.getTriggerAssignment(workers)
+                })
+                .then((worker) => {
+                    validatedParams['worker'] = worker;
+                    return db.recordTrigger(validatedParams);
+                })
                 .then(() => {
                     console.log('successfully wrote the trigger');
                     resolve(common.webResponse(200, validatedParams.uuid));
diff --git a/action/lib/Database.js b/action/lib/Database.js
index fe8f778..65e6a29 100644
--- a/action/lib/Database.js
+++ b/action/lib/Database.js
@@ -3,6 +3,9 @@
     var nano = require('nano')(dbURL);
     this.db = nano.db.use(dbName);
 
+    const designDoc = "filters";
+    const assignmentView = "by-worker";
+
     this.getTrigger = function(triggerFQN) {
         return new Promise((resolve, reject) => {
             this.db.get(triggerFQN, (err, result) => {
@@ -60,4 +63,45 @@
                 });
             })
     };
+
+    this.getTriggerAssignment = function(workers) {
+
+        return new Promise((resolve, reject) => {
+            var assignment = workers[0] || 'worker0';
+
+            if (workers.length > 1) {
+                this.db.view(designDoc, assignmentView, {group: true}, (err, result) => {
+                    if (err) {
+                        reject(err);
+                    } else {
+                        // a map between available workers and their number of assigned triggers
+                        // values will be populated with the results of the assignment view
+                        var counter = {};
+                        workers.forEach(worker => {
+                            counter[worker] = 0;
+                        });
+
+                        // update counter values with the number of assigned triggers
+                        // for each worker
+                        result.rows.forEach(row => {
+                            if (row.key in counter) {
+                                counter[row.key] = row.value;
+                            }
+                        });
+
+                        // find which of the available workers has the least number of
+                        // assigned triggers
+                        for (availableWorker in counter) {
+                            if (counter[availableWorker] < counter[assignment]) {
+                                assignment = availableWorker;
+                            }
+                        }
+                        resolve(assignment);
+                    }
+                });
+            } else {
+                resolve(assignment);
+            }
+        });
+    };
 };
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index eaa3a18..042096a 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -35,7 +35,14 @@
                         checkMessageHubCredentials(validatedParams)
                     ]);
                 })
-                .then(() => db.recordTrigger(validatedParams))
+                .then(() => {
+                    var workers = (params.workers || []);
+                    return db.getTriggerAssignment(workers)
+                })
+                .then((worker) => {
+                    validatedParams['worker'] = worker;
+                    return db.recordTrigger(validatedParams);
+                })
                 .then(() => {
                     console.log('successfully wrote the trigger');
                     resolve(common.webResponse(200, validatedParams.uuid));
diff --git a/devGuide.md b/devGuide.md
index 81080dc..2b59793 100644
--- a/devGuide.md
+++ b/devGuide.md
@@ -22,6 +22,7 @@
 |INSTANCE|String|A unique identifier for this service. This is useful to differentiate log messages if you run multiple instances of the service|
 |LOCAL_DEV|Boolean|If you are using a locally-deployed OpenWhisk core system, it likely has a self-signed certificate. Set `LOCAL_DEV` to `true` to allow firing triggers without checking the certificate validity. *Do not use this for production systems!*|
 |PAYLOAD_LIMIT|Integer (default=900000)|The maxmimum payload size, in bytes, allowed during message batching. This value should be less than your OpenWhisk deployment's payload limit.|
+|WORKER|String|The ID of this running instances. Useful when running multiple instances. This should be of the form `workerX`. e.g. `worker0`.
 
 With that in mind, starting the feed service might look something like:
 
@@ -52,6 +53,17 @@
 ./installKafka.sh MyOpenWhiskAuthKey 10.0.1.5 https://cloudant_user:cloudant_pw@cloudant.com staging_db_prefix 10.0.1.5
 ```
 
+In addition, when running multiple instances, the following argument is required
+|Name|Description|
+|---|---|
+|workers|An array of the IDs of the running instances with each ID of the form `workerX`. e.g. `["worker0", "worker1"]`|
+
+When running multiple instances, an example run might look something like:
+
+```sh
+./installKafka.sh MyOpenWhiskAuthKey 10.0.1.5 https://cloudant_user:cloudant_pw@cloudant.com staging_db_prefix 10.0.1.5 "[\"worker0\", \"worker1\"]"
+```
+
 # Testing
 To run the automated test suite, you can issue a Gradle command. There are some tests which talk directly to the provider service over REST, and so these tests must know the IP address and port of the running service. This is done by providing the `-Dhealth_url`, `-Dhost` and `-Dport` arguments to Gradle:
 
diff --git a/installCatalog.sh b/installCatalog.sh
index 9a459d3..7055138 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -23,6 +23,7 @@
 DB_URL="$3"
 DB_NAME="${4}ow_kafka_triggers"
 APIHOST="$5"
+WORKERS="$6"
 
 # If the auth key file exists, read the key in the file. Otherwise, take the
 # first argument as the key itself.
@@ -66,12 +67,23 @@
     -a sampleInput '{"kafka_brokers_sasl":"[\"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093\"]", "username":"someUsername", "password":"somePassword", "topic":"mytopic", "isJSONData": "false", "endpoint":"openwhisk.ng.bluemix.net", "kafka_admin_url":"https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443"}'
 
 # create messagingWeb package and web version of feed action
-$WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
-    --auth "$AUTH" \
-    --shared no \
-    -p endpoint "$APIHOST" \
-    -p DB_URL "$DB_URL" \
-    -p DB_NAME "$DB_NAME" \
+if [ -n "$WORKERS" ];
+then
+    $WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
+        --auth "$AUTH" \
+        --shared no \
+        -p endpoint "$APIHOST" \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME"  \
+        -p workers "$WORKERS"
+else
+    $WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
+        --auth "$AUTH" \
+        --shared no \
+        -p endpoint "$APIHOST" \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME"
+fi
 
 # make messageHubFeedWeb.zip
 
diff --git a/installKafka.sh b/installKafka.sh
index 2ed583c..4b3ee24 100755
--- a/installKafka.sh
+++ b/installKafka.sh
@@ -23,6 +23,7 @@
 DB_URL="$3"
 DB_NAME="${4}ow_kafka_triggers"
 APIHOST="$5"
+WORKERS="$6"
 
 # If the auth key file exists, read the key in the file. Otherwise, take the
 # first argument as the key itself.
@@ -65,12 +66,23 @@
     -a sampleInput '{"brokers":"[\"127.0.0.1:9093\"]", "topic":"mytopic", "isJSONData":"false", "endpoint": "openwhisk.ng.bluemix.net"}'
 
 # create messagingWeb package and web version of feed action
-$WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
-    --auth "$AUTH" \
-    --shared no \
-    -p endpoint "$APIHOST" \
-    -p DB_URL "$DB_URL" \
-    -p DB_NAME "$DB_NAME" \
+if [ -n "$WORKERS" ];
+then
+    $WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
+        --auth "$AUTH" \
+        --shared no \
+        -p endpoint "$APIHOST" \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME"  \
+        -p workers "$WORKERS"
+else
+    $WSK_CLI -i --apihost "$EDGEHOST" package update messagingWeb \
+        --auth "$AUTH" \
+        --shared no \
+        -p endpoint "$APIHOST" \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME"
+fi
 
 # make kafkaFeedWeb.zip
 OLD_PATH=`pwd`
diff --git a/provider/consumer.py b/provider/consumer.py
index 727643a..b3af4b4 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -74,8 +74,12 @@
         self.sharedDictionary['desiredState'] = newState
 
     def shutdown(self):
-        self.sharedDictionary['currentState'] = Consumer.State.Stopping
-        self.setDesiredState(Consumer.State.Dead)
+        if self.currentState() == Consumer.State.Disabled:
+            self.sharedDictionary['currentState'] = Consumer.State.Dead
+            self.setDesiredState(Consumer.State.Dead)
+        else:
+            self.sharedDictionary['currentState'] = Consumer.State.Stopping
+            self.setDesiredState(Consumer.State.Dead)
 
     def disable(self):
         self.setDesiredState(Consumer.State.Disabled)
diff --git a/provider/database.py b/provider/database.py
index 7a7ab41..4832586 100644
--- a/provider/database.py
+++ b/provider/database.py
@@ -37,6 +37,7 @@
 
     filters_design_doc_id = '_design/filters'
     only_triggers_view_id = 'only-triggers'
+    by_worker_view_id = 'by-worker'
 
     instance = os.getenv('INSTANCE', 'messageHubTrigger-0')
     canaryId = "canary-{}".format(instance)
@@ -124,12 +125,25 @@
     def migrate(self):
         logging.info('Starting DB migration')
 
+        by_worker_view = {
+            'map': """function(doc) {
+                        if(doc.triggerURL && (!doc.status || doc.status.active)) {
+                            emit(doc.worker || 'worker0', 1);
+                        }
+                    }""",
+            'reduce': '_count'
+        }
+
         filtersDesignDoc = self.database.get_design_document(self.filters_design_doc_id)
 
-        if not filtersDesignDoc.exists():
+        if filtersDesignDoc.exists():
+            if self.by_worker_view_id not in filtersDesignDoc["views"]:
+                filtersDesignDoc["views"][self.by_worker_view_id] = by_worker_view
+                logging.info('Updating the design doc')
+                filtersDesignDoc.save()
+        else:
             logging.info('Creating the design doc')
 
-            # create only-triggers view
             self.database.create_document({
                 '_id': self.filters_design_doc_id,
                 'views': {
@@ -139,10 +153,9 @@
                                         emit(doc._id, 1);
                                     }
                                 }"""
-                    }
+                    },
+                    self.by_worker_view_id: by_worker_view
                 }
             })
-        else:
-            logging.info("design doc already exists")
 
         logging.info('Database migration complete')
diff --git a/provider/service.py b/provider/service.py
index 8a08e79..ae970bb 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -19,6 +19,7 @@
 """
 
 import logging
+import os
 import time
 
 from consumer import Consumer
@@ -45,6 +46,7 @@
         self.canaryGenerator = CanaryDocumentGenerator()
 
         self.consumers = consumers
+        self.workerId = os.getenv("WORKER", "worker0")
 
     def run(self):
         self.canaryGenerator.start()
@@ -89,24 +91,33 @@
                         elif 'triggerURL' in change['doc']:
                             logging.info('[changes] Found a change in a trigger document')
                             document = change['doc']
+                            triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)
 
                             if not self.consumers.hasConsumerForTrigger(change["id"]):
-                                logging.info('[{}] Found a new trigger to create'.format(change["id"]))
-                                self.createAndRunConsumer(document)
+                                if triggerIsAssignedToMe:
+                                    logging.info('[{}] Found a new trigger to create'.format(change["id"]))
+                                    self.createAndRunConsumer(document)
+                                else:
+                                    logging.info("[{}] Found a new trigger, but is assigned to another worker: {}".format(change["id"], document["worker"]))
                             else:
-                                logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
                                 existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
 
-                                if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
-                                    # disabled trigger has become active
-                                    logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
-                                    self.createAndRunConsumer(document)
-                                elif existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
+                                if existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
                                     # running trigger should become disabled
+                                    # this should be done regardless of which worker the document claims to be assigned to
                                     logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
                                     existingConsumer.disable()
+                                elif triggerIsAssignedToMe:
+                                    logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
+
+                                    if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                                        # disabled trigger has become active
+                                        logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
+                                        self.createAndRunConsumer(document)
                                 else:
-                                    logging.debug('[changes] Found non-interesting trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
+                                    # trigger has become reassigned to a different worker
+                                    logging.info("[{}] Shutting down trigger as it has been re-assigned to {}".format(change["id"], document["worker"]))
+                                    existingConsumer.shutdown()
                         elif 'canary-timestamp' in change['doc']:
                             # found a canary - update lastCanaryTime
                             logging.info('[canary] I found a canary. The last one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
@@ -120,6 +131,12 @@
 
             logging.debug("[changes] I made it out of the changes loop!")
 
+    def __isTriggerDocAssignedToMe(self, doc):
+        if "worker" in doc:
+            return doc["worker"] == self.workerId
+        else:
+            return self.workerId == "worker0"
+
     def stopChangesFeed(self):
         if self.changes != None:
             self.changes.stop()
diff --git a/provider/thedoctor.py b/provider/thedoctor.py
index 4c28674..42fa9e7 100644
--- a/provider/thedoctor.py
+++ b/provider/thedoctor.py
@@ -55,9 +55,13 @@
                     consumer.restart()
                 elif consumer.currentState() == Consumer.State.Dead and consumer.desiredState() == Consumer.State.Dead:
                     # Bring out yer dead...
-                    logging.info('[{}] Joining dead process.'.format(consumer.trigger))
-                    # if you don't first join the process, it'll be left hanging around as a "defunct" process
-                    consumer.process.join(1)
+                    if consumer.process.is_alive():
+                        logging.info('[{}] Joining dead process.'.format(consumer.trigger))
+                        # if you don't first join the process, it'll be left hanging around as a "defunct" process
+                        consumer.process.join(1)
+                    else:
+                        logging.info('[{}] Process is already dead.'.format(consumer.trigger))
+
                     logging.info('[{}] Removing dead consumer from the collection.'.format(consumer.trigger))
                     self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
                 elif consumer.secondsSinceLastPoll() > self.poll_timeout_seconds and consumer.desiredState() == Consumer.State.Running:
diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
new file mode 100644
index 0000000..f6ce723
--- /dev/null
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.packages
+
+import system.utils.KafkaUtils
+
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.JsHelpers
+import common.TestHelpers
+import common.StreamLogging
+import common.Wsk
+import common.WskActorSystem
+import common.WskProps
+import common.WskTestHelpers
+import spray.json.DefaultJsonProtocol._
+import spray.json.{pimpAny, _}
+import whisk.core.database.test.DatabaseScriptTestUtils
+import whisk.utils.JsHelpers
+
+@RunWith(classOf[JUnitRunner])
+class MessageHubMultiWorkersTest extends FlatSpec
+  with Matchers
+  with WskActorSystem
+  with BeforeAndAfterAll
+  with TestHelpers
+  with WskTestHelpers
+  with JsHelpers
+  with StreamLogging
+  with DatabaseScriptTestUtils {
+
+  val topic = "test"
+
+  implicit val wskprops = WskProps()
+  val wsk = new Wsk()
+
+  val messagingPackage = "/whisk.system/messaging"
+  val messageHubFeed = "messageHubFeed"
+  val dbName = s"${dbPrefix}ow_kafka_triggers"
+
+  val kafkaUtils = new KafkaUtils
+
+  behavior of "Mussage Hub Feed"
+
+  it should "assign two triggers to same worker when only worker0 is available" in withAssetCleaner(wskprops) {
+
+    (wp, assetHelper) =>
+      val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
+      val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
+
+      val worker0 = s"worker${System.currentTimeMillis()}"
+
+      val parameters = constructParams(List(worker0))
+
+      createTrigger(assetHelper, firstTrigger, parameters)
+      createTrigger(assetHelper, secondTrigger, parameters)
+
+      val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+
+      validateTriggerAssignment(documents, firstTrigger, worker0)
+      validateTriggerAssignment(documents, secondTrigger, worker0)
+  }
+
+  it should "assign a trigger to worker0 and a trigger to worker1 when both workers are available" in withAssetCleaner(wskprops) {
+
+    (wp, assetHelper) =>
+      val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
+      val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
+
+      val worker0 = s"worker${System.currentTimeMillis()}"
+      val worker1 = s"worker${System.currentTimeMillis()}"
+
+      val parameters = constructParams(List(worker0, worker1))
+
+      createTrigger(assetHelper, firstTrigger, parameters)
+      createTrigger(assetHelper, secondTrigger, parameters)
+
+      val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+
+      validateTriggerAssignment(documents, firstTrigger, worker0)
+      validateTriggerAssignment(documents, secondTrigger, worker1)
+  }
+
+  it should "assign a trigger to worker1 when worker0 is removed and there is an assignment imbalance" in withAssetCleaner(wskprops) {
+
+    (wp, assetHelper) =>
+      val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
+      val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
+      val thirdTrigger = s"thirdTrigger-${System.currentTimeMillis()}"
+      val fourthTrigger = s"fourthTrigger-${System.currentTimeMillis()}"
+
+      val worker0 = s"worker${System.currentTimeMillis()}"
+      val worker1 = s"worker${System.currentTimeMillis()}"
+
+      val parameters = constructParams(List(worker1))
+
+      createTrigger(assetHelper, firstTrigger, parameters)
+      createTrigger(assetHelper, secondTrigger, parameters)
+      createTrigger(assetHelper, thirdTrigger, parameters = constructParams(List(worker0, worker1)))
+      createTrigger(assetHelper, fourthTrigger, parameters = constructParams(List(worker1)))
+
+      val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+
+      validateTriggerAssignment(documents, firstTrigger, worker1)
+      validateTriggerAssignment(documents, secondTrigger, worker1)
+      validateTriggerAssignment(documents, thirdTrigger, worker0)
+      validateTriggerAssignment(documents, fourthTrigger, worker1)
+  }
+
+  it should "balance the load accross workers when a worker is added" in withAssetCleaner(wskprops) {
+
+    (wp, assetHelper) =>
+      val firstTrigger = s"firstTrigger-${System.currentTimeMillis()}"
+      val secondTrigger = s"secondTrigger-${System.currentTimeMillis()}"
+      val thirdTrigger = s"thirdTrigger-${System.currentTimeMillis()}"
+      val fourthTrigger = s"fourthTrigger-${System.currentTimeMillis()}"
+      val fifthTrigger = s"fifthTrigger-${System.currentTimeMillis()}"
+      val sixthTrigger = s"sixthTrigger-${System.currentTimeMillis()}"
+
+      val worker0 = s"worker${System.currentTimeMillis()}"
+      val worker1 = s"worker${System.currentTimeMillis()}"
+
+      val parameters = constructParams(List(worker0))
+      val updatedParameters = constructParams(List(worker0, worker1))
+
+      createTrigger(assetHelper, firstTrigger, parameters)
+      createTrigger(assetHelper, secondTrigger, parameters)
+      createTrigger(assetHelper, thirdTrigger, updatedParameters)
+      createTrigger(assetHelper, fourthTrigger, updatedParameters)
+      createTrigger(assetHelper, fifthTrigger, updatedParameters)
+      createTrigger(assetHelper, sixthTrigger, updatedParameters)
+
+      val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+
+      validateTriggerAssignment(documents, firstTrigger, worker0)
+      validateTriggerAssignment(documents, secondTrigger, worker0)
+      validateTriggerAssignment(documents, thirdTrigger, worker1)
+      validateTriggerAssignment(documents, fourthTrigger, worker1)
+      validateTriggerAssignment(documents, fifthTrigger, worker0)
+      validateTriggerAssignment(documents, sixthTrigger, worker1)
+  }
+
+  def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
+    val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
+      (trigger, _) =>
+        trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
+    }
+
+    withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+      activation =>
+        // should be successful
+        activation.response.success shouldBe true
+    }
+  }
+
+  def constructParams(workers: List[String]) = {
+    Map(
+      "user" -> kafkaUtils.getAsJson("user"),
+      "password" -> kafkaUtils.getAsJson("password"),
+      "api_key" -> kafkaUtils.getAsJson("api_key"),
+      "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+      "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+      "topic" -> topic.toJson,
+      "workers" -> workers.toJson
+    )
+  }
+
+  def validateTriggerAssignment(documents: List[JsObject], trigger: String, worker: String) = {
+    val doc = documents.filter(_.fields("id").convertTo[String].contains(trigger))
+    JsHelpers.getFieldPath(doc(0), "doc", "worker") shouldBe Some(JsString(worker))
+  }
+}