Disable triggers for invalid auth when using custom auth handler (#354)
* Disable triggers for invalid auth when using custom auth handler
* Fix bad indentation
* Call __shouldDisable on auth handler exception
* Always dump response on auth handler exception
* Use response.ok instead of checking status code
* Fix typo
diff --git a/provider/authHandler.py b/provider/authHandler.py
index 0325161..58b1ef0 100644
--- a/provider/authHandler.py
+++ b/provider/authHandler.py
@@ -23,6 +23,9 @@
from requests.auth import AuthBase
+class AuthHandlerException(Exception):
+ def __init__(self, response):
+ self.response = response
class IAMAuth(AuthBase):
@@ -35,18 +38,24 @@
r.headers['Authorization'] = 'Bearer {}'.format(self.__getToken())
return r
-
def __getToken(self):
if 'expires_in' not in self.tokenInfo or self.__isRefreshTokenExpired():
- self.tokenInfo = self.__requestToken()
- return self.tokenInfo['access_token']
+ response = self.__requestToken()
+ if response.ok and 'access_token' in response.json():
+ self.tokenInfo = response.json()
+ return self.tokenInfo['access_token']
+ else:
+ raise AuthHandlerException(response)
elif self.__isTokenExpired():
- self.tokenInfo = self.__refreshToken()
- return self.tokenInfo['access_token']
+ response = self.__refreshToken()
+ if response.ok and 'access_token' in response.json():
+ self.tokenInfo = response.json()
+ return self.tokenInfo['access_token']
+ else:
+ raise AuthHandlerException(response)
else:
return self.tokenInfo['access_token']
-
def __requestToken(self):
headers = {
'Content-type': 'application/x-www-form-urlencoded',
@@ -86,7 +95,7 @@
def __isRefreshTokenExpired(self):
if 'expiration' not in self.tokenInfo:
- return true
+ return True
sevenDays = 7 * 24 * 3600
currentTime = int(time.time())
@@ -96,4 +105,4 @@
def __sendRequest(self, payload, headers):
response = requests.post(self.endpoint, data=payload, headers=headers)
- return response.json()
+ return response
diff --git a/provider/consumer.py b/provider/consumer.py
index 74b243a..8742af3 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -32,6 +32,7 @@
from datetimeutils import secondsSince
from multiprocessing import Process, Manager
from urlparse import urlparse
+from authHandler import AuthHandlerException
from authHandler import IAMAuth
from requests.auth import HTTPBasicAuth
from datetime import datetime, timedelta
@@ -409,44 +410,19 @@
self.consumer.commit(offsets=self.__getOffsetList(messages), async=False)
retry = False
elif self.__shouldDisable(status_code):
- logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code))
- response_dump = {
- 'request': {
- 'method': response.request.method,
- 'url': response.request.url,
- 'path_url': response.request.path_url,
- 'headers': response.request.headers,
- 'body': response.request.body
- },
- 'response': {
- 'status_code': response.status_code,
- 'ok': response.ok,
- 'reason': response.reason,
- 'url': response.url,
- 'headers': response.headers,
- 'content': response.content
- }
- }
-
- logging.error('[{}] Dumping the content of the request and response:\n{}'.format(self.trigger, response_dump))
-
- # abandon all hope?
- self.setDesiredState(Consumer.State.Disabled)
- # mark it disabled in the DB
-
- # when failing to establish a database connection, mark the consumer as dead to restart the consumer
- try:
- self.database = Database()
- self.database.disableTrigger(self.trigger, status_code)
- except Exception as e:
- logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))
- self.__recordState(Consumer.State.Dead)
- finally:
- self.database.destroy()
-
retry = False
+ logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code))
+ self.__dumpRequestResponse(response)
+ self.__disableTrigger(status_code)
except requests.exceptions.RequestException as e:
logging.error('[{}] Error talking to OpenWhisk: {}'.format(self.trigger, e))
+ except AuthHandlerException as e:
+ logging.error("[{}] Encountered an exception from auth handler, status code {}").format(self.trigger, e.response.status_code)
+ self.__dumpRequestResponse(e.response)
+
+ if self.__shouldDisable(e.response.status_code):
+ retry = False
+ self.__disableTrigger(e.response.status_code)
if retry:
retry_count += 1
@@ -460,6 +436,40 @@
self.consumer.commit(offsets=self.__getOffsetList(messages), async=False)
retry = False
+ def __disableTrigger(self, status_code):
+ self.setDesiredState(Consumer.State.Disabled)
+
+ # when failing to establish a database connection, mark the consumer as dead to restart the consumer
+ try:
+ self.database = Database()
+ self.database.disableTrigger(self.trigger, status_code)
+ except Exception as e:
+ logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))
+ self.__recordState(Consumer.State.Dead)
+ finally:
+ self.database.destroy()
+
+ def __dumpRequestResponse(self, response):
+ response_dump = {
+ 'request': {
+ 'method': response.request.method,
+ 'url': response.request.url,
+ 'path_url': response.request.path_url,
+ 'headers': response.request.headers,
+ 'body': response.request.body
+ },
+ 'response': {
+ 'status_code': response.status_code,
+ 'ok': response.ok,
+ 'reason': response.reason,
+ 'url': response.url,
+ 'headers': response.headers,
+ 'content': response.content
+ }
+ }
+
+ logging.error('[{}] Dumping the content of the request and response:\n{}'.format(self.trigger, response_dump))
+
# return the dict that will be sent as the trigger payload
def __getMessagePayload(self, message):
return {