Remove error callback (#250)
* remove error callback that disables valid consumers
* removed unused variables
diff --git a/provider/consumer.py b/provider/consumer.py
index 55735bd..a8427d0 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -131,20 +131,6 @@
self.brokers = params["brokers"]
self.topic = params["topic"]
- self.authErrors = 0
-
- # We want to account for the number of brokers when deciding the maximum number
- # number of auth errors to allow
- self.maxAuthErrors = len(self.brokers) * 30
-
- # There is the possibility of being disconnected from one or more brokers while
- # still maintaining a connection to one or more others. We'll use this flag to
- # signal when we have been disconnected from all brokers. Value will be set to
- # 'True' when we have received a partition assignment and 'False' when our
- # partition assignment has been revoked. When disconnected we will begin to
- # increment the 'authErrors' counter.
- self.connected = False
-
self.sharedDictionary = sharedDictionary
if 'status' in params and params['status']['active'] == False:
@@ -267,8 +253,7 @@
'group.id': self.trigger,
'default.topic.config': {'auto.offset.reset': 'latest'},
'enable.auto.commit': False,
- 'api.version.request': True,
- 'error_cb': self.__error_callback
+ 'api.version.request': True
}
if self.isMessageHub:
@@ -488,20 +473,8 @@
logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
return key
- def __error_callback(self, error):
- if not self.connected and error.code() == KafkaError._AUTHENTICATION:
- self.authErrors = self.authErrors + 1
- if self.authErrors > self.maxAuthErrors:
- logging.warning('[{}] Shutting down consumer and disabling trigger. Exceeded the allowable number of _AUTHENTICATION errors'.format(self.trigger))
- self.setDesiredState(Consumer.State.Disabled)
- message = 'Automatically disabled trigger. Consumer was unable to connect to broker(s) after 30 attempts'.format()
- self.database.disableTrigger(self.trigger, 403, message)
-
def __on_assign(self, consumer, partitions):
logging.info('[{}] Completed partition assignment. Connected to broker(s)'.format(self.trigger))
- self.authErrors = 0
- self.connected = True
def __on_revoke(self, consumer, partitions):
logging.info('[{}] Partition assignment has been revoked. Disconnected from broker(s)'.format(self.trigger))
- self.connected = False