Message Hub IAM Integration (#284)


* add authHandler for managing iam token

* ensure iam url is added to validated params in web action

* validated params

* updates to iam integration

* package node_modules in web zip

* remove logging import from auth handler
diff --git a/.gitignore b/.gitignore
index bc22306..15822ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@
 action/*.zip
 action/package.json
 tests/build
+action/node_modules/
+action/package-lock.json
+package-lock.json
 
 .idea/
 out/
\ No newline at end of file
diff --git a/action/kafkaFeedWeb.js b/action/kafkaFeedWeb.js
index 7d1350b..2c8a386 100644
--- a/action/kafkaFeedWeb.js
+++ b/action/kafkaFeedWeb.js
@@ -30,7 +30,7 @@
                     // do these in parallel!
                     return Promise.all([
                         db.ensureTriggerIsUnique(validatedParams.triggerName),
-                        common.verifyTriggerAuth(validatedParams.triggerURL)
+                        verifyTriggerAuth(validatedParams.triggerURL, params.authKey)
                     ]);
                 })
                 .then(() => {
@@ -63,9 +63,9 @@
                     resolve(common.webResponse(statusCode, body));
                 });
         } else if (params.__ow_method === "get") {
-            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+            const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
 
-            return common.verifyTriggerAuth(triggerURL)
+            return verifyTriggerAuth(triggerURL, params.authKey)
                 .then(() => {
                     db = new Database(params.DB_URL, params.DB_NAME);
                     return db.getTrigger(params.triggerName);
@@ -93,9 +93,9 @@
                     resolve(common.webResponse(500, error.toString()));
                 });
         } else if (params.__ow_method === "put") {
-            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+            const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
 
-            return common.verifyTriggerAuth(triggerURL)
+            return verifyTriggerAuth(triggerURL, params.authKey)
                 .then(() => {
                     db = new Database(params.DB_URL, params.DB_NAME);
                     return db.getTrigger(params.triggerName);
@@ -123,9 +123,9 @@
                     resolve(common.webResponse(statusCode, body));
                 });
         } else if (params.__ow_method === "delete") {
-            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+            const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
 
-            return common.verifyTriggerAuth(triggerURL)
+            return verifyTriggerAuth(triggerURL, params.authKey)
                 .then(() => {
                     db = new Database(params.DB_URL, params.DB_NAME);
                     return db.deleteTrigger(params.triggerName);
@@ -178,4 +178,9 @@
     return promise;
 }
 
+function verifyTriggerAuth(triggerURL, apiKey) { // wraps common.verifyTriggerAuth, passing the key as basic-auth credentials
+    var auth = apiKey.split(':'); // assumes apiKey is "user:pass"; NOTE(review): throws if apiKey is undefined - confirm callers always supply it
+    return common.verifyTriggerAuth(triggerURL, { user: auth[0], pass: auth[1] });
+}
+
 exports.main = main;
diff --git a/action/lib/common.js b/action/lib/common.js
index a24f9b5..5220c90 100644
--- a/action/lib/common.js
+++ b/action/lib/common.js
@@ -9,19 +9,19 @@
     };
 }
 
-function getTriggerURL(authKey, endpoint, triggerName) {
+function getTriggerURL(endpoint, triggerName) {
     var massagedAPIHost = endpoint.replace(/https?:\/\/(.*)/, "$1");
 
     var components = triggerComponents(triggerName);
     var namespace = components.namespace;
     var trigger = components.triggerName;
 
-    var url = `https://${authKey}@${massagedAPIHost}/api/v1/namespaces/${encodeURIComponent(namespace)}/triggers/${encodeURIComponent(trigger)}`;
+    var url = `https://${massagedAPIHost}/api/v1/namespaces/${encodeURIComponent(namespace)}/triggers/${encodeURIComponent(trigger)}`;
 
     return url;
 }
 
-function verifyTriggerAuth(triggerURL) {
+function verifyTriggerAuth(triggerURL, auth) {
     var options = {
         method: 'GET',
         url: triggerURL,
@@ -29,7 +29,8 @@
         headers: {
             'Content-Type': 'application/json',
             'User-Agent': 'whisk'
-        }
+        },
+        auth: auth
     };
 
     return request(options)
@@ -205,7 +206,7 @@
     // now that everything else is valid, let's add these
     validatedParams.isBinaryKey = getBooleanFromArgs(rawParams, 'isBinaryKey');
     validatedParams.authKey = rawParams.authKey;
-    validatedParams.triggerURL = getTriggerURL(validatedParams.authKey, rawParams.endpoint, rawParams.triggerName);
+    validatedParams.triggerURL = getTriggerURL(rawParams.endpoint, rawParams.triggerName);
 
     const uuid = require('uuid');
     validatedParams.uuid = uuid.v4();
diff --git a/action/messageHubFeed.js b/action/messageHubFeed.js
index 66b3124..4b4638e 100644
--- a/action/messageHubFeed.js
+++ b/action/messageHubFeed.js
@@ -18,6 +18,14 @@
     var massagedParams = common.massageParamsForWeb(params);
     massagedParams.triggerName = common.getTriggerFQN(params.triggerName);
 
+    var iamKey = process.env.__OW_IAM_NAMESPACE_API_KEY;
+    massagedParams.authKey = iamKey || process.env.__OW_API_KEY;
+    massagedParams.isIamKey = iamKey != undefined;
+
+    if (massagedParams.isIamKey) {
+        massagedParams.iamUrl = process.env.__OW_IAM_API_URL;
+    }
+
     if (params.lifecycleEvent === 'CREATE') {
         return common.createTrigger(endpoint, massagedParams, webActionName);
     } else if (params.lifecycleEvent === 'READ') {
diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 0ad270b..1f64515 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -1,5 +1,6 @@
 const common = require('./lib/common');
 const Database = require('./lib/Database');
+const itm = require('@ibm-functions/iam-token-manager');
 var moment = require('moment');
 
 /**
@@ -32,7 +33,7 @@
                     // do these in parallel!
                     return Promise.all([
                         db.ensureTriggerIsUnique(validatedParams.triggerName),
-                        common.verifyTriggerAuth(validatedParams.triggerURL),
+                        verifyTriggerAuth(validatedParams.triggerURL, params.authKey, params.isIamKey, params.iamUrl),
                         checkMessageHubCredentials(validatedParams)
                     ]);
                 })
@@ -66,9 +67,9 @@
                     resolve(common.webResponse(statusCode, body));
                 });
         } else if (params.__ow_method === "get") {
-            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+            const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
 
-            return common.verifyTriggerAuth(triggerURL)
+            return verifyTriggerAuth(triggerURL, params.authKey, params.isIamKey, params.iamUrl)
                 .then(() => {
                     db = new Database(params.DB_URL, params.DB_NAME);
                     return db.getTrigger(params.triggerName);
@@ -99,9 +100,9 @@
                     resolve(common.webResponse(500, error.toString()));
                 });
         } else if (params.__ow_method === "put") {
-            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+            const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
 
-            return common.verifyTriggerAuth(triggerURL)
+            return verifyTriggerAuth(triggerURL, params.authKey, params.isIamKey, params.iamUrl)
                 .then(() => {
                     db = new Database(params.DB_URL, params.DB_NAME);
                     return db.getTrigger(params.triggerName);
@@ -129,9 +130,9 @@
                     resolve(common.webResponse(statusCode, body));
                 });
         } else if (params.__ow_method === "delete") {
-            const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
+            const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
 
-            return common.verifyTriggerAuth(triggerURL)
+            return verifyTriggerAuth(triggerURL, params.authKey, params.isIamKey, params.iamUrl)
                 .then(() => {
                     db = new Database(params.DB_URL, params.DB_NAME);
                     return db.deleteTrigger(params.triggerName);
@@ -162,6 +163,16 @@
             return;
         } else {
             validatedParams = commonValidationResult.validatedParams;
+
+            if (rawParams.isIamKey != undefined) {
+                validatedParams.isIamKey = rawParams.isIamKey;
+            } else {
+                validatedParams.isIamKey = false
+            }
+
+            if (rawParams.iamUrl) {
+                validatedParams.iamUrl = rawParams.iamUrl;
+            }
         }
 
         // kafka_brokers_sasl
@@ -243,4 +254,13 @@
         });
 }
 
+function verifyTriggerAuth(triggerURL, apiKey, isIamKey, iamUrl) { // verifies trigger access with either an IAM bearer token or basic auth
+    if (isIamKey) {
+        return new itm({ 'iamApikey': apiKey, 'iamUrl': iamUrl }).getToken().then( token => common.verifyTriggerAuth(triggerURL, { bearer: token })); // get a token from the IAM token manager, then verify with it as a bearer credential
+    } else {
+        var auth = apiKey.split(':'); // assumes apiKey is "user:pass" - TODO confirm; throws if apiKey is undefined
+        return common.verifyTriggerAuth(triggerURL, { user: auth[0], pass: auth[1] });
+    }
+}
+
 exports.main = main;
diff --git a/action/messageHubFeedWeb_package.json b/action/messageHubFeedWeb_package.json
index 7735d19..00b159d 100644
--- a/action/messageHubFeedWeb_package.json
+++ b/action/messageHubFeedWeb_package.json
@@ -1,5 +1,8 @@
 {
   "name": "messageHubFeedWeb",
   "version": "1.0.0",
-  "main": "messageHubFeedWeb.js"
+  "main": "messageHubFeedWeb.js",
+  "dependencies": {
+    "@ibm-functions/iam-token-manager": "^1.0.0"
+  }
 }
diff --git a/installCatalog.sh b/installCatalog.sh
index 3976dc5..855e6be 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -93,7 +93,8 @@
 fi
 
 cp -f messageHubFeedWeb_package.json package.json
-zip -r messageHubFeedWeb.zip lib package.json messageHubFeedWeb.js
+npm install
+zip -r messageHubFeedWeb.zip lib package.json messageHubFeedWeb.js node_modules
 
 cd $OLD_PATH
 
diff --git a/provider/authHandler.py b/provider/authHandler.py
new file mode 100644
index 0000000..0325161
--- /dev/null
+++ b/provider/authHandler.py
@@ -0,0 +1,99 @@
+"""IAMAuth class.
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+"""
+
+import requests
+import time
+
+from requests.auth import AuthBase
+
+
+class IAMAuth(AuthBase):
+    """requests auth hook that sets a Bearer token, fetching or refreshing it on demand."""
+    def __init__(self, authKey, endpoint):
+        self.authKey = authKey
+        self.endpoint = endpoint
+        self.tokenInfo = {}
+    # Called with the outgoing request r; sets its Authorization header and returns it.
+    def __call__(self, r):
+        r.headers['Authorization'] = 'Bearer {}'.format(self.__getToken())
+        return r
+
+    # Returns an access token: requests a new one, refreshes it, or reuses the cached one.
+    def __getToken(self):
+        if 'expires_in' not in self.tokenInfo or self.__isRefreshTokenExpired():
+            self.tokenInfo = self.__requestToken()
+            return self.tokenInfo['access_token']
+        elif self.__isTokenExpired():
+            self.tokenInfo = self.__refreshToken()
+            return self.tokenInfo['access_token']
+        else:
+            return self.tokenInfo['access_token']
+
+    # Exchanges self.authKey for token info using the apikey grant type.
+    def __requestToken(self):
+        headers = {
+            'Content-type': 'application/x-www-form-urlencoded',
+            'Authorization': 'Basic Yng6Yng='
+        }
+        payload = {
+            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
+            'apikey': self.authKey
+        }
+
+        return self.__sendRequest(payload, headers)
+    # Obtains new token info using the cached 'refresh_token' (refresh_token grant type).
+    def __refreshToken(self):
+        headers = {
+            'Content-type': 'application/x-www-form-urlencoded',
+            'Authorization': 'Basic Yng6Yng='
+        }
+        payload = {
+            'grant_type': 'refresh_token',
+            'refresh_token': self.tokenInfo['refresh_token']
+        }
+
+        return self.__sendRequest(payload, headers)
+
+    # True when now is past 'expiration' minus 20% of 'expires_in', or either field is missing.
+    def __isTokenExpired(self):
+        if 'expires_in' not in self.tokenInfo or 'expiration' not in self.tokenInfo:
+            return True
+
+        fractionOfTtl = 0.8
+        timeToLive = self.tokenInfo['expires_in']
+        expireTime = self.tokenInfo['expiration']
+        currentTime = int(time.time())
+        refreshTime = expireTime - (timeToLive * (1.0 - fractionOfTtl))
+
+        return refreshTime < currentTime
+    # True when 'expiration' is missing or lies more than seven days in the past.
+    def __isRefreshTokenExpired(self):
+        if 'expiration' not in self.tokenInfo:
+            return True
+
+        sevenDays = 7 * 24 * 3600
+        currentTime = int(time.time())
+        newTokenTime = self.tokenInfo['expiration'] + sevenDays
+
+        return newTokenTime < currentTime
+    # POSTs payload to self.endpoint, returns the JSON body. NOTE(review): no timeout or status check.
+    def __sendRequest(self, payload, headers):
+        response = requests.post(self.endpoint, data=payload, headers=headers)
+        return response.json()
diff --git a/provider/consumer.py b/provider/consumer.py
index 4bf62cf..1b6bf6f 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -32,6 +32,8 @@
 from datetimeutils import secondsSince
 from multiprocessing import Process, Manager
 from urlparse import urlparse
+from authHandler import IAMAuth
+from requests.auth import HTTPBasicAuth
 
 local_dev = os.getenv('LOCAL_DEV', 'False')
 payload_limit = int(os.getenv('PAYLOAD_LIMIT', 900000))
@@ -145,6 +147,12 @@
             self.username = params["username"]
             self.password = params["password"]
 
+        if 'isIamKey' in params and params['isIamKey'] == True:
+            self.authHandler = IAMAuth(params['authKey'], params['iamUrl'])
+        else:
+            auth = params['authKey'].split(':')
+            self.authHandler = HTTPBasicAuth(auth[0], auth[1])
+
         # handle the case where there may be existing triggers that do not
         # have the isJSONData field set
         if "isJSONData" in params:
@@ -362,7 +370,7 @@
 
             while retry:
                 try:
-                    response = requests.post(self.triggerURL, json=payload, timeout=10.0, verify=check_ssl)
+                    response = requests.post(self.triggerURL, json=payload, auth=self.authHandler, timeout=10.0, verify=check_ssl)
                     status_code = response.status_code
                     logging.info("[{}] Repsonse status code {}".format(self.trigger, status_code))