import logging
import os
from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
from kafka.common import (
ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
KafkaIntegrationTestCase, kafka_versions, random_string, Timer
)
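# Integration tests for the SimpleConsumer, MultiProcessConsumer, and
# KafkaConsumer classes, exercised against live Zookeeper and Kafka broker
# fixtures. The whole suite is skipped unless KAFKA_VERSION is set in the
# environment.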
class TestConsumerIntegration(KafkaIntegrationTestCase):
@classmethod
def setUpClass(cls):
if not os.environ.get('KAFKA_VERSION'):
return
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
cls.server = cls.server1 # Bootstrapping server
@classmethod
def tearDownClass(cls):
if not os.environ.get('KAFKA_VERSION'):
return
cls.server1.close()
cls.server2.close()
cls.zk.close()
def send_messages(self, partition, messages):
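        """Produce the given messages to `partition` of self.topic, assert that
        the produce request succeeded, and return the encoded message values."""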
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
produce = ProduceRequest(self.topic, partition, messages = messages)
resp, = self.client.send_produce_request([produce])
self.assertEqual(resp.error, 0)
return [ x.value for x in messages ]
def assert_message_count(self, messages, num_messages):
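        """Assert that exactly `num_messages` messages were received, with no
        duplicates among them."""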
# Make sure we got them all
self.assertEqual(len(messages), num_messages)
# Make sure there are no duplicates
self.assertEqual(len(set(messages)), num_messages)
def consumer(self, **kwargs):
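        """Build a consumer (SimpleConsumer by default) against self.client.
        The 'consumer', 'group', and 'topic' kwargs override the defaults;
        everything else is passed through to the consumer constructor."""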
if os.environ['KAFKA_VERSION'] == "0.8.0":
            # Kafka 0.8.0 does not support offset commit/fetch requests,
            # so auto-commit must be disabled
kwargs['auto_commit'] = False
else:
kwargs.setdefault('auto_commit', True)
consumer_class = kwargs.pop('consumer', SimpleConsumer)
group = kwargs.pop('group', self.id().encode('utf-8'))
topic = kwargs.pop('topic', self.topic)
if consumer_class == SimpleConsumer:
kwargs.setdefault('iter_timeout', 0)
return consumer_class(self.client, group, topic, **kwargs)
def kafka_consumer(self, **configs):
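        """Build a KafkaConsumer on self.topic, bootstrapped from the first
        broker fixture; extra configs are passed through unchanged."""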
brokers = '%s:%d' % (self.server.host, self.server.port)
consumer = KafkaConsumer(self.topic,
metadata_broker_list=brokers,
**configs)
return consumer
@kafka_versions("all")
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
# Start a consumer
consumer = self.consumer()
self.assert_message_count([ message for message in consumer ], 200)
consumer.stop()
@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
consumer = self.consumer()
# Rewind 10 messages from the end
consumer.seek(-10, 2)
self.assert_message_count([ message for message in consumer ], 10)
# Rewind 13 messages from the end
consumer.seek(-13, 2)
self.assert_message_count([ message for message in consumer ], 13)
consumer.stop()
@kafka_versions("all")
def test_simple_consumer_blocking(self):
consumer = self.consumer()
        # Ask for a message with nothing in the queue; should block for the full 5 seconds
with Timer() as t:
messages = consumer.get_messages(block=True, timeout=5)
self.assert_message_count(messages, 0)
self.assertGreaterEqual(t.interval, 5)
self.send_messages(0, range(0, 10))
# Ask for 5 messages, 10 in queue. Get 5 back, no blocking
with Timer() as t:
messages = consumer.get_messages(count=5, block=True, timeout=5)
self.assert_message_count(messages, 5)
self.assertLessEqual(t.interval, 1)
# Ask for 10 messages, get 5 back, block 5 seconds
with Timer() as t:
messages = consumer.get_messages(count=10, block=True, timeout=5)
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 5)
consumer.stop()
@kafka_versions("all")
def test_simple_consumer_pending(self):
# make sure that we start with no pending messages
consumer = self.consumer()
        self.assertEqual(consumer.pending(), 0)
        self.assertEqual(consumer.pending(partitions=[0]), 0)
        self.assertEqual(consumer.pending(partitions=[1]), 0)
# Produce 10 messages to partitions 0 and 1
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
consumer = self.consumer()
self.assertEqual(consumer.pending(), 20)
self.assertEqual(consumer.pending(partitions=[0]), 10)
self.assertEqual(consumer.pending(partitions=[1]), 10)
        # Seek to the last message: one partition should then have 1 pending
        # message and the other 0
consumer.seek(-1, 2)
self.assertEqual(consumer.pending(), 1)
pending_part1 = consumer.pending(partitions=[0])
pending_part2 = consumer.pending(partitions=[1])
        self.assertEqual(set([0, 1]), set([pending_part1, pending_part2]))
consumer.stop()
@kafka_versions("all")
def test_multi_process_consumer(self):
# Produce 100 messages to partitions 0 and 1
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
consumer = self.consumer(consumer = MultiProcessConsumer)
self.assert_message_count([ message for message in consumer ], 200)
consumer.stop()
@kafka_versions("all")
def test_multi_process_consumer_blocking(self):
consumer = self.consumer(consumer = MultiProcessConsumer)
        # Ask for a message with no messages in the queue; block the full 5 seconds
with Timer() as t:
messages = consumer.get_messages(block=True, timeout=5)
self.assert_message_count(messages, 0)
self.assertGreaterEqual(t.interval, 5)
# Send 10 messages
self.send_messages(0, range(0, 10))
# Ask for 5 messages, 10 messages in queue, block 0 seconds
with Timer() as t:
messages = consumer.get_messages(count=5, block=True, timeout=5)
self.assert_message_count(messages, 5)
self.assertLessEqual(t.interval, 1)
# Ask for 10 messages, 5 in queue, block 5 seconds
with Timer() as t:
messages = consumer.get_messages(count=10, block=True, timeout=5)
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 4.95)
consumer.stop()
@kafka_versions("all")
def test_multi_proc_pending(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
self.assertEqual(consumer.pending(), 20)
self.assertEqual(consumer.pending(partitions=[0]), 10)
self.assertEqual(consumer.pending(partitions=[1]), 10)
consumer.stop()
@kafka_versions("all")
def test_large_messages(self):
# Produce 10 "normal" size messages
small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
# Produce 10 messages that are large (bigger than default fetch size)
large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
# Consumer should still get all of them
consumer = self.consumer()
expected_messages = set(small_messages + large_messages)
actual_messages = set([ x.message.value for x in consumer ])
self.assertEqual(expected_messages, actual_messages)
consumer.stop()
@kafka_versions("all")
def test_huge_messages(self):
huge_message, = self.send_messages(0, [
create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
])
# Create a consumer with the default buffer size
consumer = self.consumer()
        # This consumer fails to get the message: it exceeds the default max buffer size
with self.assertRaises(ConsumerFetchSizeTooSmall):
consumer.get_message(False, 0.1)
consumer.stop()
# Create a consumer with no fetch size limit
big_consumer = self.consumer(
max_buffer_size = None,
partitions = [0],
)
# Seek to the last message
big_consumer.seek(-1, 2)
# Consume giant message successfully
message = big_consumer.get_message(block=False, timeout=10)
self.assertIsNotNone(message)
self.assertEqual(message.message.value, huge_message)
big_consumer.stop()
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
def test_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
# Start a consumer
consumer1 = self.consumer(
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
# Grab the first 195 messages
output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
self.assert_message_count(output_msgs1, 195)
# The total offset across both partitions should be at 180
consumer2 = self.consumer(
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
        # Should resume from the last committed offset (180) and fetch the
        # remaining 20 messages (181-200)
self.assert_message_count([ message for message in consumer2 ], 20)
consumer1.stop()
consumer2.stop()
# TODO: Make this a unit test -- should not require integration
@kafka_versions("all")
def test_fetch_buffer_size(self):
# Test parameters (see issue 135 / PR 136)
        TEST_MESSAGE_SIZE = 1048
        INIT_BUFFER_SIZE = 1024
        MAX_BUFFER_SIZE = 2048
        assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE
        assert TEST_MESSAGE_SIZE < MAX_BUFFER_SIZE
        assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE
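        # Each message is larger than the initial buffer but smaller than the
        # maximum, so the consumer should grow its fetch buffer (up to
        # max_buffer_size) rather than raise ConsumerFetchSizeTooSmall.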
        self.send_messages(0, [ "x" * TEST_MESSAGE_SIZE ])
        self.send_messages(1, [ "x" * TEST_MESSAGE_SIZE ])
        consumer = self.consumer(buffer_size=INIT_BUFFER_SIZE,
                                 max_buffer_size=MAX_BUFFER_SIZE)
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
@kafka_versions("all")
def test_kafka_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
# Start a consumer
consumer = self.kafka_consumer(auto_offset_reset='smallest',
consumer_timeout_ms=5000)
n = 0
messages = {0: set(), 1: set()}
        logging.debug("kafka consumer offsets: %s", consumer.offsets())
for m in consumer:
            logging.debug("Consumed message %s", repr(m))
n += 1
messages[m.partition].add(m.offset)
if n >= 200:
break
self.assertEqual(len(messages[0]), 100)
self.assertEqual(len(messages[1]), 100)
@kafka_versions("all")
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='smallest',
consumer_timeout_ms=TIMEOUT_MS)
        # Nothing in the queue: next() should block for TIMEOUT_MS, then raise ConsumerTimeout
with Timer() as t:
with self.assertRaises(ConsumerTimeout):
msg = consumer.next()
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
self.send_messages(0, range(0, 10))
# Ask for 5 messages, 10 in queue. Get 5 back, no blocking
messages = set()
with Timer() as t:
for i in range(5):
msg = consumer.next()
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
        # Ask for 10 messages, get only 5 back, then block until ConsumerTimeout is raised
messages = set()
with Timer() as t:
with self.assertRaises(ConsumerTimeout):
for i in range(10):
msg = consumer.next()
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
def test_kafka_consumer__offset_commit_resume(self):
GROUP_ID = random_string(10)
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
# Start a consumer
consumer1 = self.kafka_consumer(
group_id = GROUP_ID,
auto_commit_enable = True,
auto_commit_interval_ms = None,
auto_commit_interval_messages = 20,
auto_offset_reset='smallest',
)
# Grab the first 195 messages
output_msgs1 = []
for _ in xrange(195):
m = consumer1.next()
output_msgs1.append(m)
consumer1.task_done(m)
self.assert_message_count(output_msgs1, 195)
# The total offset across both partitions should be at 180
consumer2 = self.kafka_consumer(
group_id = GROUP_ID,
auto_commit_enable = True,
auto_commit_interval_ms = None,
auto_commit_interval_messages = 20,
consumer_timeout_ms = 100,
auto_offset_reset='smallest',
)
        # Should resume at the committed offset and fetch the remaining 20
        # messages (181-200)
output_msgs2 = []
with self.assertRaises(ConsumerTimeout):
while True:
m = consumer2.next()
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
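# A minimal entry point, a sketch assuming the module is run directly rather
# than through an external test runner; requires KAFKA_VERSION and the
# fixture binaries to be available locally.
if __name__ == '__main__':
    import unittest
    unittest.main()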