blob: e01a5ec887e677b3c4f95d45b3a0f74f4540c82e [file] [log] [blame]
import ssl
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
def main(params):
validationResult = validateParams(params)
if validationResult[0] != True:
return {'error': validationResult[1]}
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'
# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1
try:
producer = KafkaProducer(
api_version_auto_timeout_ms=15000,
bootstrap_servers=params['kafka_brokers_sasl'],
sasl_plain_username=params['user'],
sasl_plain_password=params['password'],
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism=sasl_mechanism)
print "Created producer"
# only use the key parameter if it is present
if 'key' in params:
producer.send(params['topic'], bytes(params['value']), key=bytes(params['key']))
else:
producer.send(params['topic'], bytes(params['value']))
producer.flush()
print "Sent message"
except NoBrokersAvailable:
# this exception's message is a little too generic
return {'error': 'No brokers available. Check that your supplied brokers are correct and available.'}
except Exception as e:
return {'error': '{}'.format(e)}
return {"success": True}
def validateParams(params):
requiredParams = ['kafka_brokers_sasl', 'user', 'password', 'topic', 'value']
actualParams = params.keys()
missingParams = []
for requiredParam in requiredParams:
if requiredParam not in params:
missingParams.append(requiredParam)
if len(missingParams) > 0:
return (False, "You must supply all of the following parameters: {}".format(', '.join(missingParams)))
else:
return (True, "Params all check out.")