Add Redis support for managing active state (#104)

* Add Redis support for managing active state
* Prevent duplicate active hosts
diff --git a/.gitignore b/.gitignore
index 7f6823b..ede5a15 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
 .gradle
 build/
+node_modules/
+action/*.zip
\ No newline at end of file
diff --git a/actions/changes.js b/actions/changes.js
index 1c30ba8..7115024 100644
--- a/actions/changes.js
+++ b/actions/changes.js
@@ -1,7 +1,6 @@
 var request = require('request');
 
 function main(msg) {
-    console.log("cloudant trigger feed: ", msg);
 
     // for creation -> CREATE
     // for deletion -> DELETE
diff --git a/package.json b/package.json
index 1932a43..7eaeff6 100644
--- a/package.json
+++ b/package.json
@@ -16,6 +16,8 @@
     "nano": "^6.2.0",
     "json-stringify-safe": "^5.0.1",
     "http-status-codes": "^1.0.5",
-    "request-promise": "^1.0.2"
+    "request-promise": "^1.0.2",
+    "redis":"^2.7.1",
+    "bluebird": "^3.5.0"
   }
 }
\ No newline at end of file
diff --git a/provider/app.js b/provider/app.js
index 5af7424..75a8883 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -7,6 +7,7 @@
 var express = require('express');
 var request = require('request');
 var bodyParser = require('body-parser');
+var bluebird = require('bluebird');
 var logger = require('./Logger');
 
 var ProviderUtils = require('./lib/utils.js');
@@ -32,15 +33,16 @@
 var dbProtocol = process.env.DB_PROTOCOL;
 var dbPrefix = process.env.DB_PREFIX;
 var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
+var redisUrl = process.env.REDIS_URL;
 var ddname = '_design/triggers';
 
 // Create the Provider Server
 var server = http.createServer(app);
-server.listen(app.get('port'), function(){
+server.listen(app.get('port'), function() {
     logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
 });
 
-function createDatabase (nanop) {
+function createDatabase(nanop) {
     var method = 'createDatabase';
     logger.info(method, 'creating the trigger database');
 
@@ -80,19 +82,45 @@
     });
 }
 
-function createTriggerDb () {
+function createTriggerDb() {
     var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
     if (nanop !== null) {
-        return createDatabase (nanop);
+        return createDatabase(nanop);
     }
     else {
         Promise.reject('nano provider did not get created.  check db URL: ' + dbHost);
     }
 }
 
+function createRedisClient() {
+    var method = 'createRedisClient';
+
+    return new Promise(function(resolve, reject) {
+        if (redisUrl) {
+            var redis = require('redis');
+            bluebird.promisifyAll(redis.RedisClient.prototype);
+            var client = redis.createClient(redisUrl);
+
+            client.on("connect", function () {
+                resolve(client);
+            });
+
+            client.on("error", function (err) {
+                logger.error(method, 'Error creating redis', err);
+                reject(err);
+            });
+        }
+        else {
+            resolve();
+        }
+    });
+}
+
 // Initialize the Provider Server
 function init(server) {
     var method = 'init';
+    var nanoDb;
+    var providerUtils;
 
     if (server !== null) {
         var address = server.address();
@@ -103,25 +131,30 @@
     }
 
     createTriggerDb()
-    .then(nanoDb => {
-        logger.info(method, 'trigger storage database details:', nanoDb);
+        .then(db => {
+            nanoDb = db;
+            return createRedisClient();
+        })
+        .then(client => {
+            providerUtils = new ProviderUtils(logger, nanoDb, client);
+            return providerUtils.initRedis();
+        })
+        .then(() => {
+            var providerRAS = new ProviderRAS();
+            var providerHealth = new ProviderHealth(providerUtils);
+            var providerActivation = new ProviderActivation(logger, providerUtils);
 
-        var providerUtils = new ProviderUtils(logger, nanoDb);
-        var providerRAS = new ProviderRAS();
-        var providerHealth = new ProviderHealth(providerUtils);
-        var providerActivation = new ProviderActivation(logger, providerUtils);
+            // RAS Endpoint
+            app.get(providerRAS.endPoint, providerRAS.ras);
 
-        // RAS Endpoint
-        app.get(providerRAS.endPoint, providerRAS.ras);
+            // Health Endpoint
+            app.get(providerHealth.endPoint, providerHealth.health);
 
-        // Health Endpoint
-        app.get(providerHealth.endPoint, providerHealth.health);
+            // Activation Endpoint
+            app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
 
-        // Activation Endpoint
-        app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
-
-        providerUtils.initAllTriggers();
-    }).catch(err => {
+            providerUtils.initAllTriggers();
+        }).catch(err => {
         logger.error(method, 'an error occurred creating database:', err);
     });
 
diff --git a/provider/lib/active.js b/provider/lib/active.js
index 82c29f0..be9ee88 100644
--- a/provider/lib/active.js
+++ b/provider/lib/active.js
@@ -1,33 +1,60 @@
-module.exports = function(logger, providerUtils) {
+module.exports = function(logger, utils) {
 
-  // Active Endpoint
-  this.endPoint = '/active';
+    // Active Endpoint
+    this.endPoint = '/active';
 
-  this.active = function (req, res) {
-      var method = 'active';
-      var response = {};
+    this.active = function(req, res) {
+        var method = 'active';
 
-      if (req.query && req.query.active) {
-          var errorMessage = "Invalid query string";
-          try {
-              var active = JSON.parse(req.query.active);
-              if (typeof active !== 'boolean') {
-                  response.error = errorMessage;
-              }
-              else if (providerUtils.active !== active) {
-                  var message = 'The active state has been changed';
-                  logger.info(method, message, 'to', active);
-                  providerUtils.active = active;
-                  response.message = message;
-              }
-          }
-          catch (e) {
-              response.error = errorMessage;
-          }
-      }
-      response.active = providerUtils.active;
-      response.worker = providerUtils.worker;
-      res.send(response);
-  };
+        var response = {
+            worker: utils.worker,
+            host: utils.host,
+            active: utils.host === utils.activeHost
+        };
+
+        if (req.query && req.query.active) {
+            var query = req.query.active.toLowerCase();
+
+            if (query !== 'true' && query !== 'false') {
+                response.error = "Invalid query string";
+                res.send(response);
+                return;
+            }
+
+            var redundantHost = utils.host === 'host0' ? 'host1' : 'host0';
+            var activeHost = query === 'true' ? utils.host : redundantHost;
+            if (utils.activeHost !== activeHost) {
+                if (utils.redisClient) {
+                    utils.redisClient.hsetAsync(utils.redisHash, utils.redisKey, activeHost)
+                    .then(() => {
+                        response.active = 'swapping';
+                        utils.redisClient.publish(utils.redisHash, activeHost);
+                        var msg = 'Active host swap in progress';
+                        logger.info(method, msg);
+                        response.message = msg;
+                        res.send(response);
+                    })
+                    .catch(err => {
+                        response.error = err;
+                        res.send(response);
+                    });
+                }
+                else {
+                    response.active = utils.host === activeHost;
+                    utils.activeHost = activeHost;
+                    var message = 'The active state has changed';
+                    logger.info(method, message, 'to', activeHost);
+                    response.message = message;
+                    res.send(response);
+                }
+            }
+            else {
+                res.send(response);
+            }
+        }
+        else {
+            res.send(response);
+        }
+    };
 
 };
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index d8f11b8..810ea62 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -2,10 +2,12 @@
 const DEFAULT_MAX_TRIGGERS = -1;
 const RETRY_ATTEMPTS = 12;
 const RETRY_DELAY = 1000; //in milliseconds
+const REDIS_KEY = 'active';
 
 module.exports = {
     TRIGGER_DB_SUFFIX: TRIGGER_DB_SUFFIX,
     DEFAULT_MAX_TRIGGERS: DEFAULT_MAX_TRIGGERS,
     RETRY_ATTEMPTS: RETRY_ATTEMPTS,
-    RETRY_DELAY: RETRY_DELAY
+    RETRY_DELAY: RETRY_DELAY,
+    REDIS_KEY: REDIS_KEY
 };
diff --git a/provider/lib/ras.js b/provider/lib/ras.js
index c5bf9c2..2b7d352 100644
--- a/provider/lib/ras.js
+++ b/provider/lib/ras.js
@@ -1,4 +1,4 @@
-module.exports = function(logger) {
+module.exports = function() {
 
   // Test Endpoint
   this.endPoint = '/ping';
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index db5c18e..68b9d9c 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -5,14 +5,19 @@
 
 module.exports = function(
   logger,
-  triggerDB
+  triggerDB,
+  redisClient
 ) {
     this.module = 'utils';
     this.triggers = {};
     this.endpointAuth = process.env.ENDPOINT_AUTH;
     this.routerHost = process.env.ROUTER_HOST || 'localhost';
-    this.active =  !(process.env.ACTIVE && process.env.ACTIVE.toLowerCase() === 'false');
     this.worker = process.env.WORKER || "worker0";
+    this.host = process.env.ACTIVE !== undefined && process.env.ACTIVE.toLowerCase() === 'false' ? 'host1' : 'host0';
+    this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
+    this.redisClient = redisClient;
+    this.redisHash = triggerDB.config.db + '_' + this.worker;
+    this.redisKey = constants.REDIS_KEY;
 
     this.retryAttempts = constants.RETRY_ATTEMPTS;
 
@@ -50,7 +55,7 @@
             utils.triggers[dataTrigger.id] = dataTrigger;
 
             feed.on('change', function (change) {
-                if (utils.active) {
+                if (utils.activeHost === utils.host) {
                     logger.info(method, 'Trigger', dataTrigger.id, 'got change from', dataTrigger.dbname);
 
                     var triggerHandle = utils.triggers[dataTrigger.id];
@@ -87,7 +92,7 @@
 
     };
 
-    this.initTrigger = function (newTrigger) {
+    this.initTrigger = function(newTrigger) {
         var method = 'initTrigger';
 
         logger.info(method, 'create trigger', newTrigger.id, 'with the following args', newTrigger);
@@ -113,16 +118,16 @@
         return trigger;
     };
 
-    this.shouldDisableTrigger = function (statusCode) {
+    this.shouldDisableTrigger = function(statusCode) {
         return ((statusCode >= 400 && statusCode < 500) &&
             [HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
     };
 
-    this.disableTrigger = function (id, statusCode, message) {
+    this.disableTrigger = function(id, statusCode, message) {
         var method = 'disableTrigger';
 
         //only active/master provider should update the database
-        if (utils.active) {
+        if (utils.activeHost === utils.host) {
             triggerDB.get(id, function (err, existing) {
                 if (!err) {
                     if (!existing.status || existing.status.active === true) {
@@ -152,7 +157,7 @@
     };
 
     // Delete a trigger: stop listening for changes and remove it.
-    this.deleteTrigger = function (triggerIdentifier) {
+    this.deleteTrigger = function(triggerIdentifier) {
         var method = 'deleteTrigger';
 
         if (utils.triggers[triggerIdentifier].feed) {
@@ -163,7 +168,7 @@
         logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted from memory');
     };
 
-    this.fireTrigger = function (triggerIdentifier, change) {
+    this.fireTrigger = function(triggerIdentifier, change) {
         var method = 'fireTrigger';
 
         var dataTrigger = utils.triggers[triggerIdentifier];
@@ -190,7 +195,7 @@
         });
     };
 
-    this.postTrigger = function (dataTrigger, form, uri, auth, retryCount) {
+    this.postTrigger = function(dataTrigger, form, uri, auth, retryCount) {
         var method = 'postTrigger';
 
         return new Promise(function(resolve, reject) {
@@ -246,7 +251,7 @@
         });
     };
 
-    this.initAllTriggers = function () {
+    this.initAllTriggers = function() {
         var method = 'initAllTriggers';
 
         logger.info(method, 'resetting system from last state');
@@ -302,7 +307,7 @@
         });
     };
 
-    this.setupFollow = function setupFollow(seq) {
+    this.setupFollow = function(seq) {
         var method = 'setupFollow';
 
         var feed = triggerDB.follow({ since: seq, include_docs: true, filter: ddname + '/' + filter, query_params: { worker: utils.worker } });
@@ -361,7 +366,6 @@
             var endpointAuth = utils.endpointAuth.split(':');
 
             if (endpointAuth[0] === uuid && endpointAuth[1] === key) {
-                logger.info(method, 'Authentication successful');
                 next();
             }
             else {
@@ -374,12 +378,12 @@
         }
     };
 
-    this.sendError = function (method, code, message, res) {
+    this.sendError = function(method, code, message, res) {
         logger.error(method, message);
         res.status(code).json({error: message});
     };
 
-    this.parseQName = function (qname, separator) {
+    this.parseQName = function(qname, separator) {
         var parsed = {};
         var delimiter = separator || ':';
         var defaultNamespace = '_';
@@ -394,4 +398,51 @@
         return parsed;
     };
 
+    this.initRedis = function() {
+        var method = 'initRedis';
+
+        return new Promise(function(resolve, reject) {
+
+            if (redisClient) {
+                var subscriber = redisClient.duplicate();
+
+                //create a subscriber client that listens for requests to perform swap
+                subscriber.on("message", function (channel, message) {
+                    if (message === 'host0' || message === 'host1') {
+                        logger.info(method, message, "set to active host in channel", channel);
+                        utils.activeHost = message;
+                    }
+                });
+
+                subscriber.subscribe(utils.redisHash);
+
+                redisClient.hgetAsync(utils.redisHash, utils.redisKey)
+                .then(activeHost => {
+                    return utils.initActiveHost(activeHost);
+                    })
+                    .then(resolve)
+                    .catch(err => {
+                        reject(err);
+                    });
+            }
+            else {
+                resolve();
+            }
+        });
+    };
+
+    this.initActiveHost = function(activeHost) {
+        var method = 'initActiveHost';
+
+        if (activeHost === null) {
+            //initialize redis key with active host
+            logger.info(method, 'redis hset', utils.redisHash, utils.redisKey, utils.activeHost);
+            return redisClient.hsetAsync(utils.redisHash, utils.redisKey, utils.activeHost);
+        }
+        else {
+            utils.activeHost = activeHost;
+            return Promise.resolve();
+        }
+    };
+
 };
diff --git a/tests/src/test/scala/system/redundancy/CloudantRedundancyTests.scala b/tests/src/test/scala/system/redundancy/CloudantRedundancyTests.scala
new file mode 100644
index 0000000..6cc69f8
--- /dev/null
+++ b/tests/src/test/scala/system/redundancy/CloudantRedundancyTests.scala
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.redundancy
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+import common.{WhiskProperties, Wsk, WskProps, WskTestHelpers}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.{pimpAny, _}
+import system.CloudantUtil
+
+/**
+ * These tests verify that a cloudant redundancy (master/slave) configuration
+ * works as expected.  They will only run properly in an environment with two
+ * cloudant provider containers running concurrently and env var ACTIVE set to true in
+ * one container and false in the other.  These tests also assume that redis and
+ * the active endpoint authorization are configured.  For the auth, set the
+ * ENDPOINT_AUTH env var in your containers to match the testing.auth property
+ * found in your whisk.properties.  To configure redis, simply set the REDIS_URL
+ * env var in your containers to point to the openwhisk redis container and make
+ * sure the container is deployed.  You can run redis.yml to deploy it.
+ */
+@RunWith(classOf[JUnitRunner])
+class CloudantRedundancyTests
+    extends FlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with WskTestHelpers {
+
+    val wskprops = WskProps()
+    val wsk = new Wsk
+    val myCloudantCreds = CloudantUtil.Credential.makeFromVCAPFile("cloudantNoSQLDB", this.getClass.getSimpleName)
+    var edgeHost = WhiskProperties.getEdgeHost()
+    val auth = WhiskProperties.getBasicAuth
+    val user = auth.fst
+    val password = auth.snd
+
+    var endpointPrefix = s"https://$user:$password@$edgeHost/cloudanttrigger/worker0/"
+
+    behavior of "Cloudant redundancy tests"
+
+    it should "fire cloudant trigger before the swap" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            implicit val wskprops = wp // shadow global props and make implicit
+            val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val packageName = "dummyCloudantPackage"
+            val feed = "changes"
+
+            try {
+                CloudantUtil.setUp(myCloudantCreds)
+
+                // the package cloudant should be there
+                val packageGetResult = wsk.pkg.get("/whisk.system/cloudant")
+                println("fetched package cloudant")
+                packageGetResult.stdout should include("ok")
+
+                // create package binding
+                assetHelper.withCleaner(wsk.pkg, packageName) {
+                    (pkg, name) => pkg.bind("/whisk.system/cloudant", name)
+                }
+
+                // create whisk stuff
+                val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+                    (trigger, name) =>
+                        trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+                            "username" -> myCloudantCreds.user.toJson,
+                            "password" -> myCloudantCreds.password.toJson,
+                            "host" -> myCloudantCreds.host().toJson,
+                            "dbname" -> myCloudantCreds.dbname.toJson))
+                }
+                feedCreationResult.stdout should include("ok")
+
+                Thread.sleep(3000)
+
+                // create a test doc in the sample db
+                println("create a test doc and wait for trigger")
+                CloudantUtil.createDocument(myCloudantCreds, "{\"test\":\"test_doc1\"}")
+
+                // get activation list of the trigger, expecting exactly 1
+                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 30).length
+                println(s"Found activation size (should be exactly 1): $activations")
+                withClue("Change feed trigger count: ") { activations should be(1) }
+
+                // delete the whisk trigger, which must also delete the feed
+                wsk.trigger.delete(triggerName)
+            } finally {
+                CloudantUtil.unsetUp(myCloudantCreds)
+            }
+    }
+
+    it should "perform active swap by setting host0 active=false" in {
+        val endpointURL = endpointPrefix + "0/active?active=false"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host0\",\"active\":\"swapping\"}".parseJson.asJsObject
+
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "verify active swap by checking for host0 active=false" in {
+        val endpointURL = endpointPrefix + "0/active"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host0\",\"active\":false}".parseJson.asJsObject
+
+        Thread.sleep(3000)
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "verify active swap by checking for host1 active=true" in {
+        val endpointURL = endpointPrefix + "1/active"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host1\",\"active\":true}".parseJson.asJsObject
+
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "fire cloudant trigger again after the swap" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            implicit val wskprops = wp // shadow global props and make implicit
+            val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val packageName = "dummyCloudantPackage"
+            val feed = "changes"
+
+            try {
+                CloudantUtil.setUp(myCloudantCreds)
+
+                // the package cloudant should be there
+                val packageGetResult = wsk.pkg.get("/whisk.system/cloudant")
+                println("fetched package cloudant")
+                packageGetResult.stdout should include("ok")
+
+                // create package binding
+                assetHelper.withCleaner(wsk.pkg, packageName) {
+                    (pkg, name) => pkg.bind("/whisk.system/cloudant", name)
+                }
+
+                // create whisk stuff
+                val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+                    (trigger, name) =>
+                        trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+                            "username" -> myCloudantCreds.user.toJson,
+                            "password" -> myCloudantCreds.password.toJson,
+                            "host" -> myCloudantCreds.host().toJson,
+                            "dbname" -> myCloudantCreds.dbname.toJson))
+                }
+                feedCreationResult.stdout should include("ok")
+
+                Thread.sleep(3000)
+
+                // create a test doc in the sample db
+                println("create a test doc and wait for trigger")
+                CloudantUtil.createDocument(myCloudantCreds, "{\"test\":\"test_doc1\"}")
+
+                // get activation list of the trigger, expecting exactly 1
+                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 30).length
+                println(s"Found activation size (should be exactly 1): $activations")
+                withClue("Change feed trigger count: ") { activations should be(1) }
+
+                // delete the whisk trigger, which must also delete the feed
+                wsk.trigger.delete(triggerName)
+            } finally {
+                CloudantUtil.unsetUp(myCloudantCreds)
+            }
+    }
+
+    private def makeGetCallWithExpectedResult(endpointURL: String, expectedResult: JsObject) = {
+        val response = RestAssured.
+                given().
+                config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).
+                get(endpointURL)
+        assert(response.statusCode() == 200)
+        var result = response.body.asString.parseJson.asJsObject
+        JsObject(result.fields - "message") shouldBe expectedResult
+    }
+
+    override def afterAll() {
+        //swap back to original configuration
+        RestAssured.
+                given().
+                config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).
+                get(endpointPrefix + "0/active?active=true")
+    }
+
+}