| # -*- coding: utf-8 -*- |
| |
| import logging |
| |
| from mock import MagicMock |
| from . import unittest |
| |
| from kafka.producer.base import Producer |
| |
| |
class TestKafkaProducer(unittest.TestCase):
    """Tests for producer message handling, run against a mocked client."""

    def test_producer_message_types(self):
        """Check which payload types ``Producer.send_messages`` accepts.

        Unicode text, integers, lists, tuples and dicts are expected to be
        rejected with ``TypeError``; a bytes payload is expected to be sent
        without raising.
        """
        producer = Producer(MagicMock())
        topic = b"test-topic"
        partition = 0

        bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
        for m in bad_data_types:
            # Log outside the assertRaises context so that only the call
            # under test can satisfy the TypeError expectation.
            logging.debug("attempting to send message of type %s", type(m))
            with self.assertRaises(TypeError):
                producer.send_messages(topic, partition, m)

        good_data_types = (b'a string!',)
        for m in good_data_types:
            # This should not raise an exception
            producer.send_messages(topic, partition, m)

    def test_topic_message_types(self):
        """Check that sending through ``SimpleProducer`` issues a produce request.

        The client is a mock whose ``get_partition_ids_for_topic`` is replaced
        by a stub that reports partitions 0 and 1 for any topic.
        """
        from kafka.producer.simple import SimpleProducer

        client = MagicMock()

        def partitions(topic):
            # Stub: the same two partition ids regardless of the topic asked for.
            return [0, 1]

        client.get_partition_ids_for_topic = partitions

        producer = SimpleProducer(client, random_start=False)
        topic = b"test-topic"
        producer.send_messages(topic, b'hi')

        # Use a TestCase assertion rather than a bare ``assert``: bare asserts
        # are stripped under ``python -O`` and would let this test pass
        # without checking anything.
        self.assertTrue(
            client.send_produce_request.called,
            "send_produce_request was not called on the client",
        )