Disable consumer after more than 5 failed attempts to authenticate with Kafka brokers (#236)


diff --git a/provider/consumer.py b/provider/consumer.py
index f28c524..d80ad6b 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -131,6 +131,8 @@
         self.brokers = params["brokers"]
         self.topic = params["topic"]
 
+        self.authErrors = 0
+
         self.sharedDictionary = sharedDictionary
 
         if 'status' in params and params['status']['active'] == False:
@@ -253,7 +255,8 @@
                         'group.id': self.trigger,
                         'default.topic.config': {'auto.offset.reset': 'latest'},
                         'enable.auto.commit': False,
-                        'api.version.request': True
+                        'api.version.request': True,
+                        'error_cb': self.__error_callback
                     }
 
             if self.isMessageHub:
@@ -445,3 +448,28 @@
 
         logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
         return key
+
+    def __error_callback(self, error):
+        """Handle an error event delivered through the consumer's 'error_cb' setting.
+
+        Logs every error. Authentication failures are counted in
+        self.authErrors; once more than 5 have been seen, the consumer is
+        asked to move to the Disabled state and the trigger is disabled in
+        the database with a 403 status code.
+
+        error -- object exposing code(); compared against KafkaError._AUTHENTICATION
+        """
+        logging.warning(error)
+        if error.code() != KafkaError._AUTHENTICATION:
+            return
+
+        self.authErrors += 1
+        if self.authErrors > 5:
+            self.setDesiredState(Consumer.State.Disabled)
+            # Identify the credential by username only: the password is a
+            # secret and must not be written to the trigger document.
+            # disableTrigger() runs str.format() on the message, so literal
+            # braces are doubled to keep them from being read as placeholders.
+            username = str(self.username).replace('{', '{{').replace('}', '}}')
+            message = 'Automatically disabled trigger. Consumer failed to authenticate with broker(s) more than 5 times using username {}'.format(username)
+            self.database.disableTrigger(self.trigger, 403, message)
diff --git a/provider/database.py b/provider/database.py
index 77dd4c7..68ee8a3 100644
--- a/provider/database.py
+++ b/provider/database.py
@@ -58,7 +58,7 @@
             self.client.disconnect()
             self.client = None
 
-    def disableTrigger(self, triggerFQN, status_code):
+    def disableTrigger(self, triggerFQN, status_code, message='Automatically disabled after receiving a {} status code when firing the trigger.'):
         try:
             document = self.database[triggerFQN]
 
@@ -71,7 +71,7 @@
                     'reason': {
                         'kind': 'AUTO',
                         'statusCode': status_code,
-                        'message': 'Automatically disabled after receiving a {} status code when firing the trigger.'.format(status_code)
+                        'message': message.format(status_code)
                     }
                 }