Retry failed database changes (#365)
diff --git a/provider/service.py b/provider/service.py
index b8a272f..26e7909 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -68,69 +68,7 @@
# check whether or not the feed is capable of detecting canary
# documents
if change != None:
- if "deleted" in change and change["deleted"] == True:
- logging.info('[changes] Found a delete')
- consumer = self.consumers.getConsumerForTrigger(change['id'])
- if consumer != None:
- if consumer.desiredState() == Consumer.State.Disabled:
- # just remove it from memory
- logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
- self.consumers.removeConsumerForTrigger(consumer.trigger)
- else:
- logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
- consumer.shutdown()
- # since we can't use a filter function for the feed (then
- # you don't get deletes) we need to manually verify this
- # is a valid trigger doc that has changed
- elif 'triggerURL' in change['doc']:
- logging.info('[changes] Found a change in a trigger document')
- document = change['doc']
- triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)
-
- if not self.consumers.hasConsumerForTrigger(change["id"]):
- if triggerIsAssignedToMe:
- logging.info('[{}] Found a new trigger to create'.format(change["id"]))
- self.createAndRunConsumer(document)
- else:
- logging.info("[{}] Found a new trigger, but is assigned to another worker: {}".format(change["id"], document["worker"]))
- else:
- existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
-
- if existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
- # running trigger should become disabled
- # this should be done regardless of which worker the document claims to be assigned to
- logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
- existingConsumer.disable()
- elif triggerIsAssignedToMe:
- logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
-
- if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document):
- # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
- # so we need to forcefully delete the process before recreating it.
- logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"]))
-
- if existingConsumer.process.is_alive():
- logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
- existingConsumer.process.join(1)
- else:
- logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))
-
- self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
- self.createAndRunConsumer(document)
- elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
- # disabled trigger has become active
- logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
- self.createAndRunConsumer(document)
- else:
- # trigger has become reassigned to a different worker
- logging.info("[{}] Shutting down trigger as it has been re-assigned to {}".format(change["id"], document["worker"]))
- existingConsumer.shutdown()
- elif 'canary-timestamp' in change['doc']:
- # found a canary - update lastCanaryTime
- logging.info('[canary] I found a canary. The last one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
- self.lastCanaryTime = datetime.now()
- else:
- logging.debug('[changes] Found a change for a non-trigger document')
+ self.__handleDocChange(change)
# Record the sequence in case the changes feed needs to be
# restarted. This way the new feed can pick up right where
@@ -143,6 +81,89 @@
logging.debug("[changes] I made it out of the changes loop!")
+    def __handleDocChange(self, change):
+        """Apply one changes-feed entry, retrying when handling fails.
+
+        Any exception raised while applying the change is logged (with its
+        traceback) and the change is attempted again, up to ``maxRetries``
+        attempts in total. After the final failed attempt a warning is
+        logged and the change is abandoned; no exception is re-raised.
+
+        Args:
+            change: the change entry to handle. It is tested for a
+                "deleted" key and indexed by "id" and "doc".
+        """
+        maxRetries = 5
+
+        # Resolve the id once, defensively. It is only needed for log
+        # messages here; looking it up inside the except handler could raise
+        # again and escape the retry loop entirely.
+        changeId = change['id'] if 'id' in change else None
+
+        for _ in range(maxRetries):
+            try:
+                self.__applyDocChange(change)
+                return
+            except Exception:
+                # logging.exception logs at ERROR level and appends the
+                # active traceback, so the failure cause is not lost.
+                logging.exception('[{}] Exception caught while handling change.'.format(changeId))
+
+        logging.warning('[{}] Maximum number of retries exceeded for failed change.'.format(changeId))
+
+    def __applyDocChange(self, change):
+        """Make a single attempt at applying ``change``.
+
+        Handles, in order: deletes, changes to trigger documents (those
+        whose doc contains 'triggerURL'), canary documents (those whose doc
+        contains 'canary-timestamp'), and anything else (logged at debug
+        level only). Exceptions are not caught here; the caller decides
+        whether to retry.
+        """
+        if "deleted" in change and change["deleted"] == True:
+            logging.info('[changes] Found a delete')
+            consumer = self.consumers.getConsumerForTrigger(change['id'])
+            if consumer is not None:
+                if consumer.desiredState() == Consumer.State.Disabled:
+                    # just remove it from memory
+                    logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
+                    self.consumers.removeConsumerForTrigger(consumer.trigger)
+                else:
+                    logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
+                    consumer.shutdown()
+        # since we can't use a filter function for the feed (then
+        # you don't get deletes) we need to manually verify this
+        # is a valid trigger doc that has changed
+        elif 'triggerURL' in change['doc']:
+            logging.info('[changes] Found a change in a trigger document')
+            document = change['doc']
+            triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)
+
+            if not self.consumers.hasConsumerForTrigger(change["id"]):
+                if triggerIsAssignedToMe:
+                    logging.info('[{}] Found a new trigger to create'.format(change["id"]))
+                    self.createAndRunConsumer(document)
+                else:
+                    logging.info("[{}] Found a new trigger, but is assigned to another worker: {}".format(change["id"], document["worker"]))
+            else:
+                existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
+
+                if existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
+                    # running trigger should become disabled
+                    # this should be done regardless of which worker the document claims to be assigned to
+                    logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
+                    existingConsumer.disable()
+                elif triggerIsAssignedToMe:
+                    logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
+
+                    if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document):
+                        # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
+                        # so we need to forcefully delete the process before recreating it.
+                        logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"]))
+
+                        if existingConsumer.process.is_alive():
+                            logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
+                            # bounded wait (1 second) so the feed is not blocked indefinitely
+                            existingConsumer.process.join(1)
+                        else:
+                            logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))
+
+                        self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
+                        self.createAndRunConsumer(document)
+                    elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                        # disabled trigger has become active
+                        logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
+                        self.createAndRunConsumer(document)
+                else:
+                    # trigger has become reassigned to a different worker
+                    logging.info("[{}] Shutting down trigger as it has been re-assigned to {}".format(change["id"], document["worker"]))
+                    existingConsumer.shutdown()
+        elif 'canary-timestamp' in change['doc']:
+            # found a canary - update lastCanaryTime
+            logging.info('[canary] I found a canary. The last one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
+            self.lastCanaryTime = datetime.now()
+        else:
+            logging.debug('[changes] Found a change for a non-trigger document')
+
def __isTriggerDocAssignedToMe(self, doc):
if "worker" in doc:
return doc["worker"] == self.workerId