Fix Test Assertion retry (#5544)

diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 32e8144..3376cfb 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -144,6 +144,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>failsafe</artifactId>
+      <version>${failsafe.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b83eb4b..765bd19 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,10 +18,11 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
-import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -44,7 +45,6 @@
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
-import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
 import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -60,6 +60,7 @@
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -79,13 +80,21 @@
 @Slf4j
 public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
+   final Duration ONE_MINUTE = Duration.ofMinutes(1);
+   final Duration TEN_SECONDS = Duration.ofSeconds(10);
+
+   final RetryPolicy statusRetryPolicy = new RetryPolicy()
+            .withMaxDuration(ONE_MINUTE)
+            .withDelay(TEN_SECONDS)
+            .onRetry(e -> log.error("Retry ... "));
+
     PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
         super(functionRuntimeType);
     }
 
     @Test
     public void testKafkaSink() throws Exception {
-        String kafkaContainerName = "kafka-" + randomName(8);
+        final String kafkaContainerName = "kafka-" + randomName(8);
         testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName));
     }
 
@@ -173,40 +182,46 @@
         getSinkInfoSuccess(tester, tenant, namespace, sinkName, builtin);
 
         // get sink status
-        getSinkStatus(tenant, namespace, sinkName);
+        Failsafe.with(statusRetryPolicy).run(() -> getSinkStatus(tenant, namespace, sinkName));
 
         // produce messages
         Map<String, String> kvs;
         if (tester instanceof JdbcSinkTester) {
             kvs = produceSchemaInsertMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
             // wait for sink to process messages
-            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+            Failsafe.with(statusRetryPolicy).run(() ->
+                    waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages));
+
             // validate the sink result
             tester.validateSinkResult(kvs);
 
             kvs = produceSchemaUpdateMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
 
             // wait for sink to process messages
-            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20);
+            Failsafe.with(statusRetryPolicy).run(() ->
+                    waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20));
+
             // validate the sink result
             tester.validateSinkResult(kvs);
 
             kvs = produceSchemaDeleteMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
 
             // wait for sink to process messages
-            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20 + 20);
+            Failsafe.with(statusRetryPolicy).run(() ->
+                    waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20 + 20));
+
             // validate the sink result
             tester.validateSinkResult(kvs);
 
         } else {
             kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
             // wait for sink to process messages
-            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+            Failsafe.with(statusRetryPolicy).run(() ->
+                    waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages));
             // validate the sink result
             tester.validateSinkResult(kvs);
         }
 
-
         // update the sink
         updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
 
