| """Database class. |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| """ |
| |
| import logging |
| import os |
| import time |
| |
| from cloudant.client import CouchDB |
| from cloudant.client import CouchDatabase |
| from cloudant.result import Result |
| from datetime import datetime |
| |
| |
| class Database: |
| db_prefix = os.getenv('DB_PREFIX', '') |
| dbname = db_prefix + 'ow_kafka_triggers' |
| |
| username = os.environ['DB_USER'] |
| password = os.environ['DB_PASS'] |
| url = os.environ['DB_URL'] |
| |
| filters_design_doc_id = '_design/filters' |
| only_triggers_view_id = 'only-triggers' |
| by_worker_view_id = 'by-worker' |
| |
| instance = os.getenv('INSTANCE', 'messageHubTrigger-0') |
| canaryId = "canary-{}".format(instance) |
| |
| def __init__(self, timeout=None): |
| self.client = CouchDB(self.username, self.password, url=self.url, timeout=timeout, auto_renew=True) |
| self.client.connect() |
| |
| self.database = CouchDatabase(self.client, self.dbname) |
| |
| if self.database.exists(): |
| logging.info('Database exists - connecting to it.') |
| else: |
| logging.warn('Database does not exist - creating it.') |
| self.database.create() |
| |
| def destroy(self): |
| if self.client is not None: |
| self.client.disconnect() |
| self.client = None |
| |
| def disableTrigger(self, triggerFQN, status_code, message='Automatically disabled after receiving a {} status code when firing the trigger.'): |
| try: |
| document = self.database[triggerFQN] |
| |
| if document.exists(): |
| logging.info('Found trigger to disable from DB: {}'.format(triggerFQN)) |
| |
| status = { |
| 'active': False, |
| 'dateChanged': long(time.time() * 1000), |
| 'reason': { |
| 'kind': 'AUTO', |
| 'statusCode': status_code, |
| 'message': message.format(status_code) |
| } |
| } |
| |
| document['status'] = status |
| document.save() |
| |
| logging.info('{} Successfully recorded trigger as disabled.'.format(triggerFQN)) |
| except Exception as e: |
| logging.error('[{}] Uncaught exception while disabling trigger: {}'.format(triggerFQN, e)) |
| |
| def changesFeed(self, timeout, since=None): |
| if since == None: |
| return self.database.infinite_changes(include_docs=True, heartbeat=(timeout*1000)) |
| else: |
| return self.database.infinite_changes(include_docs=True, heartbeat=(timeout*1000), since=since) |
| |
| def createCanary(self): |
| maxRetries = 3 |
| retryCount = 0 |
| |
| while retryCount < maxRetries: |
| try: |
| if self.canaryId in self.database.keys(remote=True): |
| # update the timestamp to cause a document change |
| logging.debug("[database] Canary doc exists, updating it.") |
| |
| myCanaryDocument = self.database[self.canaryId] |
| myCanaryDocument["canary-timestamp"] = datetime.now().isoformat() |
| myCanaryDocument.save() |
| |
| return |
| else: |
| # create the canary doc for this instance |
| logging.debug("[database] Canary doc does not exist, creating it.") |
| |
| document = dict() |
| document['_id'] = self.canaryId |
| document['canary-timestamp'] = datetime.now().isoformat() |
| |
| result = self.database.create_document(document) |
| logging.debug('[canary] Successfully wrote canary to DB') |
| |
| return |
| except Exception as e: |
| retryCount += 1 |
| logging.error( |
| '[canary] Uncaught exception while writing canary document: {}'.format(e)) |
| |
| logging.error('[canary] Retried and failed {} times to create a canary'.format(maxRetries)) |
| |
| def migrate(self): |
| logging.info('Starting DB migration') |
| |
| by_worker_view = { |
| 'map': """function(doc) { |
| if(doc.triggerURL && (!doc.status || doc.status.active)) { |
| emit(doc.worker || 'worker0', 1); |
| } |
| }""", |
| 'reduce': '_count' |
| } |
| |
| filtersDesignDoc = self.database.get_design_document(self.filters_design_doc_id) |
| |
| if filtersDesignDoc.exists(): |
| if self.by_worker_view_id not in filtersDesignDoc["views"]: |
| filtersDesignDoc["views"][self.by_worker_view_id] = by_worker_view |
| logging.info('Updating the design doc') |
| filtersDesignDoc.save() |
| else: |
| logging.info('Creating the design doc') |
| |
| self.database.create_document({ |
| '_id': self.filters_design_doc_id, |
| 'views': { |
| self.only_triggers_view_id: { |
| 'map': """function (doc) { |
| if(doc.triggerURL) { |
| emit(doc._id, 1); |
| } |
| }""" |
| }, |
| self.by_worker_view_id: by_worker_view |
| } |
| }) |
| |
| logging.info('Database migration complete') |