# blob: 23ceb4028793c62cb4b40ac0fdc4d0794b47788f [file] [log] [blame]
# Copyright 2016 IBM Corp. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import os
import time
from datetime import datetime
from threading import Thread, Lock

import requests
# HEADS UP! I'm importing confluent_kafka.Consumer as KafkaConsumer to avoid a
# naming conflict with my own Consumer class
from confluent_kafka import Consumer as KafkaConsumer, KafkaError

from database import Database
# Raw value of the LOCAL_DEV environment variable ('False' when it is unset).
local_dev = os.getenv('LOCAL_DEV', 'False')

# Whether TLS certificates are verified when firing triggers (passed as
# `verify=` to requests.post). Verification is switched off only when
# LOCAL_DEV is explicitly truthy; previously ANY value other than the exact
# string 'False' (for example 'false', '0' or an empty string) silently
# disabled verification.
check_ssl = local_dev.strip().lower() not in ('true', '1', 'yes', 'on')
class Consumer:
    """Handle for the consumer of a single trigger.

    Wraps the ConsumerThread that does the actual work and forwards state
    queries to it. restart() replaces the wrapped thread with a new
    ConsumerThread built from the same trigger and params.
    """

    class State:
        """String names of the states a consumer can be in or be asked to reach."""
        Initializing = 'Initializing'
        Running = 'Running'
        Stopping = 'Stopping'
        Restart = 'Restart'
        Dead = 'Dead'
        Disabled = 'Disabled'

    def __init__(self, trigger, params):
        """Create the consumer and its (not yet started) worker thread.

        trigger -- identifier of the trigger; used as the log prefix
        params  -- trigger configuration; must contain "triggerURL" plus
                   everything ConsumerThread reads from it
        """
        self.trigger = trigger
        self.params = params

        self.thread = ConsumerThread(trigger, params)

        # the following fields can be accessed from multiple threads
        # access needs to be protected with this Lock
        self.__lock = Lock()
        self.__restartCount = 0

        # this is weird.
        # The app needs this tho...
        self.triggerURL = params["triggerURL"]

    def currentState(self):
        """Return the state the worker thread last recorded for itself."""
        return self.thread.currentState()

    def desiredState(self):
        """Return the state the worker thread has been asked to reach."""
        return self.thread.desiredState()

    def shutdown(self):
        """Ask the worker thread to stop permanently."""
        self.thread.shutdown()

    def start(self):
        """Start the worker thread."""
        self.thread.start()

    # should only be called by the Doctor thread
    def restart(self):
        """Stop the current worker thread and start a fresh one.

        Does nothing if the consumer is already slated for deletion. Blocks
        until the old thread has exited (Thread.join).
        """
        # States are plain strings, so compare by value; an identity check
        # would miss an equal string that is a different object.
        if self.thread.desiredState() == Consumer.State.Dead:
            logging.info('[{}] Request to restart a consumer that is already slated for deletion.'.format(self.trigger))
            return

        with self.__lock:
            self.__restartCount += 1

        logging.info('[{}] Quietly shutting down consumer for restart'.format(self.trigger))
        self.thread.setDesiredState(Consumer.State.Restart)
        self.thread.join()
        logging.info('Consumer has shut down')

        # user may have interleaved a request to delete the trigger, check again
        if self.thread.desiredState() != Consumer.State.Dead:
            logging.info('[{}] Starting new consumer thread'.format(self.trigger))
            self.thread = ConsumerThread(self.trigger, self.params)
            self.thread.start()

    def restartCount(self):
        """Return how many times restart() has got past its initial check."""
        with self.__lock:
            restartCount = self.__restartCount

        return restartCount

    def lastPoll(self):
        """Return the worker thread's last-poll timestamp."""
        return self.thread.lastPoll()

    def secondsSinceLastPoll(self):
        """Return the seconds elapsed since the worker thread's last poll."""
        return self.thread.secondsSinceLastPoll()