@@ -326,7 +341,7 @@
     }
 
     protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception {
-        String[] commands = {
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "sink",
             "status",
@@ -334,32 +349,21 @@
             "--namespace", namespace,
             "--name", sinkName
         };
-        while (true) {
-            try {
-                ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get sink status : {}", result.getStdout());
 
-                assertEquals(result.getExitCode(), 0);
+        final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get sink status : {}", result.getStdout());
 
-                SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
-                try {
-                    assertEquals(sinkStatus.getNumInstances(), 1);
-                    assertEquals(sinkStatus.getNumRunning(), 1);
-                    assertEquals(sinkStatus.getInstances().size(), 1);
-                    assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
-                    return;
-                } catch (Exception e) {
-                    // noop
-                }
-            } catch (ContainerExecException e) {
-                // expected in early iterations
-            }
-            log.info("Backoff 1 second until the sink is running");
-            TimeUnit.SECONDS.sleep(1);
-        }
+        assertEquals(result.getExitCode(), 0);
+
+        SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
+
+        assertEquals(sinkStatus.getNumInstances(), 1);
+        assertEquals(sinkStatus.getNumRunning(), 1);
+        assertEquals(sinkStatus.getInstances().size(), 1);
+        assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
     }
 
     protected Map<String, String> produceMessagesToInputTopic(String inputTopicName,
@@ -368,10 +372,12 @@
         PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build();
+
         @Cleanup
         Producer<String> producer = client.newProducer(Schema.STRING)
             .topic(inputTopicName)
             .create();
+
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
@@ -387,16 +393,18 @@
 
     // This for JdbcSinkTester
     protected Map<String, String> produceSchemaInsertMessagesToInputTopic(String inputTopicName,
-                                                                    int numMessages,
-                                                                    Schema<Foo> schema) throws Exception {
+                                                                          int numMessages,
+                                                                          Schema<Foo> schema) throws Exception {
         @Cleanup
         PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build();
+
         @Cleanup
         Producer<Foo> producer = client.newProducer(schema)
             .topic(inputTopicName)
             .create();
+
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
@@ -422,16 +430,18 @@
 
     // This for JdbcSinkTester
     protected Map<String, String> produceSchemaUpdateMessagesToInputTopic(String inputTopicName,
-                                                                    int numMessages,
-                                                                    Schema<Foo> schema) throws Exception {
+                                                                          int numMessages,
+                                                                          Schema<Foo> schema) throws Exception {
         @Cleanup
         PulsarClient client = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                 .build();
+
         @Cleanup
         Producer<Foo> producer = client.newProducer(schema)
                 .topic(inputTopicName)
                 .create();
+
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
         log.info("update start");
         for (int i = 0; i < numMessages; i++) {
@@ -465,10 +475,12 @@
         PulsarClient client = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                 .build();
+
         @Cleanup
         Producer<Foo> producer = client.newProducer(schema)
                 .topic(inputTopicName)
                 .create();
+
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
@@ -493,7 +505,8 @@
     }
 
     protected void deleteSink(String tenant, String namespace, String sinkName) throws Exception {
-        String[] commands = {
+
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "sink",
             "delete",
@@ -501,6 +514,7 @@
             "--namespace", namespace,
             "--name", sinkName
         };
+
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
             result.getStdout().contains("Deleted successfully"),
@@ -513,7 +527,7 @@
     }
 
     protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception {
-        String[] commands = {
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "sink",
             "get",
@@ -568,13 +582,14 @@
         getSourceInfoSuccess(tester, tenant, namespace, sourceName);
 
         // get source status
-        getSourceStatus(tenant, namespace, sourceName);
+        Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
 
         // produce messages
         Map<String, String> kvs = tester.produceSourceMessages(numMessages);
 
         // wait for source to process messages
-        waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+        Failsafe.with(statusRetryPolicy).run(() ->
+                waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
         // validate the source result
         validateSourceResult(consumer, kvs);
@@ -598,7 +613,7 @@
                                          String namespace,
                                          String sourceName,
                                          String outputTopicName) throws Exception {
-        String[] commands = {
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source", "create",
             "--tenant", tenant,
@@ -608,6 +623,7 @@
             "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
             "--destinationTopicName", outputTopicName
         };
+
         log.info("Run command : {}", StringUtils.join(commands, ' '));
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
@@ -620,7 +636,7 @@
                                          String namespace,
                                          String sourceName,
                                          String outputTopicName) throws Exception {
-        String[] commands = {
+        final String[] commands = {
                 PulsarCluster.ADMIN_SCRIPT,
                 "source", "update",
                 "--tenant", tenant,
@@ -631,6 +647,7 @@
                 "--destinationTopicName", outputTopicName,
                 "--parallelism", "2"
         };
+
         log.info("Run command : {}", StringUtils.join(commands, ' '));
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
@@ -642,7 +659,7 @@
                                         String tenant,
                                         String namespace,
                                         String sourceName) throws Exception {
-        String[] commands = {
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
             "get",
@@ -650,6 +667,7 @@
             "--namespace", namespace,
             "--name", sourceName
         };
+
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get source info : {}", result.getStdout());
         assertTrue(
@@ -659,7 +677,8 @@
     }
 
     protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception {
-        String[] commands = {
+
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
             "status",
@@ -667,35 +686,23 @@
             "--namespace", namespace,
             "--name", sourceName
         };
-        while (true) {
-            try {
-                ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get source status : {}", result.getStdout());
 
-                assertEquals(result.getExitCode(), 0);
+        final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get source status : {}", result.getStdout());
 
-                SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
-                try {
-                    assertEquals(sourceStatus.getNumInstances(), 1);
-                    assertEquals(sourceStatus.getNumRunning(), 1);
-                    assertEquals(sourceStatus.getInstances().size(), 1);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
-                    return;
-                } catch (Exception e) {
-                    // noop
-                }
+        assertEquals(result.getExitCode(), 0);
 
-                if (result.getStdout().contains("\"running\": true")) {
-                    return;
-                }
-            } catch (ContainerExecException e) {
-                // expected for early iterations
-            }
-            log.info("Backoff 1 second until the function is running");
-            TimeUnit.SECONDS.sleep(1);
-        }
+        final SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
+
+        assertEquals(sourceStatus.getNumInstances(), 1);
+        assertEquals(sourceStatus.getNumRunning(), 1);
+        assertEquals(sourceStatus.getInstances().size(), 1);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
+
+        assertTrue(result.getStdout().contains("\"running\" : true"));
+
     }
 
     protected void validateSourceResult(Consumer<String> consumer,
@@ -711,7 +718,7 @@
                                                    String namespace,
                                                    String sourceName,
                                                    int numMessages) throws Exception {
-        String[] commands = {
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
             "status",
@@ -719,44 +726,30 @@
             "--namespace", namespace,
             "--name", sourceName
         };
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        while (true) {
-            try {
-                ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get source status : {}", result.getStdout());
 
-                assertEquals(result.getExitCode(), 0);
+        final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get source status : {}", result.getStdout());
 
-                SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
-                try {
-                    assertEquals(sourceStatus.getNumInstances(), 1);
-                    assertEquals(sourceStatus.getNumRunning(), 1);
-                    assertEquals(sourceStatus.getInstances().size(), 1);
-                    assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(), numMessages);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), numMessages);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
-                    assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
-                    return;
-                } catch (Exception e) {
-                    // noop
-                }
-            } catch (ContainerExecException e) {
-                // expected for early iterations
-            }
-            log.info("{} ms has elapsed but the source hasn't process {} messages, backoff to wait for another 1 second",
-                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
-            TimeUnit.SECONDS.sleep(1);
-        }
+        assertEquals(result.getExitCode(), 0);
+
+        SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
+        assertEquals(sourceStatus.getNumInstances(), 1);
+        assertEquals(sourceStatus.getNumRunning(), 1);
+        assertEquals(sourceStatus.getInstances().size(), 1);
+        assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
+        assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(), numMessages);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), numMessages);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+        assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
     }
 
     protected void waitForProcessingSinkMessages(String tenant,
                                                  String namespace,
                                                  String sinkName,
                                                  int numMessages) throws Exception {
-        String[] commands = {
+        final String[] commands = {
                 PulsarCluster.ADMIN_SCRIPT,
                 "sink",
                 "status",
@@ -764,42 +757,29 @@
                 "--namespace", namespace,
                 "--name", sinkName
         };
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        while (true) {
-            try {
-                ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get sink status : {}", result.getStdout());
 
-                assertEquals(result.getExitCode(), 0);
+        final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get sink status : {}", result.getStdout());
 
-                SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
-                try {
-                    assertEquals(sinkStatus.getNumInstances(), 1);
-                    assertEquals(sinkStatus.getNumRunning(), 1);
-                    assertEquals(sinkStatus.getInstances().size(), 1);
-                    assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
-                    assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(), numMessages);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(), numMessages);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
-                    assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
-                    return;
-                } catch (Exception e) {
-                    // noop
-                }
+        assertEquals(result.getExitCode(), 0);
 
-            } catch (ContainerExecException e) {
-                // expected for early iterations
-            }
-            log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
-                    stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
-            TimeUnit.SECONDS.sleep(1);
-        }
+        final SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
+
+        assertEquals(sinkStatus.getNumInstances(), 1);
+        assertEquals(sinkStatus.getNumRunning(), 1);
+        assertEquals(sinkStatus.getInstances().size(), 1);
+        assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
+        assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(), numMessages);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(), numMessages);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+        assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
     }
 
     protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception {
-        String[] commands = {
+
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
             "delete",
@@ -807,6 +787,7 @@
             "--namespace", namespace,
             "--name", sourceName
         };
+
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
             result.getStdout().contains("Delete source successfully"),
@@ -819,7 +800,8 @@
     }
 
     protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception {
-        String[] commands = {
+
+        final String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "source",
             "get",
@@ -827,6 +809,7 @@
             "--namespace", namespace,
             "--name", sourceName
         };
+
         try {
             pulsarCluster.getAnyWorker().execCmd(commands);
             fail("Command should have exited with non-zero");
@@ -892,11 +875,13 @@
                 @Cleanup PulsarClient client = PulsarClient.builder()
                         .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                         .build();
+
                 @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
                         .topic(outputTopicName)
                         .subscriptionType(SubscriptionType.Exclusive)
                         .subscriptionName("test-sub")
                         .subscribe();
+
                 @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
                         .topic(inputTopicName)
                         .create();
@@ -1192,6 +1177,7 @@
             @Cleanup PulsarClient client = PulsarClient.builder()
                     .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                     .build();
+
             @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                     .topic(outputTopicName)
                     .subscriptionType(SubscriptionType.Exclusive)
@@ -1322,10 +1308,24 @@
 
         if (runtime == Runtime.PYTHON) {
             submitFunction(
-                    runtime, inputTopicName, outputTopicName, functionName, PUBLISH_FUNCTION_PYTHON_FILE, PUBLISH_PYTHON_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+                    runtime,
+                    inputTopicName,
+                    outputTopicName,
+                    functionName,
+                    PUBLISH_FUNCTION_PYTHON_FILE,
+                    PUBLISH_PYTHON_CLASS,
+                    schema,
+                    Collections.singletonMap("publish-topic", outputTopicName));
         } else {
             submitFunction(
-                    runtime, inputTopicName, outputTopicName, functionName, null, PUBLISH_JAVA_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+                    runtime,
+                    inputTopicName,
+                    outputTopicName,
+                    functionName,
+                    null,
+                    PUBLISH_JAVA_CLASS,
+                    schema,
+                    Collections.singletonMap("publish-topic", outputTopicName));
         }
 
         // get function info
@@ -1412,8 +1412,8 @@
 
         String functionName = "test-serde-fn-" + randomName(8);
         submitFunction(
-                Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, Serde_JAVA_CLASS,
-                Serde_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
+                Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS,
+                SERDE_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
         );
 
         // get function info
@@ -1491,11 +1491,13 @@
             @Cleanup PulsarClient client = PulsarClient.builder()
                     .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                     .build();
+
             @Cleanup Consumer<?> consumer1 = client.newConsumer(schema)
                     .topic(inputTopicName + "1")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
                     .subscribe();
+
             @Cleanup Consumer<?> consumer2 = client.newConsumer(schema)
                     .topic(inputTopicName + "2")
                     .subscriptionType(SubscriptionType.Exclusive)
@@ -1578,7 +1580,7 @@
             if (isPublishFunction) {
                 file = PUBLISH_FUNCTION_PYTHON_FILE;
             } else if (pyZip) {
-                file = EXCLAMATION_PYTHONZIP_FILE;
+                file = EXCLAMATION_PYTHON_ZIP_FILE;
             } else if (withExtraDeps) {
                 file = EXCLAMATION_WITH_DEPS_PYTHON_FILE;
             } else {
@@ -1879,6 +1881,7 @@
             @Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING)
                     .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
                     .create();
+
             @Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING)
                     .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
                     .create();
@@ -1919,15 +1922,18 @@
         @Cleanup PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build();
+
         @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
             .topic(outputTopic)
             .subscriptionType(SubscriptionType.Exclusive)
             .subscriptionName("test-sub")
             .subscribe();
+
         if (inputTopic.endsWith(".*")) {
             @Cleanup Producer<byte[]> producer1 = client.newProducer(Schema.BYTES)
                     .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
                     .create();
+
             @Cleanup Producer<byte[]> producer2 = client.newProducer(Schema.BYTES)
                     .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
                     .create();
@@ -1985,7 +1991,13 @@
 
         // submit the exclamation function
         submitFunction(
-            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false, false,
+            Runtime.JAVA,
+            inputTopicName,
+            outputTopicName,
+            functionName,
+            false,
+            false,
+            false,
             AutoSchemaFunction.class.getName(),
             Schema.AVRO(CustomObject.class));
 
@@ -2009,14 +2021,17 @@
     private static void publishAndConsumeAvroMessages(String inputTopic,
                                                       String outputTopic,
                                                       int numMessages) throws Exception {
+
         @Cleanup PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build();
+
         @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
             .topic(outputTopic)
             .subscriptionType(SubscriptionType.Exclusive)
             .subscriptionName("test-sub")
             .subscribe();
+
         @Cleanup Producer<CustomObject> producer = client.newProducer(Schema.AVRO(CustomObject.class))
             .topic(inputTopic)
             .create();
@@ -2079,10 +2094,11 @@
         getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
 
         // get source status
-        getSourceStatus(tenant, namespace, sourceName);
+        Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
 
         // wait for source to process messages
-        waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+        Failsafe.with(statusRetryPolicy).run(() ->
+                waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
         // validate the source result
         sourceTester.validateSourceResult(consumer, 9);
@@ -2135,10 +2151,11 @@
         getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
 
         // get source status
-        getSourceStatus(tenant, namespace, sourceName);
+        Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
 
         // wait for source to process messages
-        waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+        Failsafe.with(statusRetryPolicy).run(() ->
+                waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
         // validate the source result
         sourceTester.validateSourceResult(consumer, 9);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 4ba741e..cb5291b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -79,10 +79,10 @@
     public static final String EXCEPTION_JAVA_CLASS =
             "org.apache.pulsar.tests.integration.functions.ExceptionFunction";
 
-    public static final String Serde_JAVA_CLASS =
+    public static final String SERDE_JAVA_CLASS =
             "org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
 
-    public static final String Serde_OUTPUT_CLASS =
+    public static final String SERDE_OUTPUT_CLASS =
             "org.apache.pulsar.functions.api.examples.CustomBaseSerde";
 
     public static final String EXCLAMATION_PYTHON_CLASS =
@@ -91,7 +91,7 @@
     public static final String EXCLAMATION_WITH_DEPS_PYTHON_CLASS =
         "exclamation_with_extra_deps.ExclamationFunction";
 
-    public static final String EXCLAMATION_PYTHONZIP_CLASS =
+    public static final String EXCLAMATION_PYTHON_ZIP_CLASS =
             "exclamation";
 
     public static final String PUBLISH_PYTHON_CLASS = "typed_message_builder_publish.TypedMessageBuilderPublish";
@@ -99,7 +99,7 @@
 
     public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
     public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
-    public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
+    public static final String EXCLAMATION_PYTHON_ZIP_FILE = "exclamation.zip";
     public static final String PUBLISH_FUNCTION_PYTHON_FILE = "typed_message_builder_publish.py";
     public static final String EXCEPTION_FUNCTION_PYTHON_FILE = "exception_function.py";
 
@@ -110,7 +110,7 @@
             return EXCLAMATION_JAVA_CLASS;
         } else if (Runtime.PYTHON == runtime) {
             if (pyZip) {
-                return EXCLAMATION_PYTHONZIP_CLASS;
+                return EXCLAMATION_PYTHON_ZIP_CLASS;
             } else if (extraDeps) {
                 return EXCLAMATION_WITH_DEPS_PYTHON_CLASS;
             } else {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index fd68fbe..8fe8e33 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -42,12 +42,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -68,7 +66,7 @@
     }
 
     public static void dumpContainerLogToTarget(DockerClient dockerClient, String containerId) {
-        InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+        final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
         // docker api returns names prefixed with "/", it's part of it's legacy design,
         // this removes it to be consistent with what docker ps shows.
         final String containerName = inspectContainerResponse.getName().replace("/","");
@@ -119,11 +117,11 @@
     public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
                                                           String path) {
         final int READ_BLOCK_SIZE = 10000;
-        InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+        final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
         // docker api returns names prefixed with "/", it's part of it's legacy design,
         // this removes it to be consistent with what docker ps shows.
         final String containerName = inspectContainerResponse.getName().replace("/","");
-        String baseName = path.replace("/", "-").replaceAll("^-", "");
+        final String baseName = path.replace("/", "-").replaceAll("^-", "");
         File output = new File(getTargetDirectory(containerName), baseName + ".tar.gz");
         int i = 0;
         while (output.exists()) {
@@ -181,8 +179,8 @@
 
     public static ContainerExecResult runCommand(DockerClient docker,
                                                  String containerId,
-                                                 String... cmd) throws ContainerExecException, ExecutionException, InterruptedException {
-
+                                                 String... cmd)
+            throws ContainerExecException, ExecutionException, InterruptedException {
         try {
             return runCommandAsync(docker, containerId, cmd).get();
         } catch (ExecutionException e) {
@@ -193,32 +191,34 @@
         }
     }
 
-    public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient docker,
-                                                 String containerId,
-                                                 String... cmd) {
+    public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient dockerClient,
+                                                                         String containerId,
+                                                                         String... cmd) {
         CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
-        String execid = docker.execCreateCmd(containerId)
+        String execId = dockerClient.execCreateCmd(containerId)
             .withCmd(cmd)
             .withAttachStderr(true)
             .withAttachStdout(true)
             .exec()
             .getId();
-        String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
+        final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+        final String containerName = inspectContainerResponse.getName().replace("/","");
+        String cmdString = String.join(" ", cmd);
         StringBuilder stdout = new StringBuilder();
         StringBuilder stderr = new StringBuilder();
-        docker.execStartCmd(execid).withDetach(false)
+        dockerClient.execStartCmd(execId).withDetach(false)
             .exec(new ResultCallback<Frame>() {
                 @Override
                 public void close() {}
 
                 @Override
                 public void onStart(Closeable closeable) {
-                    LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
+                    LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
                 }
 
                 @Override
                 public void onNext(Frame object) {
-                    LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, object);
+                    LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
                     if (StreamType.STDOUT == object.getStreamType()) {
                         stdout.append(new String(object.getPayload(), UTF_8));
                     } else if (StreamType.STDERR == object.getStreamType()) {
@@ -233,9 +233,9 @@
 
                 @Override
                 public void onComplete() {
-                    LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
+                    LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
 
-                    InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+                    InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
                     while (resp.isRunning()) {
                         try {
                             Thread.sleep(200);
@@ -243,7 +243,7 @@
                             Thread.currentThread().interrupt();
                             throw new RuntimeException(ie);
                         }
-                        resp = docker.inspectExecCmd(execid).exec();
+                        resp = dockerClient.inspectExecCmd(execId).exec();
                     }
                     int retCode = resp.getExitCode();
                     ContainerExecResult result = ContainerExecResult.of(
@@ -251,11 +251,11 @@
                             stdout.toString(),
                             stderr.toString()
                     );
-                    LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+                    LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
 
                     if (retCode != 0) {
                         LOG.error("DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr: {}",
-                                containerId, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
+                                containerName, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
                         future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
                     } else {
                         future.complete(result);
@@ -265,21 +265,22 @@
         return future;
     }
 
-    public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient docker,
-            String containerId,
-            String... cmd)
-            throws ContainerExecException {
+    public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient dockerClient,
+                                                                   String containerId,
+                                                                   String... cmd) throws ContainerExecException {
         CompletableFuture<Boolean> future = new CompletableFuture<>();
-        String execid = docker.execCreateCmd(containerId)
+        String execId = dockerClient.execCreateCmd(containerId)
                 .withCmd(cmd)
                 .withAttachStderr(true)
                 .withAttachStdout(true)
                 .exec()
                 .getId();
-        String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
+        final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+        final String containerName = inspectContainerResponse.getName().replace("/","");
+        String cmdString = String.join(" ", cmd);
         ByteBuf stdout = Unpooled.buffer();
         ByteBuf stderr = Unpooled.buffer();
-        docker.execStartCmd(execid).withDetach(false)
+        dockerClient.execStartCmd(execId).withDetach(false)
                 .exec(new ResultCallback<Frame>() {
                     @Override
                     public void close() {
@@ -287,7 +288,7 @@
 
                     @Override
                     public void onStart(Closeable closeable) {
-                        LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
+                        LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
                     }
 
                     @Override
@@ -306,13 +307,13 @@
 
                     @Override
                     public void onComplete() {
-                        LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
+                        LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
                         future.complete(true);
                     }
                 });
         future.join();
 
-        InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+        InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
         while (resp.isRunning()) {
             try {
                 Thread.sleep(200);
@@ -320,7 +321,7 @@
                 Thread.currentThread().interrupt();
                 throw new RuntimeException(ie);
             }
-            resp = docker.inspectExecCmd(execid).exec();
+            resp = dockerClient.inspectExecCmd(execId).exec();
         }
         int retCode = resp.getExitCode();
 
@@ -333,7 +334,7 @@
                 retCode,
                 stdoutBytes,
                 stderrBytes);
-        LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+        LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
 
         if (retCode != 0) {
             throw new ContainerExecException(cmdString, containerId, null);