| # Copyright 2016 IBM Corp. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import json |
| import logging |
| import os |
| import requests |
| import time |
| |
| # HEADS UP! I'm importing confluent_kafka.Consumer as KafkaConsumer to avoid a |
| # naming conflict with my own Consumer class |
| from confluent_kafka import Consumer as KafkaConsumer, KafkaError |
| from database import Database |
| from datetime import datetime |
| from threading import Thread, Lock |
| |
# TLS certificate verification for outgoing trigger requests stays enabled
# unless the LOCAL_DEV environment variable is set to anything other than the
# literal string 'False' (which is also the default when it is unset).
local_dev = os.environ.get('LOCAL_DEV', 'False')
check_ssl = local_dev == 'False'
| |
class Consumer:
    """Handle for the worker thread that consumes messages for one trigger.

    State queries, poll timestamps and shutdown requests are forwarded to the
    current ConsumerThread. `restart()` replaces that thread with a new one
    built from the same trigger name and params.
    """

    class State:
        """String constants naming the states a consumer can be in."""
        Initializing = 'Initializing'
        Running = 'Running'
        Stopping = 'Stopping'
        Restart = 'Restart'
        Dead = 'Dead'
        Disabled = 'Disabled'

    def __init__(self, trigger, params):
        """Create the consumer and its (not yet started) worker thread.

        trigger -- identifier of the trigger, used as a log prefix
        params  -- trigger configuration dict; must contain "triggerURL" plus
                   whatever ConsumerThread reads from it
        """
        self.trigger = trigger
        self.params = params
        self.thread = ConsumerThread(trigger, params)

        # the following fields can be accessed from multiple threads
        # access needs to be protected with this Lock
        self.__lock = Lock()
        self.__restartCount = 0

        # this is weird.
        # The app needs this tho...
        self.triggerURL = params["triggerURL"]

    def currentState(self):
        """Return the state most recently recorded by the worker thread."""
        return self.thread.currentState()

    def desiredState(self):
        """Return the state the worker thread has been asked to reach."""
        return self.thread.desiredState()

    def shutdown(self):
        """Ask the worker thread to shut down."""
        self.thread.shutdown()

    def start(self):
        """Start the worker thread."""
        self.thread.start()

    # should only be called by the Doctor thread
    def restart(self):
        """Stop the current worker thread and start a replacement.

        Does nothing when the consumer is already slated for deletion.
        Blocks until the old thread has exited (join) before the new one is
        created.
        """
        # States are plain strings, so compare by value; an identity check
        # would miss an equal string that is a different object.
        if self.thread.desiredState() == Consumer.State.Dead:
            logging.info('[{}] Request to restart a consumer that is already slated for deletion.'.format(self.trigger))
            return

        with self.__lock:
            self.__restartCount += 1

        logging.info('[{}] Quietly shutting down consumer for restart'.format(self.trigger))
        self.thread.setDesiredState(Consumer.State.Restart)
        self.thread.join()
        logging.info('Consumer has shut down')

        # user may have interleaved a request to delete the trigger, check again
        if self.thread.desiredState() != Consumer.State.Dead:
            logging.info('[{}] Starting new consumer thread'.format(self.trigger))
            self.thread = ConsumerThread(self.trigger, self.params)
            self.thread.start()

    def restartCount(self):
        """Return how many restarts have been initiated for this consumer."""
        with self.__lock:
            return self.__restartCount

    def lastPoll(self):
        """Return the worker thread's last recorded poll timestamp."""
        return self.thread.lastPoll()

    def secondsSinceLastPoll(self):
        """Return the seconds elapsed since the worker thread's last poll."""
        return self.thread.secondsSinceLastPoll()
