blob: 784b18d91961e2f1937807dea1c427d7c770c3da [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
Send ``list`` objects to Kafka cluster.
'''
import logging
from common.serializer import serialize
from kafka import KafkaProducer
from kafka.errors import MessageSizeTooLargeError
class Producer(KafkaProducer):
    '''
    A Kafka client that publishes records to the Kafka cluster.

    The producer is thread safe and sharing a single producer instance across
    threads will generally be faster than having multiple instances.

    Configuration parameters are described in more detail at
    https://kafka.apache.org/0100/configuration.html#producerconfigs
    '''

    def __init__(self, **kwargs):
        '''
        Create the producer. Every keyword argument is forwarded unchanged to
        :class:`kafka.KafkaProducer`.
        '''
        self._logger = logging.getLogger('SPOT.INGEST.PRODUCER')
        super(Producer, self).__init__(**kwargs)

    @property
    def max_request(self):
        '''
        Return the maximum size of a request, i.e. the producer's
        ``max_request_size`` configuration value.
        '''
        return self.config['max_request_size']

    def send_async(self, topic, value, key=None, partition=None, timestamp_ms=None,
                   timeout=10):
        '''
        Serialize ``value``, publish it to ``topic`` and block until the send
        completes (or ``timeout`` elapses).

        Despite its name this call is synchronous: it waits on the future
        returned by :meth:`KafkaProducer.send` before returning.

        :param topic        : Topic where the messages will be published.
        :param value        : Message value - passed to ``serialize``; must be
                              truthy.
        :param key          : A key associated with the messages - must be type bytes.
                              Can be used to determine which partition to send the
                              messages to. If partition is ``None`` (and producer's
                              partitioner config is left as default) then messages with
                              the same key will be delivered to the same partition (but
                              if key is not set, partition is chosen randomly).
        :param partition    : Optionally specify a partition. If not set, the partition
                              will be selected using the configured 'partitioner'.
        :param timestamp_ms : Epoch milliseconds (from Jan 1 1970 UTC) to use as the
                              message timestamp. Default: current time
        :param timeout      : Seconds to wait for the send to complete. Default: 10
        :rtype              : :class:`kafka.producer.future.RecordMetadata`, or
                              ``None`` when the message is rejected as too large
                              (that error is logged, not raised).
        :raises AssertionError: If ``value`` is falsy. This is an ``assert``, so
                                the check is skipped when Python runs with ``-O``.
        :raises RuntimeError: If serialization yields nothing, or any other error
                              occurs while publishing the message.
        '''
        assert bool(value), 'If value is ``None``, then it acts as \'delete\'.'

        payload = None
        try:
            rawbytes = serialize(value)
            if not rawbytes:
                raise RuntimeError('Serialization produced no data.')
            payload = rawbytes.getvalue()

            # Pass the optional arguments by keyword: newer kafka-python releases
            # declare ``headers`` between ``key`` and ``partition`` in ``send``,
            # so positional passing would silently misroute them.
            future = self.send(topic, value=payload, key=key,
                               partition=partition, timestamp_ms=timestamp_ms)
            meta = future.get(timeout=timeout)

            self._logger.debug('[Offset: %s, Partition: %s, Checksum: %s]',
                               meta.offset, meta.partition, meta.checksum)
            return meta

        except MessageSizeTooLargeError:
            # Best-effort: an oversized message is logged and dropped (the
            # method returns None) rather than aborting the caller.
            size = len(payload) if payload is not None else 'unknown'
            self._logger.error('The size of the message is greater than the maximum allowed. '
                               '[Current size: {0} bytes]'.format(size))

        except Exception as exc:
            # ``str(exc)`` works on Python 2 and 3; ``exc.message`` does not
            # exist on Python 3 and would raise AttributeError here.
            self._logger.error('[{0}] {1}'.format(exc.__class__.__name__, exc))
            raise RuntimeError('Failed to publish message to topic {0!r}: [{1}] {2}'
                               .format(topic, exc.__class__.__name__, exc))