blob: f6b3d6a1be25bfe09edf7b78cb0db81b43423767 [file] [log] [blame]
# -*- coding: utf-8 -*-
import logging
from mock import MagicMock
from . import unittest
from kafka.producer.base import Producer
class TestKafkaProducer(unittest.TestCase):
def test_producer_message_types(self):
producer = Producer(MagicMock())
topic = b"test-topic"
partition = 0
bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
for m in bad_data_types:
with self.assertRaises(TypeError):
logging.debug("attempting to send message of type %s", type(m))
producer.send_messages(topic, partition, m)
good_data_types = (b'a string!',)
for m in good_data_types:
# This should not raise an exception
producer.send_messages(topic, partition, m)
def test_topic_message_types(self):
from kafka.producer.simple import SimpleProducer
client = MagicMock()
def partitions(topic):
return [0, 1]
client.get_partition_ids_for_topic = partitions
producer = SimpleProducer(client, random_start=False)
topic = b"test-topic"
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called