class ConsumerThread (Thread):
    """Worker thread that consumes one Kafka topic and fires one trigger.

    The thread subscribes to params["topic"], polls for batches of messages
    and POSTs each batch to params["triggerURL"]. Offsets are committed
    manually, either after a successful fire or after giving up on a batch.

    Two state values are kept, both guarded by self.lock: the *desired*
    state, which other threads set to ask this thread to stop, and the
    *current* state, which is a record of what this thread is doing.
    """

    retry_timeout = 1   # Timeout in seconds
    max_retries = 10    # Maximum number of times to retry firing trigger

    # created once, when the class is defined, and shared by every instance
    database = Database()

    def __init__(self, trigger, params):
        """Configure the thread from the trigger's params; does not start it.

        trigger -- identifier of the trigger; also used as the Kafka group.id
        params  -- requires "isMessageHub", "triggerURL", "brokers" and
                   "topic" (plus "username"/"password" when isMessageHub is
                   truthy); optional "status", "isJSONData", "isBinaryValue"
                   and "isBinaryKey"
        """
        Thread.__init__(self)

        self.lock = Lock()

        # the following params may be set/read from other threads
        # only access through helper methods which handle thread safety!
        # (the `== False` comparison is kept as-is so that a missing or None
        # 'active' value is not treated as disabled)
        if 'status' in params and params['status']['active'] == False:  # noqa: E712
            self.__currentState = Consumer.State.Disabled
            self.__desiredState = Consumer.State.Disabled
        else:
            self.__currentState = Consumer.State.Initializing
            self.__desiredState = Consumer.State.Running

        # datetime.max makes secondsSinceLastPoll() negative until the first
        # poll has completed; __pollForMessages relies on that
        self.__lastPoll = datetime.max

        self.daemon = True
        self.trigger = trigger
        self.isMessageHub = params["isMessageHub"]
        self.triggerURL = params["triggerURL"]
        self.brokers = params["brokers"]
        self.topic = params["topic"]

        if self.isMessageHub:
            self.username = params["username"]
            self.password = params["password"]

        # handle the case where there may be existing triggers that do not
        # have the isJSONData / isBinaryValue / isBinaryKey fields set
        self.encodeValueAsJSON = params.get("isJSONData", False)
        self.encodeValueAsBase64 = params.get("isBinaryValue", False)
        self.encodeKeyAsBase64 = params.get("isBinaryKey", False)

        # always init consumer to None in case the consumer needs to shut down
        # before the KafkaConsumer is fully initialized/assigned
        self.consumer = None

    # this only records the current state, and does not affect a state transition
    def __recordState(self, newState):
        with self.lock:
            self.__currentState = newState

    def currentState(self):
        """Return the most recently recorded current state."""
        with self.lock:
            state = self.__currentState

        return state

    def setDesiredState(self, newState):
        """Request a new desired state.

        Once the desired state is Dead, only Dead may be set again; any other
        request is logged and ignored.
        """
        logging.info('[{}] Request to set desiredState to {}'.format(self.trigger, newState))

        with self.lock:
            # compare by value: the states are plain strings
            if self.__desiredState == Consumer.State.Dead and newState != Consumer.State.Dead:
                logging.info('[{}] Asking to kill a consumer that is already marked for death. Doing nothing.'.format(self.trigger))
                return

            logging.info('[{}] Setting desiredState to: {}'.format(self.trigger, newState))
            self.__desiredState = newState

    def desiredState(self):
        """Return the state this thread has been asked to reach."""
        with self.lock:
            state = self.__desiredState

        return state

    # convenience method for checking if desiredState is Running
    def __shouldRun(self):
        return self.desiredState() == Consumer.State.Running

    def lastPoll(self):
        """Return the datetime of the last completed poll (datetime.max before the first)."""
        with self.lock:
            lastPoll = self.__lastPoll

        return lastPoll

    def updateLastPoll(self):
        """Record the current time as the time of the last poll."""
        with self.lock:
            self.__lastPoll = datetime.now()

    def secondsSinceLastPoll(self):
        """Return seconds since the last poll; negative until the first poll completes."""
        lastPollDelta = datetime.now() - self.lastPoll()
        return lastPollDelta.total_seconds()

    def run(self):
        """Thread body: poll and fire until asked to stop, then clean up."""
        try:
            self.consumer = self.__createConsumer()

            while self.__shouldRun():
                messages = self.__pollForMessages()

                if messages:
                    self.__fireTrigger(messages)

            logging.info("[{}] Consumer exiting main loop".format(self.trigger))
        except Exception as e:
            logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))

        # read once so that every branch below judges the same value
        exitState = self.desiredState()

        if exitState == Consumer.State.Dead:
            logging.info('[{}] Permanently killing consumer because desired state is Dead'.format(self.trigger))
            self.database.deleteTrigger(self.trigger)
        elif exitState == Consumer.State.Restart:
            logging.info('[{}] Quietly letting the consumer thread stop in order to allow restart.'.format(self.trigger))
            # nothing else to do because this Thread is about to go away
        elif exitState == Consumer.State.Disabled:
            logging.info('[{}] Quietly letting the consumer thread stop in order to disable the feed.'.format(self.trigger))
        else:
            # uh-oh... this really shouldn't happen
            logging.error('[{}] Consumer stopped without being asked'.format(self.trigger))

        try:
            if self.consumer is not None:
                logging.info('[{}] Cleaning up consumer'.format(self.trigger))
                logging.debug('[{}] Closing KafkaConsumer'.format(self.trigger))
                self.consumer.unsubscribe()
                self.consumer.close()
                logging.info('[{}] Successfully closed KafkaConsumer'.format(self.trigger))

                logging.debug('[{}] Dellocating KafkaConsumer'.format(self.trigger))
                self.consumer = None
                logging.info('[{}] Successfully cleaned up consumer'.format(self.trigger))
        except Exception as e:
            logging.error('[{}] Uncaught exception while shutting down consumer: {}'.format(self.trigger, e))
        finally:
            # log and record the same value, even if the desired state is
            # changed concurrently
            finalState = self.desiredState()
            logging.info('[{}] Recording consumer as {}. Bye bye!'.format(self.trigger, finalState))
            self.__recordState(finalState)

    def __createConsumer(self):
        """Build and subscribe a KafkaConsumer, or return None if not asked to run."""
        if not self.__shouldRun():
            return None

        config = {'metadata.broker.list': ','.join(self.brokers),
                  'group.id': self.trigger,
                  'default.topic.config': {'auto.offset.reset': 'latest'},
                  'enable.auto.commit': False,
                  'api.version.request': True
                  }

        if self.isMessageHub:
            # append Message Hub specific config
            config.update({'ssl.ca.location': '/etc/ssl/certs/',
                           'sasl.mechanisms': 'PLAIN',
                           'sasl.username': self.username,
                           'sasl.password': self.password,
                           'security.protocol': 'sasl_ssl'
                           })

        consumer = KafkaConsumer(config)
        consumer.subscribe([self.topic])
        logging.info("[{}] Now listening in order to fire trigger".format(self.trigger))
        return consumer

    def __pollForMessages(self):
        """Collect one batch of messages and return it as a list (possibly empty).

        Polling stops at the first poll that returns nothing, an error or
        end-of-partition, or once 2 seconds have passed since the previous
        batch was recorded. The last-poll timestamp is updated on every call.
        """
        messages = []
        messageSize = 0
        batchMessages = True

        if self.__shouldRun():
            while batchMessages and (self.secondsSinceLastPoll() < 2):
                message = self.consumer.poll(1.0)

                # negative only while lastPoll still holds its datetime.max
                # initial value, i.e. during the very first batch
                if self.secondsSinceLastPoll() < 0:
                    logging.info('[{}] Completed first poll'.format(self.trigger))
                    self.__recordState(Consumer.State.Running)

                if message is None:
                    logging.debug('[{}] message was None. Stopping batch op.'.format(self.trigger))
                    batchMessages = False
                    continue

                error = message.error()
                if not error:
                    logging.debug("Consumed message: {}".format(str(message)))
                    # a message may carry no value at all; count it as 0 bytes
                    # instead of letting len(None) kill the thread
                    value = message.value()
                    messageSize += len(value) if value is not None else 0
                    messages.append(message)
                elif error.code() != KafkaError._PARTITION_EOF:
                    logging.error('[{}] Error polling: {}'.format(self.trigger, error))
                    batchMessages = False
                else:
                    logging.debug('[{}] No more messages. Stopping batch op.'.format(self.trigger))
                    batchMessages = False

            logging.debug('[{}] Completed poll'.format(self.trigger))

        if messages:
            logging.info("[{}] Found {} messages with a total size of {} bytes".format(self.trigger, len(messages), messageSize))

        self.updateLastPoll()
        return messages

    # decide whether or not to disable a trigger based on the status code returned
    # from firing the trigger. Specifically, disable on all 4xx status codes
    # except 408 (request timeout) and 429 (throttle)
    def __shouldDisable(self, status_code):
        return 400 <= status_code < 500 and status_code not in (408, 429)

    def __fireTrigger(self, messages):
        """POST a batch of messages to the trigger URL, retrying on failure.

        200 commits the offsets. A disabling 4xx (see __shouldDisable) marks
        the trigger disabled without committing. Any other status or a
        requests error is retried up to max_retries times, after which the
        batch is skipped by committing anyway.
        """
        if not self.__shouldRun():
            return

        lastMessage = messages[-1]

        payload = {
            'messages': [
                {
                    'value': self.__encodeMessageIfNeeded(message.value()),
                    'topic': message.topic(),
                    'partition': message.partition(),
                    'offset': message.offset(),
                    'key': self.__encodeKeyIfNeeded(message.key())
                }
                for message in messages
            ]
        }

        retry = True
        retry_count = 0

        logging.info("[{}] Firing trigger with {} messages".format(self.trigger, len(messages)))

        while retry:
            try:
                response = requests.post(self.triggerURL, json=payload, timeout=10.0, verify=check_ssl)
                status_code = response.status_code
                logging.info("[{}] Repsonse status code {}".format(self.trigger, status_code))

                # Manually commit offset if the trigger was fired successfully. Retry firing the trigger
                # for a select set of status codes
                if status_code == 200:
                    # The activation id is only logged. A body that cannot be
                    # parsed must not prevent the commit, otherwise a trigger
                    # that already fired would be fired again.
                    try:
                        activationId = response.json()['activationId']
                    except (ValueError, KeyError, TypeError):
                        activationId = '<unknown>'

                    logging.info("[{}] Fired trigger with activation {}".format(self.trigger, activationId))
                    self.consumer.commit()
                    retry = False
                elif self.__shouldDisable(status_code):
                    logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code))

                    # abandon all hope?
                    self.setDesiredState(Consumer.State.Disabled)
                    # mark it disabled in the DB
                    self.database.disableTrigger(self.trigger, status_code)
                    retry = False
            except requests.exceptions.RequestException as e:
                logging.error('[{}] Error talking to OpenWhisk: {}'.format(self.trigger, e))

            if retry:
                retry_count += 1

                if retry_count < self.max_retries:
                    logging.info("[{}] Retrying in {} second(s)".format(
                        self.trigger, self.retry_timeout))
                    time.sleep(self.retry_timeout)
                else:
                    logging.warning("[{}] Skipping {} messages to offset {} of partition {}".format(self.trigger, len(messages), lastMessage.offset(), lastMessage.partition()))
                    self.consumer.commit()
                    retry = False

    def shutdown(self):
        """Ask the thread to stop permanently, unless it is already stopping or dead."""
        # read once so both comparisons see the same value
        state = self.currentState()

        if state != Consumer.State.Stopping and state != Consumer.State.Dead:
            logging.info("[{}] Shutting down consumer for trigger".format(self.trigger))
            self.__recordState(Consumer.State.Stopping)
            self.setDesiredState(Consumer.State.Dead)
        else:
            logging.info("[{}] Ignoring request to shutdown consumer for trigger as it is already shutting down".format(self.trigger))

    def __encodeMessageIfNeeded(self, value):
        """Return the message value parsed as JSON or base64-encoded, as configured.

        JSON takes precedence over base64. If the requested encoding fails,
        the original value is returned unchanged.
        """
        if self.encodeValueAsJSON:
            try:
                parsed = json.loads(value)
                logging.debug('[{}] Successfully encoded a message as JSON.'.format(self.trigger))
                return parsed
            except (ValueError, TypeError):
                # no big deal, just return the original value
                # (TypeError covers a value that is None)
                logging.warning('[{}] I was asked to encode a message as JSON, but I failed.'.format(self.trigger))
        elif self.encodeValueAsBase64:
            try:
                # b64encode works on Python 2 and 3 and, unlike the Python 2
                # only "base64" codec, does not insert line breaks; decode to
                # text so that the result is JSON serializable
                parsed = base64.b64encode(value).decode('ascii')
                logging.debug('[{}] Successfully encoded a binary message.'.format(self.trigger))
                return parsed
            except (TypeError, ValueError):
                logging.warning('[{}] Unable to encode a binary message.'.format(self.trigger))

        logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
        return value

    def __encodeKeyIfNeeded(self, key):
        """Return the message key base64-encoded if configured, else unchanged.

        A key of None is returned as None without attempting to encode it.
        """
        if self.encodeKeyAsBase64 and key is not None:
            try:
                parsed = base64.b64encode(key).decode('ascii')
                logging.debug('[{}] Successfully encoded a binary key.'.format(self.trigger))
                return parsed
            except (TypeError, ValueError):
                logging.warning('[{}] Unable to encode a binary key.'.format(self.trigger))

        logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
        return key