KAFKA-10295: Wait for connector recovery in test_bounce (#9043)
Signed-off-by: Greg Harris <gregh@confluent.io>
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 244bc64..107c0d4 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -387,8 +387,22 @@
# through the test.
time.sleep(15)
+ # Wait at least scheduled.rebalance.max.delay.ms to expire and rebalance
+ time.sleep(60)
+ # Allow the connectors to startup, recover, and exit cleanly before
+ # ending the test. It's possible for the source connector to make
+ # uncommitted progress, and for the sink connector to read messages that
+ # have not been committed yet, and fail a later assertion.
+ wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+ err_msg="Failed to see connector transition to the RUNNING state")
+ time.sleep(15)
self.source.stop()
+ # Ensure that the sink connector has an opportunity to read all
+ # committed messages from the source connector.
+ wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
+ err_msg="Failed to see connector transition to the RUNNING state")
+ time.sleep(15)
self.sink.stop()
self.cc.stop()