Add Redis support for managing active state (#66)

* Add Redis support for managing active state
* prevent duplicate active hosts
diff --git a/.gitignore b/.gitignore
index d354568..ac4bde5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
 node_modules/
 .env
 .vscode
+action/*.zip
diff --git a/action/alarm.js b/action/alarm.js
index 4e6dc09..cda9aee 100644
--- a/action/alarm.js
+++ b/action/alarm.js
@@ -1,7 +1,6 @@
 var request = require('request');
 
 function main(msg) {
-    console.log("alarm: ", msg);
 
     // for creation -> CREATE
     // for deletion -> DELETE
diff --git a/package.json b/package.json
index e17b4a5..03c4cdc 100755
--- a/package.json
+++ b/package.json
@@ -17,6 +17,8 @@
     "moment": "^2.12.0",
     "json-stringify-safe": "^5.0.1",
     "http-status-codes": "^1.0.5",
-    "request-promise": "^1.0.2"
+    "request-promise": "^1.0.2",
+    "redis":"^2.7.1",
+    "bluebird": "^3.5.0"
   }
 }
\ No newline at end of file
diff --git a/provider/app.js b/provider/app.js
index 061577f..094370e 100755
--- a/provider/app.js
+++ b/provider/app.js
@@ -7,6 +7,7 @@
 var express = require('express');
 var request = require('request');
 var bodyParser = require('body-parser');
+var bluebird = require('bluebird');
 var logger = require('./Logger');
 
 var ProviderUtils = require('./lib/utils.js');
@@ -32,11 +33,12 @@
 var dbProtocol = process.env.DB_PROTOCOL;
 var dbPrefix = process.env.DB_PREFIX;
 var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
+var redisUrl = process.env.REDIS_URL;
 var ddname = '_design/triggers';
 
 // Create the Provider Server
 var server = http.createServer(app);
-server.listen(app.get('port'), function(){
+server.listen(app.get('port'), function() {
     logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
 });
 
@@ -83,16 +85,42 @@
 function createTriggerDb() {
     var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
     if (nanop !== null) {
-        return createDatabase (nanop);
+        return createDatabase(nanop);
     }
     else {
         Promise.reject('nano provider did not get created.  check db URL: ' + dbHost);
     }
 }
 
+function createRedisClient() {
+    var method = 'createRedisClient';
+
+    return new Promise(function(resolve, reject) {
+        if (redisUrl) {
+            var redis = require('redis');
+            bluebird.promisifyAll(redis.RedisClient.prototype);
+            var client = redis.createClient(redisUrl);
+
+            client.on("connect", function () {
+                resolve(client);
+            });
+
+            client.on("error", function (err) {
+                logger.error(method, 'Error creating redis', err);
+                reject(err);
+            });
+        }
+        else {
+            resolve();
+        }
+    });
+}
+
 // Initialize the Provider Server
 function init(server) {
     var method = 'init';
+    var nanoDb;
+    var providerUtils;
 
     if (server !== null) {
         var address = server.address();
@@ -103,10 +131,15 @@
     }
 
     createTriggerDb()
-    .then(nanoDb => {
-        logger.info(method, 'trigger storage database details:', nanoDb);
-
-        var providerUtils = new ProviderUtils(logger, nanoDb);
+    .then(db => {
+        nanoDb = db;
+        return createRedisClient();
+    })
+    .then(client => {
+        providerUtils = new ProviderUtils(logger, nanoDb, client);
+        return providerUtils.initRedis();
+    })
+    .then(() => {
         var providerRAS = new ProviderRAS();
         var providerHealth = new ProviderHealth(providerUtils);
         var providerActivation = new ProviderActivation(logger, providerUtils);
diff --git a/provider/lib/active.js b/provider/lib/active.js
index 82c29f0..59e42fe 100644
--- a/provider/lib/active.js
+++ b/provider/lib/active.js
@@ -1,33 +1,60 @@
-module.exports = function(logger, providerUtils) {
+module.exports = function(logger, utils) {
 
   // Active Endpoint
   this.endPoint = '/active';
 
-  this.active = function (req, res) {
+  this.active = function(req, res) {
       var method = 'active';
-      var response = {};
+
+      var response = {
+          worker: utils.worker,
+          host: utils.host,
+          active: utils.host === utils.activeHost
+      };
 
       if (req.query && req.query.active) {
-          var errorMessage = "Invalid query string";
-          try {
-              var active = JSON.parse(req.query.active);
-              if (typeof active !== 'boolean') {
-                  response.error = errorMessage;
+          var query = req.query.active.toLowerCase();
+
+          if (query !== 'true' && query !== 'false') {
+              response.error = "Invalid query string";
+              res.send(response);
+              return;
+          }
+
+          var redundantHost = utils.host === 'host0' ? 'host1' : 'host0';
+          var activeHost = query === 'true' ? utils.host : redundantHost;
+          if (utils.activeHost !== activeHost) {
+              if (utils.redisClient) {
+                  utils.redisClient.hsetAsync(utils.redisHash, utils.redisKey, activeHost)
+                  .then(() => {
+                      response.active = 'swapping';
+                      utils.redisClient.publish(utils.redisHash, activeHost);
+                      var msg = 'Active host swap in progress';
+                      logger.info(method, msg);
+                      response.message = msg;
+                      res.send(response);
+                  })
+                  .catch(err => {
+                      response.error = err;
+                      res.send(response);
+                  });
               }
-              else if (providerUtils.active !== active) {
-                  var message = 'The active state has been changed';
-                  logger.info(method, message, 'to', active);
-                  providerUtils.active = active;
+              else {
+                  response.active = utils.host === activeHost;
+                  utils.activeHost = activeHost;
+                  var message = 'The active state has changed';
+                  logger.info(method, message, 'to', activeHost);
                   response.message = message;
+                  res.send(response);
               }
           }
-          catch (e) {
-              response.error = errorMessage;
+          else {
+              res.send(response);
           }
       }
-      response.active = providerUtils.active;
-      response.worker = providerUtils.worker;
-      res.send(response);
+      else {
+          res.send(response);
+      }
   };
 
 };
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index b07b1ed..4fca5dd 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -2,10 +2,12 @@
 const DEFAULT_MAX_TRIGGERS = -1;
 const RETRY_ATTEMPTS = 10;
 const RETRY_DELAY = 1000; //in milliseconds
+const REDIS_KEY = 'active';
 
 module.exports = {
     TRIGGER_DB_SUFFIX: TRIGGER_DB_SUFFIX,
     DEFAULT_MAX_TRIGGERS: DEFAULT_MAX_TRIGGERS,
     RETRY_ATTEMPTS: RETRY_ATTEMPTS,
-    RETRY_DELAY: RETRY_DELAY
+    RETRY_DELAY: RETRY_DELAY,
+    REDIS_KEY: REDIS_KEY
 };
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 1073a7d..5ae875c 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -6,14 +6,19 @@
 
 module.exports = function(
   logger,
-  triggerDB
+  triggerDB,
+  redisClient
 ) {
     this.module = 'utils';
     this.triggers = {};
     this.endpointAuth = process.env.ENDPOINT_AUTH;
     this.routerHost = process.env.ROUTER_HOST || 'localhost';
-    this.active =  !(process.env.ACTIVE && process.env.ACTIVE.toLowerCase() === 'false');
     this.worker = process.env.WORKER || "worker0";
+    this.host = process.env.ACTIVE !== undefined && process.env.ACTIVE.toLowerCase() === 'false' ? 'host1' : 'host0';
+    this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
+    this.redisClient = redisClient;
+    this.redisHash = triggerDB.config.db + '_' + this.worker;
+    this.redisKey = constants.REDIS_KEY;
 
     var retryDelay = constants.RETRY_DELAY;
     var retryAttempts = constants.RETRY_ATTEMPTS;
@@ -35,7 +40,7 @@
                 if (!(triggerIdentifier in utils.triggers)) {
                     cronHandle = new CronJob(newTrigger.cron,
                         function onTick() {
-                            if (utils.active) {
+                            if (utils.activeHost === utils.host) {
                                 var triggerHandle = utils.triggers[triggerIdentifier];
                                 if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
                                     try {
@@ -73,7 +78,7 @@
         }
     };
 
-    this.fireTrigger = function (namespace, name, payload, apikey) {
+    this.fireTrigger = function(namespace, name, payload, apikey) {
         var method = 'fireTrigger';
 
         var triggerIdentifier = utils.getTriggerIdentifier(apikey, namespace, name);
@@ -82,6 +87,7 @@
         var dataTrigger = utils.triggers[triggerIdentifier];
         var uri = host + '/api/v1/namespaces/' + namespace + '/triggers/' + name;
 
+        logger.info(method, 'Cron fired for', triggerIdentifier, 'attempting to fire trigger');
         utils.postTrigger(dataTrigger, payload, uri, auth, 0)
         .then(triggerId => {
             logger.info(method, 'Trigger', triggerId, 'was successfully fired');
@@ -94,7 +100,7 @@
         });
     };
 
-    this.postTrigger = function (dataTrigger, payload, uri, auth, retryCount) {
+    this.postTrigger = function(dataTrigger, payload, uri, auth, retryCount) {
         var method = 'postTrigger';
 
         return new Promise(function(resolve, reject) {
@@ -151,16 +157,16 @@
         });
     };
 
-    this.shouldDisableTrigger = function (statusCode) {
+    this.shouldDisableTrigger = function(statusCode) {
         return ((statusCode >= 400 && statusCode < 500) &&
             [HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
     };
 
-    this.disableTrigger = function (triggerIdentifier, statusCode, message) {
+    this.disableTrigger = function(triggerIdentifier, statusCode, message) {
         var method = 'disableTrigger';
 
         //only active/master provider should update the database
-        if (utils.active) {
+        if (utils.activeHost === utils.host) {
             triggerDB.get(triggerIdentifier, function (err, existing) {
                 if (!err) {
                     if (!existing.status || existing.status.active === true) {
@@ -189,7 +195,7 @@
         }
     };
 
-    this.deleteTrigger = function (triggerIdentifier) {
+    this.deleteTrigger = function(triggerIdentifier) {
         var method = 'deleteTrigger';
 
         if (utils.triggers[triggerIdentifier].cronHandle) {
@@ -199,11 +205,11 @@
         logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted from memory');
     };
 
-    this.getTriggerIdentifier = function (apikey, namespace, name) {
+    this.getTriggerIdentifier = function(apikey, namespace, name) {
         return apikey + '/' + namespace + '/' + name;
     };
 
-    this.initAllTriggers = function () {
+    this.initAllTriggers = function() {
         var method = 'initAllTriggers';
 
         logger.info(method, 'resetting system from last state');
@@ -262,7 +268,7 @@
         });
     };
 
-    this.setupFollow = function setupFollow(seq) {
+    this.setupFollow = function(seq) {
         var method = 'setupFollow';
 
         var feed = triggerDB.follow({ since: seq, include_docs: true, filter: ddname + '/' + filter, query_params: { worker: utils.worker } });
@@ -321,7 +327,6 @@
             var endpointAuth = utils.endpointAuth.split(':');
 
             if (endpointAuth[0] === uuid && endpointAuth[1] === key) {
-                logger.info(method, 'Authentication successful');
                 next();
             }
             else {
@@ -334,9 +339,56 @@
         }
     };
 
-    this.sendError = function (method, code, message, res) {
+    this.sendError = function(method, code, message, res) {
         logger.error(method, message);
         res.status(code).json({error: message});
     };
 
+    this.initRedis = function() {
+        var method = 'initRedis';
+
+        return new Promise(function(resolve, reject) {
+
+            if (redisClient) {
+                var subscriber = redisClient.duplicate();
+
+                //create a subscriber client that listens for requests to perform swap
+                subscriber.on("message", function (channel, message) {
+                    if (message === 'host0' || message === 'host1') {
+                        logger.info(method, message, "set to active host in channel", channel);
+                        utils.activeHost = message;
+                    }
+                });
+
+                subscriber.subscribe(utils.redisHash);
+
+                redisClient.hgetAsync(utils.redisHash, utils.redisKey)
+                .then(activeHost => {
+                    return utils.initActiveHost(activeHost);
+                })
+                .then(resolve)
+                .catch(err => {
+                    reject(err);
+                });
+            }
+            else {
+                resolve();
+            }
+        });
+    };
+
+    this.initActiveHost = function(activeHost) {
+        var method = 'initActiveHost';
+
+        if (activeHost === null) {
+            //initialize redis key with active host
+            logger.info(method, 'redis hset', utils.redisHash, utils.redisKey, utils.activeHost);
+            return redisClient.hsetAsync(utils.redisHash, utils.redisKey, utils.activeHost);
+        }
+        else {
+            utils.activeHost = activeHost;
+            return Promise.resolve();
+        }
+    };
+
 };
diff --git a/tests/src/test/scala/system/redundancy/AlarmsRedundancyTests.scala b/tests/src/test/scala/system/redundancy/AlarmsRedundancyTests.scala
new file mode 100644
index 0000000..63d76d0
--- /dev/null
+++ b/tests/src/test/scala/system/redundancy/AlarmsRedundancyTests.scala
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.redundancy
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+import common.{WhiskProperties, Wsk, WskProps, WskTestHelpers}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.{pimpAny, _}
+
+/**
+ * These tests verify that an alarms redundancy (master/slave) configuration
+ * works as expected.  They will only run properly in an environment with two
+ * alarms containers running concurrently and env var ACTIVE set to true in
+ * one container and false in the other.  This test also assumes that redis and
+ * the active endpoint authorization are configured.  For the auth set the
+ * ENDPOINT_AUTH env var in your containers to match the testing.auth property
+ * found in your whisk.properties.  To configure redis simply set the REDIS_URL
+ * env var in your containers to point to the openwhisk redis container and make
+ * sure the container is deployed.  You can run redis.yml to deploy it.
+ */
+@RunWith(classOf[JUnitRunner])
+class AlarmsRedundancyTests
+    extends FlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with WskTestHelpers {
+
+    val wskprops = WskProps()
+    val wsk = new Wsk
+    var edgeHost = WhiskProperties.getEdgeHost()
+    val auth = WhiskProperties.getBasicAuth
+    val user = auth.fst
+    val password = auth.snd
+
+    var endpointPrefix = s"https://$user:$password@$edgeHost/alarmstrigger/worker0/"
+
+    behavior of "Alarms redundancy tests"
+
+    it should "fire alarms trigger before the swap" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+        implicit val wskprops = wp // shadow global props and make implicit
+        val triggerName = s"dummyAlarmsTrigger-${System.currentTimeMillis}"
+        val packageName = "dummyAlarmsPackage"
+
+        // the package alarms should be there
+        val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
+        println("fetched package alarms")
+        packageGetResult.stdout should include("ok")
+
+        // create package binding
+        assetHelper.withCleaner(wsk.pkg, packageName) {
+            (pkg, name) => pkg.bind("/whisk.system/alarms", name)
+        }
+
+        // create whisk stuff
+        val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+            (trigger, name) =>
+            trigger.create(name, feed = Some(s"$packageName/alarm"), parameters = Map(
+                    "trigger_payload" -> "alarmTest".toJson,
+                    "cron" -> "* * * * * *".toJson))
+        }
+        feedCreationResult.stdout should include("ok")
+
+        println("waiting for triggers")
+        val activations = wsk.activation.pollFor(N = 3, Some(triggerName)).length
+        println(s"Found activation size (should be at least 3): $activations")
+        activations should be >= 3
+    }
+
+
+    it should "perform active swap by setting host0 active=false" in {
+        val endpointURL = endpointPrefix + "0/active?active=false"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host0\",\"active\":\"swapping\"}".parseJson.asJsObject
+
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "verify active swap by checking for host0 active=false" in {
+        val endpointURL = endpointPrefix + "0/active"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host0\",\"active\":false}".parseJson.asJsObject
+
+        Thread.sleep(3000)
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "verify active swap by checking for host1 active=true" in {
+        val endpointURL = endpointPrefix + "1/active"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host1\",\"active\":true}".parseJson.asJsObject
+
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "fire alarms trigger again after the swap" in withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            implicit val wskprops = wp // shadow global props and make implicit
+        val triggerName = s"dummyAlarmsTrigger-${System.currentTimeMillis}"
+            val packageName = "dummyAlarmsPackage"
+
+            // the package alarms should be there
+            val packageGetResult = wsk.pkg.get("/whisk.system/alarms")
+            println("fetched package alarms")
+            packageGetResult.stdout should include("ok")
+
+            // create package binding
+            assetHelper.withCleaner(wsk.pkg, packageName) {
+                (pkg, name) => pkg.bind("/whisk.system/alarms", name)
+            }
+
+            // create whisk stuff
+            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) {
+                (trigger, name) =>
+                    trigger.create(name, feed = Some(s"$packageName/alarm"), parameters = Map(
+                        "trigger_payload" -> "alarmTest".toJson,
+                        "cron" -> "* * * * * *".toJson))
+            }
+            feedCreationResult.stdout should include("ok")
+
+            println("waiting for triggers")
+            val activations = wsk.activation.pollFor(N = 3, Some(triggerName)).length
+            println(s"Found activation size (should be at least 3): $activations")
+            activations should be >= 3
+    }
+
+    private def makeGetCallWithExpectedResult(endpointURL: String, expectedResult: JsObject) = {
+        val response = RestAssured.
+                given().
+                config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).
+                get(endpointURL)
+        assert(response.statusCode() == 200)
+        var result = response.body.asString.parseJson.asJsObject
+        JsObject(result.fields - "message") shouldBe expectedResult
+    }
+
+    override def afterAll() {
+        //swap back to original configuration
+        RestAssured.
+                given().
+                config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).
+                get(endpointPrefix + "0/active?active=true")
+    }
+
+}