blob: ae970bb4c02996e4cbac212d323a38ece50348ec [file] [log] [blame]
"""Service class, CanaryDocumentGenerator class.

* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
"""
import logging
import os
import time
from consumer import Consumer
from database import Database
from datetime import datetime
from datetimeutils import secondsSince
from requests.exceptions import ConnectionError, ReadTimeout
from threading import Thread
# Interval between successive canary documents, in seconds.
canaryInterval = 60

# Longest time the changes feed polls before it times out, in seconds.
changesFeedTimeout = 30
class Service (Thread):
def __init__(self, consumers):
    """Prepare the service thread; nothing is started until ``start()``.

    Args:
        consumers: the shared consumer collection this service queries and
            updates while processing the changes feed.
    """
    # The base constructor must run before ``daemon`` is assigned;
    # otherwise Thread raises "Thread.__init__() not called".
    Thread.__init__(self)
    self.daemon = True

    # Both are populated by run(); defined here so stopChangesFeed() and
    # the "existing client" check are safe before the first feed is opened.
    self.database = None
    self.changes = None

    # Sequence of the last processed change, used to resume the feed.
    self.lastSequence = None
    self.canaryGenerator = CanaryDocumentGenerator()

    self.consumers = consumers
    # Worker identity comes from the WORKER environment variable.
    self.workerId = os.getenv("WORKER", "worker0")
def run(self):
# Thread entry point: repeatedly (re)opens the database changes feed and
# reacts to each change it yields.
#
# As far as this view shows: any existing Database client is destroyed, a
# new one is created with changesFeedTimeout, and a changes feed is opened
# resuming from self.lastSequence. Each non-None change updates
# self.lastSequence and is then classified as a delete ("deleted" flag),
# a trigger document ('triggerURL' key) or a canary document
# ('canary-timestamp' key).
#
# NOTE(review): several statements below are visibly truncated (string
# fragments fused onto `if`/`else`/comment lines, assignments with no
# right-hand side, an `except` with no visible `try`) and indentation is
# lost. The code is left untouched here; restore the missing text from
# version control before changing any logic.
while True:
if self.database is not None:"Shutting down existing DB client")
self.database.destroy()"Starting changes feed")
self.database = Database(timeout=changesFeedTimeout)
self.changes = self.database.changesFeed(timeout=changesFeedTimeout, since=self.lastSequence)
# NOTE(review): right-hand side missing in this view; presumably a
# timestamp, since the value is later passed to secondsSince() - confirm.
self.lastCanaryTime =
for change in self.changes:
# change could be None because the changes feed will timeout
# if it hasn't detected any changes. This timeout allows us to
# check whether or not the feed is capable of detecting canary
# documents
if change != None:
# Record the sequence in case the changes feed needs to be
# restarted. This way the new feed can pick up right where
# the old one left off.
self.lastSequence = change['seq']
if "deleted" in change and change["deleted"] == True:'[changes] Found a delete')
consumer = self.consumers.getConsumerForTrigger(change['id'])
if consumer != None:
if consumer.desiredState() == Consumer.State.Disabled:
# just remove it from memory'[{}] Removing disabled trigger'.format(consumer.trigger))
else:'[{}] Shutting down running trigger'.format(consumer.trigger))
# since we can't use a filter function for the feed (then
# you don't get deletes) we need to manually verify this
# is a valid trigger doc that has changed
elif 'triggerURL' in change['doc']:'[changes] Found a change in a trigger document')
document = change['doc']
triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)
if not self.consumers.hasConsumerForTrigger(change["id"]):
if triggerIsAssignedToMe:'[{}] Found a new trigger to create'.format(change["id"]))
else:"[{}] Found a new trigger, but is assigned to another worker: {}".format(change["id"], document["worker"]))
existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
if existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
# running trigger should become disabled
# this should be done regardless of which worker the document claims to be assigned to'[{}] Existing running trigger should become disabled'.format(change["id"]))
elif triggerIsAssignedToMe:'[{}] Found a change to an existing trigger'.format(change["id"]))
if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
# disabled trigger has become active'[{}] Existing disabled trigger should become active'.format(change["id"]))
# trigger has become reassigned to a different worker"[{}] Shutting down trigger as it has been re-assigned to {}".format(change["id"], document["worker"]))
elif 'canary-timestamp' in change['doc']:
# found a canary - update lastCanaryTime'[canary] I found a canary. The last one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
# NOTE(review): right-hand side missing in this view (see the note on the
# earlier lastCanaryTime assignment).
self.lastCanaryTime =
logging.debug('[changes] Found a change for a non-trigger document')
# NOTE(review): no matching `try:` is visible in this view. Judging by the
# log message, this handler lets the feed be reopened after a failure;
# the caught exception `e` is not logged by the visible code.
except Exception as e:
logging.error('[canary] Exception caught from changes feed. Restarting changes feed...')
logging.debug("[changes] I made it out of the changes loop!")
def __isTriggerDocAssignedToMe(self, doc):
if "worker" in doc:
return doc["worker"] == self.workerId
return self.workerId == "worker0"
def stopChangesFeed(self):
if self.changes != None:
self.changes = None
def createAndRunConsumer(self, doc):
# Build a Consumer for the trigger document `doc` and register it with
# self.consumers under the document's '_id'.
#
# NOTE(review): the two branch lines at the end are truncated in this view
# (only the tail of a formatted message remains after `if ...:` and
# `else:`). Judging by the messages, the active branch presumably also
# starts the consumer - confirm against version control.
triggerFQN = doc['_id']
# Create a representation for this trigger, even if it is disabled
# This allows it to appear in /health as well as allow it to be deleted
# Creating this object is lightweight and does not initialize any connections
consumer = Consumer(triggerFQN, doc)
self.consumers.addConsumerForTrigger(triggerFQN, consumer)
if self.__isTriggerDocActive(doc):'[{}] Trigger was determined to be active, starting...'.format(triggerFQN))
else:'[{}] Trigger was determined to be disabled, not starting...'.format(triggerFQN))
def __isTriggerDocActive(self, doc):
return ('status' not in doc or doc['status']['active'] == True)
class CanaryDocumentGenerator (Thread):
def __init__(self):
    """Set up the generator as a daemon thread with its own Database client."""
    # The base constructor must run before ``daemon`` is assigned;
    # otherwise Thread raises "Thread.__init__() not called".
    Thread.__init__(self)
    self.daemon = True
    self.database = Database()
def run(self):
# Thread entry point for the canary generator.
#
# NOTE(review): the body of the `while True:` loop is missing in this view;
# only its comment survives. The statements that create the canary
# document and pause between iterations are not visible - restore them
# from version control before editing.
while True:
# create a new canary document every so often
# Presumably reached only if the loop above ever exits - confirm once the
# loop body is restored.
logging.error('[canary generator] Exited the main loop!')