blob: 7b360abb16eff35b5c915c2029954eb1c0c981b0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
import signal
def broker_node(test, broker_type):
""" Discover node of requested type. For leader type, discovers leader for our topic and partition 0
"""
if broker_type == "leader":
node = test.kafka.leader(test.topic, partition=0)
elif broker_type == "controller":
node = test.kafka.controller()
else:
raise Exception("Unexpected broker type %s." % (broker_type))
return node
def clean_shutdown(test, broker_type):
"""Discover broker node of requested type and shut it down cleanly.
"""
node = broker_node(test, broker_type)
test.kafka.signal_node(node, sig=signal.SIGTERM)
def hard_shutdown(test, broker_type):
"""Discover broker node of requested type and shut it down with a hard kill."""
node = broker_node(test, broker_type)
test.kafka.signal_node(node, sig=signal.SIGKILL)
def clean_bounce(test, broker_type):
"""Chase the leader of one partition and restart it cleanly."""
for i in range(5):
prev_broker_node = broker_node(test, broker_type)
test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
def hard_bounce(test, broker_type):
"""Chase the leader and restart it with a hard kill."""
for i in range(5):
prev_broker_node = broker_node(test, broker_type)
test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
# Since this is a hard kill, we need to make sure the process is down and that
# zookeeper and the broker cluster have registered the loss of the leader/controller.
# Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
def role_reassigned():
current_elected_broker = broker_node(test, broker_type)
return current_elected_broker is not None and current_elected_broker != prev_broker_node
wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
test.kafka.start_node(prev_broker_node)
failures = {
"clean_shutdown": clean_shutdown,
"hard_shutdown": hard_shutdown,
"clean_bounce": clean_bounce,
"hard_bounce": hard_bounce
}
class ReplicationTest(ProduceConsumeValidateTest):
"""
Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
(foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
ordering guarantees.
Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
indicator that nothing is left to consume.
"""
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(ReplicationTest, self).__init__(test_context=test_context)
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
'configs': {"min.insync.replicas": 2}}
})
self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
def setUp(self):
self.zk.start()
def min_cluster_size(self):
"""Override this since we're adding services outside of the constructor"""
return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["leader"],
security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["controller"],
security_protocol=["PLAINTEXT", "SASL_SSL"])
def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
"""Replication tests.
These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios.
Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
- Produce messages in the background
- Consume messages in the background
- Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
- When done driving failures, stop producing, and finish consuming
- Validate that every acked message was consumed
"""
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.start()
self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))