Disable consumer after more than 5 failed attempts to authenticate with Kafka brokers (#236)
diff --git a/provider/consumer.py b/provider/consumer.py
index f28c524..d80ad6b 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -131,6 +131,8 @@
self.brokers = params["brokers"]
self.topic = params["topic"]
+ self.authErrors = 0
+
self.sharedDictionary = sharedDictionary
if 'status' in params and params['status']['active'] == False:
@@ -253,7 +255,8 @@
'group.id': self.trigger,
'default.topic.config': {'auto.offset.reset': 'latest'},
'enable.auto.commit': False,
- 'api.version.request': True
+ 'api.version.request': True,
+ 'error_cb': self.__error_callback
}
if self.isMessageHub:
@@ -445,3 +448,12 @@
logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
return key
+
+    def __error_callback(self, error):
+        """Log a client error; after more than 5 authentication failures, disable the trigger (credentials are deliberately kept out of the stored message)."""
+        logging.warning(error)
+        if error.code() == KafkaError._AUTHENTICATION:
+            self.authErrors = self.authErrors + 1
+            if self.authErrors > 5:
+                self.setDesiredState(Consumer.State.Disabled)
+                self.database.disableTrigger(self.trigger, 403, 'Automatically disabled trigger. Consumer failed to authenticate with broker(s) more than 5 times.')
diff --git a/provider/database.py b/provider/database.py
index 77dd4c7..68ee8a3 100644
--- a/provider/database.py
+++ b/provider/database.py
@@ -58,7 +58,7 @@
self.client.disconnect()
self.client = None
- def disableTrigger(self, triggerFQN, status_code):
+ def disableTrigger(self, triggerFQN, status_code, message='Automatically disabled after receiving a {} status code when firing the trigger.'):
try:
document = self.database[triggerFQN]
@@ -71,7 +71,7 @@
'reason': {
'kind': 'AUTO',
'statusCode': status_code,
- 'message': 'Automatically disabled after receiving a {} status code when firing the trigger.'.format(status_code)
+ 'message': message.format(status_code)
}
}