blob: 03ae5fc208474ff6ccbf0b2a6a8c9d43960bc270 [file] [log] [blame]
import ssl
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
def main(params):
validationResult = validateParams(params)
if validationResult[0] != True:
return {'error': validationResult[1]}
brokers = params['brokers']
try:
producer = KafkaProducer(
api_version_auto_timeout_ms=15000,
bootstrap_servers=brokers)
print "Created producer"
# only use the key parameter if it is present
if 'key' in params:
producer.send(params['topic'], bytes(params['value']), key=bytes(params['key']))
else:
producer.send(params['topic'], bytes(params['value'])
producer.flush()
print "Sent message"
except NoBrokersAvailable:
# this exception's message is a little too generic
return {'error': 'No brokers available. Check that your supplied brokers are correct and available.'}
except Exception as e:
return {'error': '{}'.format(e)}
return {"success": True}
def validateParams(params):
requiredParams = ['brokers', 'topic', 'value']
actualParams = params.keys()
missingParams = []
for requiredParam in requiredParams:
if requiredParam not in params:
missingParams.append(requiredParam)
if len(missingParams) > 0:
return (False, "You must supply all of the following parameters: {}".format(', '.join(missingParams)))
else:
return (True, "Params all check out.")