blob: c861518abc94de0ebd8c00b1a58d1fb98c27eb35 [file] [log] [blame]
import logging
import time
from consumer import Consumer
from consumercollection import ConsumerCollection
from threading import Thread
class TheDoctor (Thread):
# maximum time to allow a consumer to not successfully poll() before restarting
poll_timeout_seconds = 20
# interval between the Doctor making rounds
sleepy_time_seconds = 2
def __init__(self, consumerCollection):
Thread.__init__(self)
self.daemon = True
self.consumerCollection = consumerCollection
def run(self):
logging.info('[Doctor] The Doctor is in!')
while True:
consumers = self.consumerCollection.getCopyForRead()
for consumerId in consumers:
consumer = consumers[consumerId]
logging.debug('[Doctor] [{}] Consumer is in state: {}'.format(consumerId, consumer.currentState()))
if consumer.currentState() is Consumer.State.Dead and consumer.desiredState() is Consumer.State.Running:
# well this is unexpected...
logging.error('[Doctor][{}] Consumer is dead, but should be alive!'.format(consumerId))
consumer.restart()
elif consumer.currentState() is Consumer.State.Dead and consumer.desiredState() is Consumer.State.Dead:
# Bring out yer dead...
logging.info('[{}] Removing dead consumer from the collection.'.format(consumer.trigger))
self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
elif consumer.secondsSinceLastPoll() > self.poll_timeout_seconds and consumer.desiredState() is Consumer.State.Running:
# there seems to be an issue with the kafka-python client where it gets into an
# error-handling loop. This causes poll() to never complete, but also does not
# throw an exception.
logging.error('[Doctor][{}] Consumer timed-out, but should be alive! Restarting consumer.'.format(consumerId))
consumer.restart()
time.sleep(self.sleepy_time_seconds)