| |
| |
class ConsumerThread(Thread):
    """Thread that polls one Kafka topic and fires a trigger with each batch.

    The thread keeps two states guarded by `self.lock`: the *current* state it
    records about itself, and the *desired* state other threads request via
    `setDesiredState()`. The main loop runs only while the desired state is
    Running; when it exits, the thread closes its KafkaConsumer and records
    the desired state as its final current state.
    """

    retry_timeout = 1   # Timeout in seconds
    max_retries = 10    # Maximum number of times to retry firing trigger

    # shared by all ConsumerThread instances (created once, at class definition)
    database = Database()

    def __init__(self, trigger, params):
        """Capture the trigger configuration; no Kafka connection is made here.

        trigger -- identifier of the trigger; also used as the Kafka group.id
        params  -- dict with required keys "isMessageHub", "triggerURL",
                   "brokers" and "topic"; "username"/"password" are required
                   when "isMessageHub" is truthy; "status", "isJSONData",
                   "isBinaryValue" and "isBinaryKey" are optional
        """
        Thread.__init__(self)

        self.lock = Lock()

        # the following params may be set/read from other threads
        # only access through helper methods which handle thread safety!
        if 'status' in params and params['status']['active'] == False:
            self.__currentState = Consumer.State.Disabled
            self.__desiredState = Consumer.State.Disabled
        else:
            self.__currentState = Consumer.State.Initializing
            self.__desiredState = Consumer.State.Running
        # datetime.max acts as a "never polled" marker: it makes
        # secondsSinceLastPoll() negative until the first poll has finished
        self.__lastPoll = datetime.max

        self.daemon = True
        self.trigger = trigger
        self.isMessageHub = params["isMessageHub"]
        self.triggerURL = params["triggerURL"]
        self.brokers = params["brokers"]
        self.topic = params["topic"]

        if self.isMessageHub:
            self.username = params["username"]
            self.password = params["password"]

        # handle the case where there may be existing triggers that do not
        # have these optional fields set
        self.encodeValueAsJSON = params.get("isJSONData", False)
        self.encodeValueAsBase64 = params.get("isBinaryValue", False)
        self.encodeKeyAsBase64 = params.get("isBinaryKey", False)

        # always init consumer to None in case the consumer needs to shut down
        # before the KafkaConsumer is fully initialized/assigned
        self.consumer = None

    # this only records the current state, and does not affect a state transition
    def __recordState(self, newState):
        with self.lock:
            self.__currentState = newState

    def currentState(self):
        """Return the state this thread last recorded about itself."""
        with self.lock:
            return self.__currentState

    def setDesiredState(self, newState):
        """Request a new desired state.

        Once the desired state is Dead it can no longer be changed to anything
        else; such requests are logged and ignored.
        """
        logging.info('[{}] Request to set desiredState to {}'.format(self.trigger, newState))

        with self.lock:
            # compare by value: states are plain strings
            if self.__desiredState == Consumer.State.Dead and newState != Consumer.State.Dead:
                logging.info('[{}] Asking to kill a consumer that is already marked for death. Doing nothing.'.format(self.trigger))
                return

            logging.info('[{}] Setting desiredState to: {}'.format(self.trigger, newState))
            self.__desiredState = newState

    def desiredState(self):
        """Return the state this thread has been asked to reach."""
        with self.lock:
            return self.__desiredState

    # convenience method for checking if desiredState is Running
    def __shouldRun(self):
        return self.desiredState() == Consumer.State.Running

    def lastPoll(self):
        """Return the timestamp of the last completed poll (datetime.max if none)."""
        with self.lock:
            return self.__lastPoll

    def updateLastPoll(self):
        """Record the current time as the moment of the last poll."""
        with self.lock:
            self.__lastPoll = datetime.now()

    def secondsSinceLastPoll(self):
        """Return seconds since the last poll; negative before the first poll."""
        lastPollDelta = datetime.now() - self.lastPoll()
        return lastPollDelta.total_seconds()

    def run(self):
        """Thread body: poll and fire until asked to stop, then clean up."""
        try:
            self.consumer = self.__createConsumer()

            while self.__shouldRun():
                messages = self.__pollForMessages()

                if len(messages) > 0:
                    self.__fireTrigger(messages)

            logging.info("[{}] Consumer exiting main loop".format(self.trigger))
        except Exception as e:
            logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))

        # read the desired state once so every branch below tests the same value
        desired = self.desiredState()
        if desired == Consumer.State.Dead:
            logging.info('[{}] Permanently killing consumer because desired state is Dead'.format(self.trigger))
            self.database.deleteTrigger(self.trigger)
        elif desired == Consumer.State.Restart:
            logging.info('[{}] Quietly letting the consumer thread stop in order to allow restart.'.format(self.trigger))
            # nothing else to do because this Thread is about to go away
        elif desired == Consumer.State.Disabled:
            logging.info('[{}] Quietly letting the consumer thread stop in order to disable the feed.'.format(self.trigger))
        else:
            # uh-oh... this really shouldn't happen
            logging.error('[{}] Consumer stopped without being asked'.format(self.trigger))

        try:
            if self.consumer is not None:
                logging.info('[{}] Cleaning up consumer'.format(self.trigger))
                logging.debug('[{}] Closing KafkaConsumer'.format(self.trigger))
                self.consumer.unsubscribe()
                self.consumer.close()
                logging.info('[{}] Successfully closed KafkaConsumer'.format(self.trigger))

                logging.debug('[{}] Dellocating KafkaConsumer'.format(self.trigger))
                self.consumer = None
                logging.info('[{}] Successfully cleaned up consumer'.format(self.trigger))
        except Exception as e:
            logging.error('[{}] Uncaught exception while shutting down consumer: {}'.format(self.trigger, e))
        finally:
            # snapshot once so the logged state is exactly the recorded one
            finalState = self.desiredState()
            logging.info('[{}] Recording consumer as {}. Bye bye!'.format(self.trigger, finalState))
            self.__recordState(finalState)

    def __createConsumer(self):
        """Build and subscribe a KafkaConsumer; returns None if not meant to run."""
        if not self.__shouldRun():
            return None

        config = {'metadata.broker.list': ','.join(self.brokers),
                  'group.id': self.trigger,
                  'default.topic.config': {'auto.offset.reset': 'latest'},
                  # offsets are committed manually after the trigger is fired
                  'enable.auto.commit': False,
                  'api.version.request': True
                  }

        if self.isMessageHub:
            # append Message Hub specific config
            config.update({'ssl.ca.location': '/etc/ssl/certs/',
                           'sasl.mechanisms': 'PLAIN',
                           'sasl.username': self.username,
                           'sasl.password': self.password,
                           'security.protocol': 'sasl_ssl'
                           })

        consumer = KafkaConsumer(config)
        consumer.subscribe([self.topic])
        logging.info("[{}] Now listening in order to fire trigger".format(self.trigger))
        return consumer

    def __pollForMessages(self):
        """Collect a batch of messages and return it as a list (possibly empty).

        Batching stops on the first empty poll, on any poll error (including
        end-of-partition), or once 2 seconds have passed since the last
        recorded poll. The last-poll timestamp is refreshed on every call.
        """
        messages = []
        messageSize = 0
        batchMessages = True

        if self.__shouldRun():
            while batchMessages and (self.secondsSinceLastPoll() < 2):
                message = self.consumer.poll(1.0)

                # negative only while lastPoll is still the datetime.max marker
                if self.secondsSinceLastPoll() < 0:
                    logging.info('[{}] Completed first poll'.format(self.trigger))
                    self.__recordState(Consumer.State.Running)

                if message is None:
                    logging.debug('[{}] message was None. Stopping batch op.'.format(self.trigger))
                    batchMessages = False
                elif not message.error():
                    logging.debug("Consumed message: {}".format(str(message)))
                    # a message may carry no value; len(None) would raise and
                    # take the whole consumer thread down
                    value = message.value()
                    if value is not None:
                        messageSize += len(value)
                    messages.append(message)
                elif message.error().code() != KafkaError._PARTITION_EOF:
                    logging.error('[{}] Error polling: {}'.format(self.trigger, message.error()))
                    batchMessages = False
                else:
                    logging.debug('[{}] No more messages. Stopping batch op.'.format(self.trigger))
                    batchMessages = False

            logging.debug('[{}] Completed poll'.format(self.trigger))

        if len(messages) > 0:
            logging.info("[{}] Found {} messages with a total size of {} bytes".format(self.trigger, len(messages), messageSize))

        self.updateLastPoll()
        return messages

    # decide whether or not to disable a trigger based on the status code returned
    # from firing the trigger. Specifically, disable on all 4xx status codes
    # except 408 (request timeout) and 429 (throttle)
    def __shouldDisable(self, status_code):
        return 400 <= status_code < 500 and status_code not in (408, 429)

    def __fireTrigger(self, messages):
        """POST the batch to the trigger URL, retrying on transient failures.

        On HTTP 200 the offsets are committed. On a disabling 4xx status the
        trigger is disabled (desired state and database) without committing.
        Any other status or a requests error is retried up to `max_retries`
        attempts, `retry_timeout` seconds apart; after the last attempt the
        batch is skipped by committing its offsets anyway.
        """
        if not self.__shouldRun():
            return

        lastMessage = messages[-1]

        mappedMessages = [
            {
                'value': self.__encodeMessageIfNeeded(message.value()),
                'topic': message.topic(),
                'partition': message.partition(),
                'offset': message.offset(),
                'key': self.__encodeKeyIfNeeded(message.key())
            }
            for message in messages
        ]

        payload = {'messages': mappedMessages}
        retry = True
        retry_count = 0

        logging.info("[{}] Firing trigger with {} messages".format(self.trigger,len(messages)))

        while retry:
            try:
                response = requests.post(self.triggerURL, json=payload, timeout=10.0, verify=check_ssl)
                status_code = response.status_code
                logging.info("[{}] Repsonse status code {}".format(self.trigger, status_code))

                # Manually commit offset if the trigger was fired successfully. Retry firing the trigger
                # for a select set of status codes
                if status_code == 200:
                    logging.info("[{}] Fired trigger with activation {}".format(self.trigger, response.json()['activationId']))
                    self.consumer.commit()
                    retry = False
                elif self.__shouldDisable(status_code):
                    logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code))

                    # abandon all hope?
                    self.setDesiredState(Consumer.State.Disabled)
                    # mark it disabled in the DB
                    self.database.disableTrigger(self.trigger, status_code)
                    retry = False
            except requests.exceptions.RequestException as e:
                logging.error('[{}] Error talking to OpenWhisk: {}'.format(self.trigger, e))

            if retry:
                retry_count += 1

                if retry_count < self.max_retries:
                    logging.info("[{}] Retrying in {} second(s)".format(
                        self.trigger, self.retry_timeout))
                    time.sleep(self.retry_timeout)
                else:
                    # give up on this batch: commit so it is not delivered again
                    logging.warning("[{}] Skipping {} messages to offset {} of partition {}".format(self.trigger, len(messages), lastMessage.offset(), lastMessage.partition()))
                    self.consumer.commit()
                    retry = False

    def shutdown(self):
        """Mark this consumer for permanent shutdown unless already stopping/dead."""
        state = self.currentState()
        if state != Consumer.State.Stopping and state != Consumer.State.Dead:
            logging.info("[{}] Shutting down consumer for trigger".format(self.trigger))
            self.__recordState(Consumer.State.Stopping)
            self.setDesiredState(Consumer.State.Dead)
        else:
            logging.info("[{}] Ignoring request to shutdown consumer for trigger as it is already shutting down".format(self.trigger))

    @staticmethod
    def __toBase64(data):
        """Return `data` base64-encoded as a text string without line breaks.

        Text input is UTF-8 encoded first. Uses the base64 module because the
        "base64" string codec (`data.encode("base64")`) exists only on Python 2.
        """
        import base64

        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        return base64.b64encode(data).decode('ascii')

    def __encodeMessageIfNeeded(self, value):
        """Return the message value parsed as JSON or base64-encoded if configured.

        JSON takes precedence over base64. When the requested encoding fails,
        a warning is logged and the original value is returned unchanged.
        A None value is returned as-is.
        """
        if value is None:
            return value

        if self.encodeValueAsJSON:
            try:
                parsed = json.loads(value)
                logging.debug('[{}] Successfully encoded a message as JSON.'.format(self.trigger))
                return parsed
            except (ValueError, TypeError):
                # no big deal, just return the original value
                logging.warning('[{}] I was asked to encode a message as JSON, but I failed.'.format(self.trigger))
        elif self.encodeValueAsBase64:
            try:
                parsed = self.__toBase64(value)
                logging.debug('[{}] Successfully encoded a binary message.'.format(self.trigger))
                return parsed
            except Exception:
                logging.warning('[{}] Unable to encode a binary message.'.format(self.trigger))

        logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
        return value

    def __encodeKeyIfNeeded(self, key):
        """Return the message key base64-encoded if configured, else unchanged.

        A None key is returned as-is. If encoding fails, a warning is logged
        and the original key is returned.
        """
        if self.encodeKeyAsBase64 and key is not None:
            try:
                parsed = self.__toBase64(key)
                logging.debug('[{}] Successfully encoded a binary key.'.format(self.trigger))
                return parsed
            except Exception:
                logging.warning('[{}] Unable to encode a binary key.'.format(self.trigger))

        logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
        return key