blob: c796fbb7368e9fcb70161529be0ddfa5ef584b21 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import time
from ducktape.mark import matrix, ignore
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
StreamsUpgradeTestJobRunnerService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, DEV_BRANCH, DEV_VERSION, KafkaVersion
# broker 0.10.0 is not compatible with newer Kafka Streams versions
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(DEV_BRANCH)]
# releases grouped by their Streams rebalance-metadata version (bumped in
# 0.10.1.0 and again in 2.0.0 -- see the class docstring below)
metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
# once 0.10.1.2 is available backward_compatible_metadata_2_versions
# can be replaced with metadata_2_versions
backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
# If we add a new version below, we also need to add this version to `streams.py`:
# -> class `StreamsUpgradeTestJobRunnerService`, method `start_cmd`, variable `KAFKA_STREAMS_VERSION`
metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(DEV_VERSION)]
"""
After each release one should first check that the released version has been uploaded to
https://s3-us-west-2.amazonaws.com/kafka-packages/ which is the url used by system test to download jars;
anyone can verify that by calling
curl https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz to download the jar
and if it is not uploaded yet, ping the dev@kafka mailing list to request it being uploaded.
This test needs to get updated, but this requires several steps
which are outlined here:
1. Update all relevant versions in tests/kafkatest/version.py this will include adding a new version for the new
release and bumping all relevant already released versions.
2. Add the new version to the "kafkatest.version" import above and include the version in the
broker_upgrade_versions list above. You'll also need to add the new version to the
"StreamsUpgradeTestJobRunnerService" on line 484 to make sure the correct arguments are passed
during the system test run.
3. Update the vagrant/bash.sh file to include all new versions, including the newly released version
and all point releases for existing releases. You only need to list the latest version in
this file.
4. Then update all relevant versions in the tests/docker/Dockerfile
5. Add a new "upgrade-system-tests-XXXX module under streams. You can probably just copy the
latest system test module from the last release. Just make sure to update the stdout print
statement in StreamsUpgradeTest to the version for the release. After you add the new module
you'll need to update settings.gradle file to include the name of the module you just created
for gradle to recognize the newly added module
6. Then you'll need to update any version changes in gradle/dependencies.gradle
"""
class StreamsUpgradeTest(Test):
    """
    Test upgrading Kafka Streams (all version combination)
    If metadata was changed, upgrade is more difficult
    Metadata version was bumped in 0.10.1.0 and
    subsequently bumped in 2.0.0
    """
    def __init__(self, test_context):
        super(StreamsUpgradeTest, self).__init__(test_context)
        # Default topic setup; test_upgrade_downgrade_brokers replaces this
        # with a replicated, min-ISR-configured set of topics.
        self.topics = {
            'echo' : { 'partitions': 5 },
            'data' : { 'partitions': 5 },
        }
    # regex matched against a client's STDOUT to confirm it is processing records
    processed_msg = "processed [0-9]* records"
    # e.g. "2.2.0" from "2.2.0-SNAPSHOT"; used to match "Kafka version" log lines for DEV
    base_version_number = str(DEV_VERSION).split("-")[0]
    def perform_broker_upgrade(self, to_version):
        """Rolling-bounce every broker node onto <to_version>, one node at a time."""
        self.logger.info("First pass bounce - rolling broker upgrade")
        for node in self.kafka.nodes:
            self.kafka.stop_node(node)
            node.version = KafkaVersion(to_version)
            self.kafka.start_node(node)
    @ignore
    @cluster(num_nodes=6)
    @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
    def test_upgrade_downgrade_brokers(self, from_version, to_version):
        """
        Start a smoke test client then perform rolling upgrades on the broker.
        """
        # a broker "upgrade" to the same version is a no-op; skip that matrix cell
        if from_version == to_version:
            return
        self.replication = 3
        self.num_kafka_nodes = 3
        self.partitions = 1
        self.isr = 2
        # all topics the smoke test client reads/writes, replicated so the
        # cluster stays available while brokers are bounced
        self.topics = {
            'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                       'configs': {"min.insync.replicas": self.isr}},
            'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                       'configs': {"min.insync.replicas": self.isr} },
            'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                      'configs': {"min.insync.replicas": self.isr} },
            'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                      'configs': {"min.insync.replicas": self.isr} },
            'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                      'configs': {"min.insync.replicas": self.isr} },
            'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                      'configs': {"min.insync.replicas": self.isr} },
            'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                      'configs': {"min.insync.replicas": self.isr} },
            'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                      'configs': {"min.insync.replicas": self.isr} },
            'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                       'configs': {"min.insync.replicas": self.isr} },
            'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                       'configs': {"min.insync.replicas": self.isr} }
        }
        # Setup phase
        self.zk = ZookeeperService(self.test_context, num_nodes=1)
        self.zk.start()
        # number of nodes needs to be >= 3 for the smoke test
        self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
        self.kafka.start()
        # allow some time for topics to be created
        wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())),
                   timeout_sec=60,
                   err_msg="Broker did not create all topics in 60 seconds ")
        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
        processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
        # keep the driver's STDOUT monitor open across the whole upgrade so the
        # final verification output can be awaited at the end
        with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
            self.driver.start()
            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
                processor.start()
                # client must be processing records before we start bouncing brokers
                monitor.wait_until(self.processed_msg,
                                   timeout_sec=60,
                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node))
            connected_message = "Discovered group coordinator"
            with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
                with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
                    self.perform_broker_upgrade(to_version)
                    # client should re-discover the coordinator after the bounce...
                    log_monitor.wait_until(connected_message,
                                           timeout_sec=120,
                                           err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
                    # ...and resume processing records
                    stdout_monitor.wait_until(self.processed_msg,
                                              timeout_sec=60,
                                              err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account))
            # SmokeTestDriver allows up to 6 minutes to consume all
            # records for the verification step so this timeout is set to
            # 6 minutes (360 seconds) for consuming of verification records
            # and a very conservative additional 2 minutes (120 seconds) to process
            # the records in the verification step
            driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
                                      timeout_sec=480,
                                      err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account))
        self.driver.stop()
        processor.stop()
        # fails the test (allow_fail=False) if the client did not shut down cleanly
        processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
    def test_simple_upgrade_downgrade(self, from_version, to_version):
        """
        Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>

        Both versions share the same metadata version, so a single rolling
        bounce (no upgrade_from config) is sufficient.
        """
        if from_version == to_version:
            return
        self.zk = ZookeeperService(self.test_context, num_nodes=1)
        self.zk.start()
        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
        self.kafka.start()
        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
        # driver keeps producing until explicitly stopped below
        self.driver.disable_auto_terminate()
        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.driver.start()
        self.start_all_nodes_with(from_version)
        self.processors = [self.processor1, self.processor2, self.processor3]
        counter = 1
        random.seed()
        # upgrade one-by-one via rolling bounce
        random.shuffle(self.processors)
        for p in self.processors:
            # keep state dirs across the bounce so the upgrade resumes from existing state
            p.CLEAN_NODE_ENABLED = False
            self.do_stop_start_bounce(p, None, to_version, counter)
            counter = counter + 1
        # shutdown
        self.driver.stop()
        random.shuffle(self.processors)
        for p in self.processors:
            node = p.node
            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
                p.stop()
                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
                                   timeout_sec=60,
                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
    @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
    @matrix(from_version=metadata_1_versions, to_version=metadata_3_or_higher_versions)
    @matrix(from_version=metadata_2_versions, to_version=metadata_3_or_higher_versions)
    def test_metadata_upgrade(self, from_version, to_version):
        """
        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>

        Because the rebalance metadata version differs between <from_version>
        and <to_version>, two rolling bounces are required: the first runs the
        new code with the upgrade_from config set, the second removes it.
        """
        self.zk = ZookeeperService(self.test_context, num_nodes=1)
        self.zk.start()
        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
        self.kafka.start()
        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
        self.driver.disable_auto_terminate()
        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.driver.start()
        self.start_all_nodes_with(from_version)
        self.processors = [self.processor1, self.processor2, self.processor3]
        counter = 1
        random.seed()
        # first rolling bounce
        random.shuffle(self.processors)
        for p in self.processors:
            p.CLEAN_NODE_ENABLED = False
            # from_version[:-2] drops the trailing ".N" bugfix part, e.g.
            # "0.10.1.1" -> "0.10.1" -- presumably the form upgrade_from expects
            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter)
            counter = counter + 1
        # second rolling bounce
        random.shuffle(self.processors)
        for p in self.processors:
            # upgrade_from=None: run <to_version> in its native metadata format
            self.do_stop_start_bounce(p, None, to_version, counter)
            counter = counter + 1
        # shutdown
        self.driver.stop()
        random.shuffle(self.processors)
        for p in self.processors:
            node = p.node
            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
                p.stop()
                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
                                   timeout_sec=60,
                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
    def test_version_probing_upgrade(self):
        """
        Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"

        Verifies the version-probing handshake: an upgraded member sends a newer
        subscription version, gets downgraded an assignment, and triggers a
        follow-up rebalance until the whole group converges.
        """
        self.zk = ZookeeperService(self.test_context, num_nodes=1)
        self.zk.start()
        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
        self.kafka.start()
        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
        self.driver.disable_auto_terminate()
        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
        self.driver.start()
        self.start_all_nodes_with("") # run with TRUNK
        self.processors = [self.processor1, self.processor2, self.processor3]
        # track which members still run the old code vs. the "future" version
        self.old_processors = [self.processor1, self.processor2, self.processor3]
        self.upgraded_processors = []
        counter = 1
        # NOTE(review): starting generation assumed to be 3 after the three
        # members joined above -- confirm against the consumer group protocol
        current_generation = 3
        random.seed()
        random.shuffle(self.processors)
        for p in self.processors:
            p.CLEAN_NODE_ENABLED = False
            current_generation = self.do_rolling_bounce(p, counter, current_generation)
            counter = counter + 1
        # shutdown
        self.driver.stop()
        random.shuffle(self.processors)
        for p in self.processors:
            node = p.node
            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
                p.stop()
                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
                                   timeout_sec=60,
                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
    def get_version_string(self, version):
        """Return the regex/literal expected in the log for the given Streams version.

        Releases 0.x, 1.x, 2.0 and 2.1 log "Kafka version : " (space before the
        colon); the DEV snapshot is matched by a regex on the base version number;
        anything else logs "Kafka version: ".
        """
        if version.startswith("0") or version.startswith("1") \
          or version.startswith("2.0") or version.startswith("2.1"):
            return "Kafka version : " + version
        elif "SNAPSHOT" in version:
            return "Kafka version.*" + self.base_version_number + ".*SNAPSHOT"
        else:
            return "Kafka version: " + version
    def start_all_nodes_with(self, version):
        """Start processor1..3 one at a time with <version>, waiting after each
        start until every already-running member reports processed records."""
        kafka_version_str = self.get_version_string(version)
        # start first with <version>
        self.prepare_for(self.processor1, version)
        node1 = self.processor1.node
        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
                self.processor1.start()
                log_monitor.wait_until(kafka_version_str,
                                       timeout_sec=60,
                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
                monitor.wait_until(self.processed_msg,
                                   timeout_sec=60,
                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
        # start second with <version>
        self.prepare_for(self.processor2, version)
        node2 = self.processor2.node
        # also watch the first node: it must keep processing through the rebalance
        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
                    self.processor2.start()
                    log_monitor.wait_until(kafka_version_str,
                                           timeout_sec=60,
                                           err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
                    first_monitor.wait_until(self.processed_msg,
                                             timeout_sec=60,
                                             err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
                    second_monitor.wait_until(self.processed_msg,
                                              timeout_sec=60,
                                              err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
        # start third with <version>
        self.prepare_for(self.processor3, version)
        node3 = self.processor3.node
        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
                        self.processor3.start()
                        log_monitor.wait_until(kafka_version_str,
                                               timeout_sec=60,
                                               err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
                        first_monitor.wait_until(self.processed_msg,
                                                 timeout_sec=60,
                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
                        second_monitor.wait_until(self.processed_msg,
                                                  timeout_sec=60,
                                                  err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
                        third_monitor.wait_until(self.processed_msg,
                                                 timeout_sec=60,
                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
    @staticmethod
    def prepare_for(processor, version):
        """Wipe the processor's persistent state dir and pin its Streams version."""
        processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
        if version == str(DEV_VERSION):
            processor.set_version("") # set to TRUNK
        else:
            processor.set_version(version)
    def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
        """Stop <processor>, switch it to <new_version> (with optional
        upgrade_from config) and restart it, verifying that the two other
        members keep processing throughout and that no member hits the
        metadata-decoding incompatibility.

        :param upgrade_from: value for the upgrade_from config, or None to run
                             the new version's native metadata format
        :param counter: suffix used to archive the previous run's log files
        """
        kafka_version_str = self.get_version_string(new_version)
        # identify the two members that are NOT being bounced
        first_other_processor = None
        second_other_processor = None
        for p in self.processors:
            if p != processor:
                if first_other_processor is None:
                    first_other_processor = p
                else:
                    second_other_processor = p
        node = processor.node
        first_other_node = first_other_processor.node
        second_other_node = second_other_processor.node
        # stop processor and wait for rebalance of others
        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
                processor.stop()
                first_other_monitor.wait_until(self.processed_msg,
                                               timeout_sec=60,
                                               err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
                second_other_monitor.wait_until(self.processed_msg,
                                                timeout_sec=60,
                                                err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
        # clean shutdown is required (allow_fail=False fails the test otherwise)
        node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
        if upgrade_from is None:  # upgrade disabled -- second round of rolling bounces
            roll_counter = ".1-" # second round of rolling bounces
        else:
            roll_counter = ".0-" # first round of rolling bounces
        # archive the previous run's output so each bounce's logs stay separate
        node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
        node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
        node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
        if new_version == str(DEV_VERSION):
            processor.set_version("") # set to TRUNK
        else:
            processor.set_version(new_version)
        processor.set_upgrade_from(upgrade_from)
        grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
                        processor.start()
                        # bounced member must come up on the new version...
                        log_monitor.wait_until(kafka_version_str,
                                               timeout_sec=60,
                                               err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
                        # ...and neither of the others may fail on incompatible metadata
                        first_other_monitor.wait_until(self.processed_msg,
                                                       timeout_sec=60,
                                                       err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
                        found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
                        if len(found) > 0:
                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
                        second_other_monitor.wait_until(self.processed_msg,
                                                        timeout_sec=60,
                                                        err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
                        found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
                        if len(found) > 0:
                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
                        monitor.wait_until(self.processed_msg,
                                           timeout_sec=60,
                                           err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account))
    def do_rolling_bounce(self, processor, counter, current_generation):
        """Bounce <processor> onto the "future version" and verify the
        version-probing protocol, including that all three members end up on
        the same rebalance generation.

        Returns the synchronized generation number observed after the bounce.
        """
        first_other_processor = None
        second_other_processor = None
        for p in self.processors:
            if p != processor:
                if first_other_processor is None:
                    first_other_processor = p
                else:
                    second_other_processor = p
        node = processor.node
        first_other_node = first_other_processor.node
        second_other_node = second_other_processor.node
        with first_other_node.account.monitor_log(first_other_processor.LOG_FILE) as first_other_monitor:
            with second_other_node.account.monitor_log(second_other_processor.LOG_FILE) as second_other_monitor:
                # stop processor
                processor.stop()
                node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
                # archive the old-version run's output files
                node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + "." + str(counter), allow_fail=False)
                node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + "." + str(counter), allow_fail=False)
                node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + "." + str(counter), allow_fail=False)
                with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
                    processor.set_upgrade_to("future_version")
                    processor.start()
                    self.old_processors.remove(processor)
                    self.upgraded_processors.append(processor)
                    # checking for the dev version which should be the only SNAPSHOT
                    log_monitor.wait_until("Kafka version.*" + self.base_version_number + ".*SNAPSHOT",
                                           timeout_sec=60,
                                           err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account))
                    # NOTE(review): rewinds the monitor a few lines so the assignor
                    # config line (logged around the version line) is not missed
                    log_monitor.offset = 5
                    log_monitor.wait_until("partition\.assignment\.strategy = \[org\.apache\.kafka\.streams\.tests\.StreamsUpgradeTest$FutureStreamsPartitionAssignor\]",
                                           timeout_sec=60,
                                           err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account))
                    monitors = {}
                    monitors[processor] = log_monitor
                    monitors[first_other_processor] = first_other_monitor
                    monitors[second_other_processor] = second_other_monitor
                    # the upgraded member probes with version 5 and gets downgraded to 4
                    if len(self.old_processors) > 0:
                        log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
                                               timeout_sec=60,
                                               err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
                    else:
                        log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
                                               timeout_sec=60,
                                               err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
                        # last bounce: the two earlier-upgraded members now move up to version 5
                        first_other_monitor.wait_until("Sent a version 4 subscription and group.s latest commonly supported version is 5 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.",
                                                       timeout_sec=60,
                                                       err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(first_other_node.account))
                        second_other_monitor.wait_until("Sent a version 4 subscription and group.s latest commonly supported version is 5 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.",
                                                        timeout_sec=60,
                                                        err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(second_other_node.account))
                    log_monitor.wait_until("Version probing detected. Triggering new rebalance.",
                                           timeout_sec=60,
                                           err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
                    # version probing should trigger second rebalance
                    # now we check that after consecutive rebalances we have synchronized generation
                    generation_synchronized = False
                    retries = 0
                    # poll the logs (up to ~50s) until all three members report the same generation
                    while retries < 10:
                        processor_found = self.extract_generation_from_logs(processor)
                        first_other_processor_found = self.extract_generation_from_logs(first_other_processor)
                        second_other_processor_found = self.extract_generation_from_logs(second_other_processor)
                        if len(processor_found) > 0 and len(first_other_processor_found) > 0 and len(second_other_processor_found) > 0:
                            self.logger.info("processor: " + str(processor_found))
                            self.logger.info("first other processor: " + str(first_other_processor_found))
                            self.logger.info("second other processor: " + str(second_other_processor_found))
                            processor_generation = self.extract_highest_generation(processor_found)
                            first_other_processor_generation = self.extract_highest_generation(first_other_processor_found)
                            second_other_processor_generation = self.extract_highest_generation(second_other_processor_found)
                            if processor_generation == first_other_processor_generation and processor_generation == second_other_processor_generation:
                                current_generation = processor_generation
                                generation_synchronized = True
                                break
                        time.sleep(5)
                        retries = retries + 1
                    if generation_synchronized == False:
                        raise Exception("Never saw all three processors have the synchronized generation number")
                    # while old members remain, nobody may have upgraded metadata prematurely
                    if len(self.old_processors) > 0:
                        self.verify_metadata_no_upgraded_yet()
        return current_generation
    def extract_generation_from_logs(self, processor):
        """Return all generation numbers (as strings, oldest first) scraped from
        the processor's "Successfully joined group with generation" log lines."""
        return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))
    def extract_highest_generation(self, found_generations):
        """Return the most recent (last-logged) generation as an int."""
        return int(found_generations[-1])
    def verify_metadata_no_upgraded_yet(self):
        """Fail if any member already upgraded its subscription metadata to
        version 5 while old-versioned members are still in the group."""
        for p in self.processors:
            found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group.s latest commonly supported version is 5 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
            if len(found) > 0:
                raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'")
    def confirm_topics_on_all_brokers(self, expected_topic_set):
        """Return True iff every broker lists every topic in <expected_topic_set>."""
        for node in self.kafka.nodes:
            match_count = 0
            # need to iterate over topic_list_generator as kafka.list_topics()
            # returns a python generator so values are fetched lazily
            # so we can't just compare directly we must iterate over what's returned
            topic_list_generator = self.kafka.list_topics(node=node)
            for topic in topic_list_generator:
                if topic in expected_topic_set:
                    match_count += 1
            if len(expected_topic_set) != match_count:
                return False
        return True