blob: c691cbc12b0728079cfebf73692da58488544b91 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.utils import validate_delivery
class ProduceConsumeValidateTest(Test):
"""This class provides a shared template for tests which follow the common pattern of:
- produce to a topic in the background
- consume from that topic in the background
- run some logic, e.g. fail topic leader etc.
- perform validation
"""
def __init__(self, test_context):
super(ProduceConsumeValidateTest, self).__init__(test_context=test_context)
# How long to wait for the producer to declare itself healthy? This can
# be overidden by inheriting classes.
self.producer_start_timeout_sec = 20
# How long to wait for the consumer to start consuming messages?
self.consumer_start_timeout_sec = 60
# How long wait for the consumer process to fork? This
# is important in the case when the consumer is starting from the end,
# and we don't want it to miss any messages. The race condition this
# timeout avoids is that the consumer has not forked even after the
# producer begins producing messages, in which case we will miss the
# initial set of messages and get spurious test failures.
self.consumer_init_timeout_sec = 0
self.enable_idempotence = False
# Allow tests to tolerate some data loss by overriding this for tests using older message formats
self.may_truncate_acked_records = False
def start_producer_and_consumer(self):
# Start background producer and consumer
self.consumer.start()
if (self.consumer_init_timeout_sec > 0):
self.logger.debug("Waiting %ds for the consumer to initialize.",
self.consumer_init_timeout_sec)
wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
timeout_sec=self.consumer_init_timeout_sec,
err_msg="Consumer process took more than %d s to fork" %\
self.consumer_init_timeout_sec)
# If consuming only latest messages, wait for offset reset to ensure all messages are consumed
if not self.consumer.from_beginning:
self.consumer.wait_for_offset_reset(self.consumer.nodes[0], self.topic, self.num_partitions)
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5,
timeout_sec=self.producer_start_timeout_sec,
err_msg="Producer failed to produce messages for %ds." %\
self.producer_start_timeout_sec)
wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0,
timeout_sec=self.consumer_start_timeout_sec,
err_msg="Consumer failed to consume messages for %ds." %\
self.consumer_start_timeout_sec)
def check_alive(self):
msg = ""
for node in self.consumer.nodes:
if not self.consumer.alive(node):
msg = "The consumer has terminated, or timed out, on node %s." % str(node.account)
for node in self.producer.nodes:
if not self.producer.alive(node):
msg += "The producer has terminated, or timed out, on node %s." % str(node.account)
if len(msg) > 0:
raise Exception(msg)
def check_producing(self):
currently_acked = self.producer.num_acked
wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=30,
err_msg="Expected producer to still be producing.")
def stop_producer_and_consumer(self):
self.check_alive()
self.check_producing()
self.producer.stop()
self.consumer.wait()
def run_produce_consume_validate(self, core_test_action=None, *args):
"""Top-level template for simple produce/consume/validate tests."""
try:
self.start_producer_and_consumer()
if core_test_action is not None:
core_test_action(*args)
self.stop_producer_and_consumer()
self.validate()
except BaseException as e:
for s in self.test_context.services:
self.mark_for_collect(s)
raise
def validate(self):
messages_consumed = self.consumer.messages_consumed[1]
self.logger.info("Number of acked records: %d" % len(self.producer.acked))
self.logger.info("Number of consumed records: %d" % len(messages_consumed))
def check_lost_data(missing_records):
return self.kafka.search_data_files(self.topic, missing_records)
succeeded, error_msg = validate_delivery(self.producer.acked, messages_consumed,
self.enable_idempotence, check_lost_data,
self.may_truncate_acked_records)
# Collect all logs if validation fails
if not succeeded:
for s in self.test_context.services:
self.mark_for_collect(s)
assert succeeded, error_msg