Add Redis support for managing active state (#104)
* Add Redis support for managing active state
* prevent duplicate active hosts
diff --git a/.gitignore b/.gitignore
index 7f6823b..ede5a15 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
.gradle
build/
+node_modules/
+action/*.zip
\ No newline at end of file
diff --git a/actions/changes.js b/actions/changes.js
index 1c30ba8..7115024 100644
--- a/actions/changes.js
+++ b/actions/changes.js
@@ -1,7 +1,6 @@
var request = require('request');
function main(msg) {
- console.log("cloudant trigger feed: ", msg);
// for creation -> CREATE
// for deletion -> DELETE
diff --git a/package.json b/package.json
index 1932a43..7eaeff6 100644
--- a/package.json
+++ b/package.json
@@ -16,6 +16,8 @@
"nano": "^6.2.0",
"json-stringify-safe": "^5.0.1",
"http-status-codes": "^1.0.5",
- "request-promise": "^1.0.2"
+ "request-promise": "^1.0.2",
+ "redis":"^2.7.1",
+ "bluebird": "^3.5.0"
}
}
\ No newline at end of file
diff --git a/provider/app.js b/provider/app.js
index 5af7424..75a8883 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -7,6 +7,7 @@
var express = require('express');
var request = require('request');
var bodyParser = require('body-parser');
+var bluebird = require('bluebird');
var logger = require('./Logger');
var ProviderUtils = require('./lib/utils.js');
@@ -32,15 +33,16 @@
var dbProtocol = process.env.DB_PROTOCOL;
var dbPrefix = process.env.DB_PREFIX;
var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
+var redisUrl = process.env.REDIS_URL;
var ddname = '_design/triggers';
// Create the Provider Server
var server = http.createServer(app);
-server.listen(app.get('port'), function(){
+server.listen(app.get('port'), function() {
logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
});
-function createDatabase (nanop) {
+function createDatabase(nanop) {
var method = 'createDatabase';
logger.info(method, 'creating the trigger database');
@@ -80,19 +82,45 @@
});
}
-function createTriggerDb () {
+/**
+ * Builds the nano client from the DB_* settings and hands it to
+ * createDatabase().
+ *
+ * @returns {Promise} the promise returned by createDatabase(), or a rejected
+ *          promise when no nano client was produced
+ */
+function createTriggerDb() {
var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' + dbHost);
if (nanop !== null) {
- return createDatabase (nanop);
+ return createDatabase(nanop);
}
else {
- Promise.reject('nano provider did not get created. check db URL: ' + dbHost);
+ // the rejection must be returned, otherwise the caller receives undefined
+ // and its .then()/.catch() chain throws instead of handling the error
+ return Promise.reject('nano provider did not get created. check db URL: ' + dbHost);
}
}
+/**
+ * Creates the redis client when REDIS_URL is configured.
+ *
+ * @returns {Promise} resolves with the connected client, resolves with
+ *          undefined when no redis URL is set, and rejects with the first
+ *          error reported before a connection was established
+ */
+function createRedisClient() {
+    var method = 'createRedisClient';
+
+    return new Promise(function(resolve, reject) {
+        if (!redisUrl) {
+            // redis is optional: resolve without a client
+            resolve();
+            return;
+        }
+
+        var redis = require('redis');
+        bluebird.promisifyAll(redis.RedisClient.prototype);
+        var client = redis.createClient(redisUrl);
+        var connected = false;
+
+        client.on("connect", function () {
+            connected = true;
+            resolve(client);
+        });
+
+        // this listener stays registered for the life of the client, so only
+        // treat an error as a creation failure while no connection exists yet
+        client.on("error", function (err) {
+            if (connected) {
+                logger.error(method, 'Redis client error', err);
+                return;
+            }
+            logger.error(method, 'Error creating redis', err);
+            reject(err);
+        });
+    });
+}
+
// Initialize the Provider Server
function init(server) {
var method = 'init';
+ var nanoDb;
+ var providerUtils;
if (server !== null) {
var address = server.address();
@@ -103,25 +131,30 @@
}
createTriggerDb()
- .then(nanoDb => {
- logger.info(method, 'trigger storage database details:', nanoDb);
+ .then(db => {
+ nanoDb = db;
+ return createRedisClient();
+ })
+ .then(client => {
+ providerUtils = new ProviderUtils(logger, nanoDb, client);
+ return providerUtils.initRedis();
+ })
+ .then(() => {
+ var providerRAS = new ProviderRAS();
+ var providerHealth = new ProviderHealth(providerUtils);
+ var providerActivation = new ProviderActivation(logger, providerUtils);
- var providerUtils = new ProviderUtils(logger, nanoDb);
- var providerRAS = new ProviderRAS();
- var providerHealth = new ProviderHealth(providerUtils);
- var providerActivation = new ProviderActivation(logger, providerUtils);
+ // RAS Endpoint
+ app.get(providerRAS.endPoint, providerRAS.ras);
- // RAS Endpoint
- app.get(providerRAS.endPoint, providerRAS.ras);
+ // Health Endpoint
+ app.get(providerHealth.endPoint, providerHealth.health);
- // Health Endpoint
- app.get(providerHealth.endPoint, providerHealth.health);
+ // Activation Endpoint
+ app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
- // Activation Endpoint
- app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
-
- providerUtils.initAllTriggers();
- }).catch(err => {
+ providerUtils.initAllTriggers();
+ }).catch(err => {
logger.error(method, 'an error occurred creating database:', err);
});
diff --git a/provider/lib/active.js b/provider/lib/active.js
index 82c29f0..be9ee88 100644
--- a/provider/lib/active.js
+++ b/provider/lib/active.js
@@ -1,33 +1,60 @@
-module.exports = function(logger, providerUtils) {
- // Active Endpoint
- this.endPoint = '/active';
- this.active = function (req, res) {
- var method = 'active';
- var response = {};
- if (req.query && req.query.active) {
- var errorMessage = "Invalid query string";
- try {
- var active = JSON.parse(req.query.active);
- if (typeof active !== 'boolean') {
- response.error = errorMessage;
- }
- else if (providerUtils.active !== active) {
- var message = 'The active state has been changed';
- logger.info(method, message, 'to', active);
- providerUtils.active = active;
- response.message = message;
- }
- }
- catch (e) {
- response.error = errorMessage;
- }
- }
- response.active = providerUtils.active;
- response.worker = providerUtils.worker;
- res.send(response);
- };
+/**
+ * Handler for the /active endpoint: reports whether this host is the active
+ * one and, when called with ?active=true|false, requests a swap.
+ *
+ * @param logger  logger exposing info(method, ...) and error(method, ...)
+ * @param utils   provider utils holding worker, host, activeHost and the
+ *                optional redisClient / redisHash / redisKey
+ */
+module.exports = function(logger, utils) {
+
+    // Active Endpoint
+    this.endPoint = '/active';
+
+    /**
+     * Always answers with {worker, host, active}; adds `message` when a swap
+     * was performed or started and `error` when the request was invalid or
+     * the redis update failed.
+     */
+    this.active = function(req, res) {
+        var method = 'active';
+
+        var response = {
+            worker: utils.worker,
+            host: utils.host,
+            active: utils.host === utils.activeHost
+        };
+
+        if (!req.query || !req.query.active) {
+            res.send(response);
+            return;
+        }
+
+        // a repeated or nested parameter is parsed into an array/object;
+        // coerce to a string so it is validated below instead of throwing
+        var query = String(req.query.active).toLowerCase();
+
+        if (query !== 'true' && query !== 'false') {
+            response.error = "Invalid query string";
+            res.send(response);
+            return;
+        }
+
+        var redundantHost = utils.host === 'host0' ? 'host1' : 'host0';
+        var activeHost = query === 'true' ? utils.host : redundantHost;
+
+        if (utils.activeHost === activeHost) {
+            // requested state is already in effect
+            res.send(response);
+            return;
+        }
+
+        if (!utils.redisClient) {
+            // no redis: the swap only affects this process
+            response.active = utils.host === activeHost;
+            utils.activeHost = activeHost;
+            var message = 'The active state has changed';
+            logger.info(method, message, 'to', activeHost);
+            response.message = message;
+            res.send(response);
+            return;
+        }
+
+        // store the new active host first, then publish it on the channel
+        // named after the hash so subscribed hosts pick up the change
+        utils.redisClient.hsetAsync(utils.redisHash, utils.redisKey, activeHost)
+        .then(() => {
+            response.active = 'swapping';
+            utils.redisClient.publish(utils.redisHash, activeHost);
+            var msg = 'Active host swap in progress';
+            logger.info(method, msg);
+            response.message = msg;
+            res.send(response);
+        })
+        .catch(err => {
+            logger.error(method, 'Failed to swap the active host', err);
+            // an Error instance serializes to {} in JSON; send its text instead
+            response.error = err && err.message ? err.message : String(err);
+            res.send(response);
+        });
+    };
};
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index d8f11b8..810ea62 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -2,10 +2,12 @@
const DEFAULT_MAX_TRIGGERS = -1;
const RETRY_ATTEMPTS = 12;
const RETRY_DELAY = 1000; //in milliseconds
+const REDIS_KEY = 'active';
module.exports = {
TRIGGER_DB_SUFFIX: TRIGGER_DB_SUFFIX,
DEFAULT_MAX_TRIGGERS: DEFAULT_MAX_TRIGGERS,
RETRY_ATTEMPTS: RETRY_ATTEMPTS,
- RETRY_DELAY: RETRY_DELAY
+ RETRY_DELAY: RETRY_DELAY,
+ REDIS_KEY: REDIS_KEY
};
diff --git a/provider/lib/ras.js b/provider/lib/ras.js
index c5bf9c2..2b7d352 100644
--- a/provider/lib/ras.js
+++ b/provider/lib/ras.js
@@ -1,4 +1,4 @@
-module.exports = function(logger) {
+module.exports = function() {
// Test Endpoint
this.endPoint = '/ping';
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index db5c18e..68b9d9c 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -5,14 +5,19 @@
module.exports = function(
logger,
- triggerDB
+ triggerDB,
+ redisClient
) {
this.module = 'utils';
this.triggers = {};
this.endpointAuth = process.env.ENDPOINT_AUTH;
this.routerHost = process.env.ROUTER_HOST || 'localhost';
- this.active = !(process.env.ACTIVE && process.env.ACTIVE.toLowerCase() === 'false');
this.worker = process.env.WORKER || "worker0";
+ this.host = process.env.ACTIVE !== undefined && process.env.ACTIVE.toLowerCase() === 'false' ? 'host1' : 'host0';
+ this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
+ this.redisClient = redisClient;
+ this.redisHash = triggerDB.config.db + '_' + this.worker;
+ this.redisKey = constants.REDIS_KEY;
this.retryAttempts = constants.RETRY_ATTEMPTS;
@@ -50,7 +55,7 @@
utils.triggers[dataTrigger.id] = dataTrigger;
feed.on('change', function (change) {
- if (utils.active) {
+ if (utils.activeHost === utils.host) {
logger.info(method, 'Trigger', dataTrigger.id, 'got change from', dataTrigger.dbname);
var triggerHandle = utils.triggers[dataTrigger.id];
@@ -87,7 +92,7 @@
};
- this.initTrigger = function (newTrigger) {
+ this.initTrigger = function(newTrigger) {
var method = 'initTrigger';
logger.info(method, 'create trigger', newTrigger.id, 'with the following args', newTrigger);
@@ -113,16 +118,16 @@
return trigger;
};
- this.shouldDisableTrigger = function (statusCode) {
+ this.shouldDisableTrigger = function(statusCode) {
return ((statusCode >= 400 && statusCode < 500) &&
[HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
};
- this.disableTrigger = function (id, statusCode, message) {
+ this.disableTrigger = function(id, statusCode, message) {
var method = 'disableTrigger';
//only active/master provider should update the database
- if (utils.active) {
+ if (utils.activeHost === utils.host) {
triggerDB.get(id, function (err, existing) {
if (!err) {
if (!existing.status || existing.status.active === true) {
@@ -152,7 +157,7 @@
};
// Delete a trigger: stop listening for changes and remove it.
- this.deleteTrigger = function (triggerIdentifier) {
+ this.deleteTrigger = function(triggerIdentifier) {
var method = 'deleteTrigger';
if (utils.triggers[triggerIdentifier].feed) {
@@ -163,7 +168,7 @@
logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted from memory');
};
- this.fireTrigger = function (triggerIdentifier, change) {
+ this.fireTrigger = function(triggerIdentifier, change) {
var method = 'fireTrigger';
var dataTrigger = utils.triggers[triggerIdentifier];
@@ -190,7 +195,7 @@
});
};
- this.postTrigger = function (dataTrigger, form, uri, auth, retryCount) {
+ this.postTrigger = function(dataTrigger, form, uri, auth, retryCount) {
var method = 'postTrigger';
return new Promise(function(resolve, reject) {
@@ -246,7 +251,7 @@
});
};
- this.initAllTriggers = function () {
+ this.initAllTriggers = function() {
var method = 'initAllTriggers';
logger.info(method, 'resetting system from last state');
@@ -302,7 +307,7 @@
});
};
- this.setupFollow = function setupFollow(seq) {
+ this.setupFollow = function(seq) {
var method = 'setupFollow';
var feed = triggerDB.follow({ since: seq, include_docs: true, filter: ddname + '/' + filter, query_params: { worker: utils.worker } });
@@ -361,7 +366,6 @@
var endpointAuth = utils.endpointAuth.split(':');
if (endpointAuth[0] === uuid && endpointAuth[1] === key) {
- logger.info(method, 'Authentication successful');
next();
}
else {
@@ -374,12 +378,12 @@
}
};
- this.sendError = function (method, code, message, res) {
+ this.sendError = function(method, code, message, res) {
logger.error(method, message);
res.status(code).json({error: message});
};
- this.parseQName = function (qname, separator) {
+ this.parseQName = function(qname, separator) {
var parsed = {};
var delimiter = separator || ':';
var defaultNamespace = '_';
@@ -394,4 +398,51 @@
return parsed;
};
+    /**
+     * Subscribes to active-host swap messages and loads the active host
+     * currently stored in redis. Does nothing when no redis client was given.
+     *
+     * @returns {Promise} resolves once the stored active host has been read
+     *          (or initialized); rejects when the redis read/write fails
+     */
+    this.initRedis = function() {
+        var method = 'initRedis';
+
+        return new Promise(function(resolve, reject) {
+
+            if (!redisClient) {
+                resolve();
+                return;
+            }
+
+            var subscriber = redisClient.duplicate();
+
+            // the duplicate is a separate client: without its own 'error'
+            // listener an emitted error is thrown as an unhandled event
+            subscriber.on("error", function (err) {
+                logger.error(method, 'Error on redis subscriber', err);
+            });
+
+            //create a subscriber client that listens for requests to perform swap
+            subscriber.on("message", function (channel, message) {
+                if (message === 'host0' || message === 'host1') {
+                    logger.info(method, message, "set to active host in channel", channel);
+                    utils.activeHost = message;
+                }
+            });
+
+            subscriber.subscribe(utils.redisHash);
+
+            redisClient.hgetAsync(utils.redisHash, utils.redisKey)
+            .then(activeHost => {
+                return utils.initActiveHost(activeHost);
+            })
+            .then(resolve)
+            .catch(err => {
+                reject(err);
+            });
+        });
+    };
+
+    /**
+     * Applies the active host read from redis, or seeds redis with the
+     * current in-memory value when nothing is stored yet.
+     *
+     * @param activeHost  value returned by hget; null when the key is unset
+     * @returns {Promise} the hset promise when seeding, otherwise a resolved
+     *          promise
+     */
+    this.initActiveHost = function(activeHost) {
+        var method = 'initActiveHost';
+
+        if (activeHost !== null) {
+            // redis already holds a value: adopt it
+            utils.activeHost = activeHost;
+            return Promise.resolve();
+        }
+
+        //initialize redis key with active host
+        logger.info(method, 'redis hset', utils.redisHash, utils.redisKey, utils.activeHost);
+        return redisClient.hsetAsync(utils.redisHash, utils.redisKey, utils.activeHost);
+    };
+
};
diff --git a/tests/src/test/scala/system/redundancy/CloudantRedundancyTests.scala b/tests/src/test/scala/system/redundancy/CloudantRedundancyTests.scala
new file mode 100644
index 0000000..6cc69f8
--- /dev/null
+++ b/tests/src/test/scala/system/redundancy/CloudantRedundancyTests.scala
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.redundancy
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+import common.{WhiskProperties, Wsk, WskProps, WskTestHelpers}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.{pimpAny, _}
+import system.CloudantUtil
+
+/**
+ * These tests verify that a cloudant redundancy (master/slave) configuration
+ * works as expected. They will only run properly in an environment with two
+ * cloudant provider containers running concurrently, with env var ACTIVE set
+ * to true in one container and false in the other. The tests also assume that
+ * redis and the active endpoint authorization are configured. For the auth,
+ * set the ENDPOINT_AUTH env var in your containers to match the testing.auth
+ * property found in your whisk.properties. To configure redis, set the
+ * REDIS_URL env var in your containers to point to the openwhisk redis
+ * container and make sure that container is deployed. You can run redis.yml
+ * to deploy it.
+ */
+@RunWith(classOf[JUnitRunner])
+class CloudantRedundancyTests
+    extends FlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with WskTestHelpers {
+
+    val wskprops = WskProps()
+    val wsk = new Wsk
+    val myCloudantCreds = CloudantUtil.Credential.makeFromVCAPFile("cloudantNoSQLDB", this.getClass.getSimpleName)
+    val edgeHost = WhiskProperties.getEdgeHost()
+    val auth = WhiskProperties.getBasicAuth
+    val user = auth.fst
+    val password = auth.snd
+
+    // base URL of the provider endpoints; the host index ("0/" or "1/") is appended per call
+    val endpointPrefix = s"https://$user:$password@$edgeHost/cloudanttrigger/worker0/"
+
+    behavior of "Cloudant redundancy tests"
+
+    it should "fire cloudant trigger before the swap" in {
+        fireTriggerAndVerify()
+    }
+
+    it should "perform active swap by setting host0 active=false" in {
+        val endpointURL = endpointPrefix + "0/active?active=false"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host0\",\"active\":\"swapping\"}".parseJson.asJsObject
+
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "verify active swap by checking for host0 active=false" in {
+        val endpointURL = endpointPrefix + "0/active"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host0\",\"active\":false}".parseJson.asJsObject
+
+        // give the hosts time to receive the published swap message
+        Thread.sleep(3000)
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "verify active swap by checking for host1 active=true" in {
+        val endpointURL = endpointPrefix + "1/active"
+        val expectedResult = "{\"worker\":\"worker0\",\"host\":\"host1\",\"active\":true}".parseJson.asJsObject
+
+        makeGetCallWithExpectedResult(endpointURL, expectedResult)
+    }
+
+    it should "fire cloudant trigger again after the swap" in {
+        fireTriggerAndVerify()
+    }
+
+    /**
+     * Binds the cloudant package, creates a trigger on the changes feed,
+     * writes one document and expects exactly one activation of the trigger.
+     * Shared by the tests that run before and after the active swap.
+     */
+    private def fireTriggerAndVerify() = withAssetCleaner(wskprops) {
+        (wp, assetHelper) =>
+            implicit val wskprops = wp // shadow global props and make implicit
+            val triggerName = s"dummyCloudantTrigger-${System.currentTimeMillis}"
+            val packageName = "dummyCloudantPackage"
+            val feed = "changes"
+
+            try {
+                CloudantUtil.setUp(myCloudantCreds)
+
+                // the package cloudant should be there
+                val packageGetResult = wsk.pkg.get("/whisk.system/cloudant")
+                println("fetched package cloudant")
+                packageGetResult.stdout should include("ok")
+
+                // create package binding
+                assetHelper.withCleaner(wsk.pkg, packageName) {
+                    (pkg, name) => pkg.bind("/whisk.system/cloudant", name)
+                }
+
+                // create whisk stuff
+                val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName, confirmDelete = false) {
+                    (trigger, name) =>
+                        trigger.create(name, feed = Some(s"$packageName/$feed"), parameters = Map(
+                            "username" -> myCloudantCreds.user.toJson,
+                            "password" -> myCloudantCreds.password.toJson,
+                            "host" -> myCloudantCreds.host().toJson,
+                            "dbname" -> myCloudantCreds.dbname.toJson))
+                }
+                feedCreationResult.stdout should include("ok")
+
+                Thread.sleep(3000)
+
+                // create a test doc in the sample db
+                println("create a test doc and wait for trigger")
+                CloudantUtil.createDocument(myCloudantCreds, "{\"test\":\"test_doc1\"}")
+
+                // get activation list of the trigger, expecting exactly 1
+                val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = 30).length
+                println(s"Found activation size (should be exactly 1): $activations")
+                withClue("Change feed trigger count: ") { activations should be(1) }
+
+                // delete the whisk trigger, which must also delete the feed
+                wsk.trigger.delete(triggerName)
+            } finally {
+                CloudantUtil.unsetUp(myCloudantCreds)
+            }
+    }
+
+    /**
+     * GETs the endpoint, requires HTTP 200 and compares the JSON body,
+     * ignoring its "message" field, with the expected object.
+     */
+    private def makeGetCallWithExpectedResult(endpointURL: String, expectedResult: JsObject) = {
+        val response = RestAssured.
+            given().
+            config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).
+            get(endpointURL)
+        assert(response.statusCode() == 200)
+        val result = response.body.asString.parseJson.asJsObject
+        JsObject(result.fields - "message") shouldBe expectedResult
+    }
+
+    override def afterAll(): Unit = {
+        //swap back to original configuration
+        RestAssured.
+            given().
+            config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())).
+            get(endpointPrefix + "0/active?active=true")
+    }
+
+}