Fix Test Assertion retry (#5544)
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 32e8144..3376cfb 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -144,6 +144,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>net.jodah</groupId>
+ <artifactId>failsafe</artifactId>
+ <version>${failsafe.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b83eb4b..765bd19 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,10 +18,11 @@
*/
package org.apache.pulsar.tests.integration.functions;
-import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -44,7 +45,6 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
-import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -60,6 +60,7 @@
import org.testng.annotations.Test;
import org.testng.collections.Maps;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -79,13 +80,21 @@
@Slf4j
public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+ final Duration ONE_MINUTE = Duration.ofMinutes(1);
+ final Duration TEN_SECONDS = Duration.ofSeconds(10);
+
+ final RetryPolicy statusRetryPolicy = new RetryPolicy()
+ .withMaxDuration(ONE_MINUTE)
+ .withDelay(TEN_SECONDS)
+ .onRetry(e -> log.error("Retry ... "));
+
PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
super(functionRuntimeType);
}
@Test
public void testKafkaSink() throws Exception {
- String kafkaContainerName = "kafka-" + randomName(8);
+ final String kafkaContainerName = "kafka-" + randomName(8);
testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName));
}
@@ -173,40 +182,46 @@
getSinkInfoSuccess(tester, tenant, namespace, sinkName, builtin);
// get sink status
- getSinkStatus(tenant, namespace, sinkName);
+ Failsafe.with(statusRetryPolicy).run(() -> getSinkStatus(tenant, namespace, sinkName));
// produce messages
Map<String, String> kvs;
if (tester instanceof JdbcSinkTester) {
kvs = produceSchemaInsertMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
// wait for sink to process messages
- waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages));
+
// validate the sink result
tester.validateSinkResult(kvs);
kvs = produceSchemaUpdateMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
// wait for sink to process messages
- waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20);
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20));
+
// validate the sink result
tester.validateSinkResult(kvs);
kvs = produceSchemaDeleteMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
// wait for sink to process messages
- waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20 + 20);
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20 + 20));
+
// validate the sink result
tester.validateSinkResult(kvs);
} else {
kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
// wait for sink to process messages
- waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages));
// validate the sink result
tester.validateSinkResult(kvs);
}
-
// update the sink
updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
@@ -326,7 +341,7 @@
}
protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception {
- String[] commands = {
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sink",
"status",
@@ -334,32 +349,21 @@
"--namespace", namespace,
"--name", sinkName
};
- while (true) {
- try {
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get sink status : {}", result.getStdout());
- assertEquals(result.getExitCode(), 0);
+ final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
- SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
- try {
- assertEquals(sinkStatus.getNumInstances(), 1);
- assertEquals(sinkStatus.getNumRunning(), 1);
- assertEquals(sinkStatus.getInstances().size(), 1);
- assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
- return;
- } catch (Exception e) {
- // noop
- }
- } catch (ContainerExecException e) {
- // expected in early iterations
- }
- log.info("Backoff 1 second until the sink is running");
- TimeUnit.SECONDS.sleep(1);
- }
+ assertEquals(result.getExitCode(), 0);
+
+ SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
+
+ assertEquals(sinkStatus.getNumInstances(), 1);
+ assertEquals(sinkStatus.getNumRunning(), 1);
+ assertEquals(sinkStatus.getInstances().size(), 1);
+ assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
}
protected Map<String, String> produceMessagesToInputTopic(String inputTopicName,
@@ -368,10 +372,12 @@
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
+
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
@@ -387,16 +393,18 @@
// This for JdbcSinkTester
protected Map<String, String> produceSchemaInsertMessagesToInputTopic(String inputTopicName,
- int numMessages,
- Schema<Foo> schema) throws Exception {
+ int numMessages,
+ Schema<Foo> schema) throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup
Producer<Foo> producer = client.newProducer(schema)
.topic(inputTopicName)
.create();
+
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
@@ -422,16 +430,18 @@
// This for JdbcSinkTester
protected Map<String, String> produceSchemaUpdateMessagesToInputTopic(String inputTopicName,
- int numMessages,
- Schema<Foo> schema) throws Exception {
+ int numMessages,
+ Schema<Foo> schema) throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup
Producer<Foo> producer = client.newProducer(schema)
.topic(inputTopicName)
.create();
+
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
log.info("update start");
for (int i = 0; i < numMessages; i++) {
@@ -465,10 +475,12 @@
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup
Producer<Foo> producer = client.newProducer(schema)
.topic(inputTopicName)
.create();
+
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
@@ -493,7 +505,8 @@
}
protected void deleteSink(String tenant, String namespace, String sinkName) throws Exception {
- String[] commands = {
+
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sink",
"delete",
@@ -501,6 +514,7 @@
"--namespace", namespace,
"--name", sinkName
};
+
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
result.getStdout().contains("Deleted successfully"),
@@ -513,7 +527,7 @@
}
protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception {
- String[] commands = {
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sink",
"get",
@@ -568,13 +582,14 @@
getSourceInfoSuccess(tester, tenant, namespace, sourceName);
// get source status
- getSourceStatus(tenant, namespace, sourceName);
+ Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
// produce messages
Map<String, String> kvs = tester.produceSourceMessages(numMessages);
// wait for source to process messages
- waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
// validate the source result
validateSourceResult(consumer, kvs);
@@ -598,7 +613,7 @@
String namespace,
String sourceName,
String outputTopicName) throws Exception {
- String[] commands = {
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source", "create",
"--tenant", tenant,
@@ -608,6 +623,7 @@
"--sourceConfig", new Gson().toJson(tester.sourceConfig()),
"--destinationTopicName", outputTopicName
};
+
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
@@ -620,7 +636,7 @@
String namespace,
String sourceName,
String outputTopicName) throws Exception {
- String[] commands = {
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source", "update",
"--tenant", tenant,
@@ -631,6 +647,7 @@
"--destinationTopicName", outputTopicName,
"--parallelism", "2"
};
+
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
@@ -642,7 +659,7 @@
String tenant,
String namespace,
String sourceName) throws Exception {
- String[] commands = {
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source",
"get",
@@ -650,6 +667,7 @@
"--namespace", namespace,
"--name", sourceName
};
+
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source info : {}", result.getStdout());
assertTrue(
@@ -659,7 +677,8 @@
}
protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception {
- String[] commands = {
+
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source",
"status",
@@ -667,35 +686,23 @@
"--namespace", namespace,
"--name", sourceName
};
- while (true) {
- try {
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get source status : {}", result.getStdout());
- assertEquals(result.getExitCode(), 0);
+ final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
- SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
- try {
- assertEquals(sourceStatus.getNumInstances(), 1);
- assertEquals(sourceStatus.getNumRunning(), 1);
- assertEquals(sourceStatus.getInstances().size(), 1);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
- return;
- } catch (Exception e) {
- // noop
- }
+ assertEquals(result.getExitCode(), 0);
- if (result.getStdout().contains("\"running\": true")) {
- return;
- }
- } catch (ContainerExecException e) {
- // expected for early iterations
- }
- log.info("Backoff 1 second until the function is running");
- TimeUnit.SECONDS.sleep(1);
- }
+ final SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
+
+ assertEquals(sourceStatus.getNumInstances(), 1);
+ assertEquals(sourceStatus.getNumRunning(), 1);
+ assertEquals(sourceStatus.getInstances().size(), 1);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
+
+ assertTrue(result.getStdout().contains("\"running\" : true"));
+
}
protected void validateSourceResult(Consumer<String> consumer,
@@ -711,7 +718,7 @@
String namespace,
String sourceName,
int numMessages) throws Exception {
- String[] commands = {
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source",
"status",
@@ -719,44 +726,30 @@
"--namespace", namespace,
"--name", sourceName
};
- Stopwatch stopwatch = Stopwatch.createStarted();
- while (true) {
- try {
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get source status : {}", result.getStdout());
- assertEquals(result.getExitCode(), 0);
+ final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
- SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
- try {
- assertEquals(sourceStatus.getNumInstances(), 1);
- assertEquals(sourceStatus.getNumRunning(), 1);
- assertEquals(sourceStatus.getInstances().size(), 1);
- assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
- assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(), numMessages);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), numMessages);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
- assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
- return;
- } catch (Exception e) {
- // noop
- }
- } catch (ContainerExecException e) {
- // expected for early iterations
- }
- log.info("{} ms has elapsed but the source hasn't process {} messages, backoff to wait for another 1 second",
- stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
- TimeUnit.SECONDS.sleep(1);
- }
+ assertEquals(result.getExitCode(), 0);
+
+ SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
+ assertEquals(sourceStatus.getNumInstances(), 1);
+ assertEquals(sourceStatus.getNumRunning(), 1);
+ assertEquals(sourceStatus.getInstances().size(), 1);
+ assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
+ assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(), numMessages);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), numMessages);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+ assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
}
protected void waitForProcessingSinkMessages(String tenant,
String namespace,
String sinkName,
int numMessages) throws Exception {
- String[] commands = {
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sink",
"status",
@@ -764,42 +757,29 @@
"--namespace", namespace,
"--name", sinkName
};
- Stopwatch stopwatch = Stopwatch.createStarted();
- while (true) {
- try {
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get sink status : {}", result.getStdout());
- assertEquals(result.getExitCode(), 0);
+ final ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
- SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
- try {
- assertEquals(sinkStatus.getNumInstances(), 1);
- assertEquals(sinkStatus.getNumRunning(), 1);
- assertEquals(sinkStatus.getInstances().size(), 1);
- assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
- assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(), numMessages);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(), numMessages);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
- assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
- return;
- } catch (Exception e) {
- // noop
- }
+ assertEquals(result.getExitCode(), 0);
- } catch (ContainerExecException e) {
- // expected for early iterations
- }
- log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
- stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
- TimeUnit.SECONDS.sleep(1);
- }
+ final SinkStatus sinkStatus = SinkStatus.decode(result.getStdout());
+
+ assertEquals(sinkStatus.getNumInstances(), 1);
+ assertEquals(sinkStatus.getNumRunning(), 1);
+ assertEquals(sinkStatus.getInstances().size(), 1);
+ assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true);
+ assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(), numMessages);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(), numMessages);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+ assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
}
protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception {
- String[] commands = {
+
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source",
"delete",
@@ -807,6 +787,7 @@
"--namespace", namespace,
"--name", sourceName
};
+
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
result.getStdout().contains("Delete source successfully"),
@@ -819,7 +800,8 @@
}
protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception {
- String[] commands = {
+
+ final String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source",
"get",
@@ -827,6 +809,7 @@
"--namespace", namespace,
"--name", sourceName
};
+
try {
pulsarCluster.getAnyWorker().execCmd(commands);
fail("Command should have exited with non-zero");
@@ -892,11 +875,13 @@
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
+
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
@@ -1192,6 +1177,7 @@
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
@@ -1322,10 +1308,24 @@
if (runtime == Runtime.PYTHON) {
submitFunction(
- runtime, inputTopicName, outputTopicName, functionName, PUBLISH_FUNCTION_PYTHON_FILE, PUBLISH_PYTHON_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+ runtime,
+ inputTopicName,
+ outputTopicName,
+ functionName,
+ PUBLISH_FUNCTION_PYTHON_FILE,
+ PUBLISH_PYTHON_CLASS,
+ schema,
+ Collections.singletonMap("publish-topic", outputTopicName));
} else {
submitFunction(
- runtime, inputTopicName, outputTopicName, functionName, null, PUBLISH_JAVA_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+ runtime,
+ inputTopicName,
+ outputTopicName,
+ functionName,
+ null,
+ PUBLISH_JAVA_CLASS,
+ schema,
+ Collections.singletonMap("publish-topic", outputTopicName));
}
// get function info
@@ -1412,8 +1412,8 @@
String functionName = "test-serde-fn-" + randomName(8);
submitFunction(
- Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, Serde_JAVA_CLASS,
- Serde_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
+ Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS,
+ SERDE_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
);
// get function info
@@ -1491,11 +1491,13 @@
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup Consumer<?> consumer1 = client.newConsumer(schema)
.topic(inputTopicName + "1")
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
+
@Cleanup Consumer<?> consumer2 = client.newConsumer(schema)
.topic(inputTopicName + "2")
.subscriptionType(SubscriptionType.Exclusive)
@@ -1578,7 +1580,7 @@
if (isPublishFunction) {
file = PUBLISH_FUNCTION_PYTHON_FILE;
} else if (pyZip) {
- file = EXCLAMATION_PYTHONZIP_FILE;
+ file = EXCLAMATION_PYTHON_ZIP_FILE;
} else if (withExtraDeps) {
file = EXCLAMATION_WITH_DEPS_PYTHON_FILE;
} else {
@@ -1879,6 +1881,7 @@
@Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
.create();
+
@Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
.create();
@@ -1919,15 +1922,18 @@
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
+
if (inputTopic.endsWith(".*")) {
@Cleanup Producer<byte[]> producer1 = client.newProducer(Schema.BYTES)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
.create();
+
@Cleanup Producer<byte[]> producer2 = client.newProducer(Schema.BYTES)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
.create();
@@ -1985,7 +1991,13 @@
// submit the exclamation function
submitFunction(
- Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false, false,
+ Runtime.JAVA,
+ inputTopicName,
+ outputTopicName,
+ functionName,
+ false,
+ false,
+ false,
AutoSchemaFunction.class.getName(),
Schema.AVRO(CustomObject.class));
@@ -2009,14 +2021,17 @@
private static void publishAndConsumeAvroMessages(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
+
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
+
@Cleanup Producer<CustomObject> producer = client.newProducer(Schema.AVRO(CustomObject.class))
.topic(inputTopic)
.create();
@@ -2079,10 +2094,11 @@
getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
// get source status
- getSourceStatus(tenant, namespace, sourceName);
+ Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
// wait for source to process messages
- waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
// validate the source result
sourceTester.validateSourceResult(consumer, 9);
@@ -2135,10 +2151,11 @@
getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
// get source status
- getSourceStatus(tenant, namespace, sourceName);
+ Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
// wait for source to process messages
- waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
// validate the source result
sourceTester.validateSourceResult(consumer, 9);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 4ba741e..cb5291b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -79,10 +79,10 @@
public static final String EXCEPTION_JAVA_CLASS =
"org.apache.pulsar.tests.integration.functions.ExceptionFunction";
- public static final String Serde_JAVA_CLASS =
+ public static final String SERDE_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
- public static final String Serde_OUTPUT_CLASS =
+ public static final String SERDE_OUTPUT_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseSerde";
public static final String EXCLAMATION_PYTHON_CLASS =
@@ -91,7 +91,7 @@
public static final String EXCLAMATION_WITH_DEPS_PYTHON_CLASS =
"exclamation_with_extra_deps.ExclamationFunction";
- public static final String EXCLAMATION_PYTHONZIP_CLASS =
+ public static final String EXCLAMATION_PYTHON_ZIP_CLASS =
"exclamation";
public static final String PUBLISH_PYTHON_CLASS = "typed_message_builder_publish.TypedMessageBuilderPublish";
@@ -99,7 +99,7 @@
public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
- public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
+ public static final String EXCLAMATION_PYTHON_ZIP_FILE = "exclamation.zip";
public static final String PUBLISH_FUNCTION_PYTHON_FILE = "typed_message_builder_publish.py";
public static final String EXCEPTION_FUNCTION_PYTHON_FILE = "exception_function.py";
@@ -110,7 +110,7 @@
return EXCLAMATION_JAVA_CLASS;
} else if (Runtime.PYTHON == runtime) {
if (pyZip) {
- return EXCLAMATION_PYTHONZIP_CLASS;
+ return EXCLAMATION_PYTHON_ZIP_CLASS;
} else if (extraDeps) {
return EXCLAMATION_WITH_DEPS_PYTHON_CLASS;
} else {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index fd68fbe..8fe8e33 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -42,12 +42,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -68,7 +66,7 @@
}
public static void dumpContainerLogToTarget(DockerClient dockerClient, String containerId) {
- InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+ final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
// docker api returns names prefixed with "/", it's part of it's legacy design,
// this removes it to be consistent with what docker ps shows.
final String containerName = inspectContainerResponse.getName().replace("/","");
@@ -119,11 +117,11 @@
public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
String path) {
final int READ_BLOCK_SIZE = 10000;
- InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+ final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
// docker api returns names prefixed with "/", it's part of it's legacy design,
// this removes it to be consistent with what docker ps shows.
final String containerName = inspectContainerResponse.getName().replace("/","");
- String baseName = path.replace("/", "-").replaceAll("^-", "");
+ final String baseName = path.replace("/", "-").replaceAll("^-", "");
File output = new File(getTargetDirectory(containerName), baseName + ".tar.gz");
int i = 0;
while (output.exists()) {
@@ -181,8 +179,8 @@
public static ContainerExecResult runCommand(DockerClient docker,
String containerId,
- String... cmd) throws ContainerExecException, ExecutionException, InterruptedException {
-
+ String... cmd)
+ throws ContainerExecException, ExecutionException, InterruptedException {
try {
return runCommandAsync(docker, containerId, cmd).get();
} catch (ExecutionException e) {
@@ -193,32 +191,34 @@
}
}
- public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient docker,
- String containerId,
- String... cmd) {
+ public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient dockerClient,
+ String containerId,
+ String... cmd) {
CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
- String execid = docker.execCreateCmd(containerId)
+ String execId = dockerClient.execCreateCmd(containerId)
.withCmd(cmd)
.withAttachStderr(true)
.withAttachStdout(true)
.exec()
.getId();
- String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
+ final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+ final String containerName = inspectContainerResponse.getName().replace("/","");
+ String cmdString = String.join(" ", cmd);
StringBuilder stdout = new StringBuilder();
StringBuilder stderr = new StringBuilder();
- docker.execStartCmd(execid).withDetach(false)
+ dockerClient.execStartCmd(execId).withDetach(false)
.exec(new ResultCallback<Frame>() {
@Override
public void close() {}
@Override
public void onStart(Closeable closeable) {
- LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
+ LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
}
@Override
public void onNext(Frame object) {
- LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, object);
+ LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
if (StreamType.STDOUT == object.getStreamType()) {
stdout.append(new String(object.getPayload(), UTF_8));
} else if (StreamType.STDERR == object.getStreamType()) {
@@ -233,9 +233,9 @@
@Override
public void onComplete() {
- LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
+ LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
- InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+ InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
while (resp.isRunning()) {
try {
Thread.sleep(200);
@@ -243,7 +243,7 @@
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
- resp = docker.inspectExecCmd(execid).exec();
+ resp = dockerClient.inspectExecCmd(execId).exec();
}
int retCode = resp.getExitCode();
ContainerExecResult result = ContainerExecResult.of(
@@ -251,11 +251,11 @@
stdout.toString(),
stderr.toString()
);
- LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+ LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
if (retCode != 0) {
LOG.error("DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr: {}",
- containerId, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
+ containerName, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
} else {
future.complete(result);
@@ -265,21 +265,22 @@
return future;
}
- public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient docker,
- String containerId,
- String... cmd)
- throws ContainerExecException {
+ public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient dockerClient,
+ String containerId,
+ String... cmd) throws ContainerExecException {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- String execid = docker.execCreateCmd(containerId)
+ String execId = dockerClient.execCreateCmd(containerId)
.withCmd(cmd)
.withAttachStderr(true)
.withAttachStdout(true)
.exec()
.getId();
- String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
+ final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
+ final String containerName = inspectContainerResponse.getName().replace("/","");
+ String cmdString = String.join(" ", cmd);
ByteBuf stdout = Unpooled.buffer();
ByteBuf stderr = Unpooled.buffer();
- docker.execStartCmd(execid).withDetach(false)
+ dockerClient.execStartCmd(execId).withDetach(false)
.exec(new ResultCallback<Frame>() {
@Override
public void close() {
@@ -287,7 +288,7 @@
@Override
public void onStart(Closeable closeable) {
- LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
+ LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
}
@Override
@@ -306,13 +307,13 @@
@Override
public void onComplete() {
- LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
+ LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
future.complete(true);
}
});
future.join();
- InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+ InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
while (resp.isRunning()) {
try {
Thread.sleep(200);
@@ -320,7 +321,7 @@
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
- resp = docker.inspectExecCmd(execid).exec();
+ resp = dockerClient.inspectExecCmd(execId).exec();
}
int retCode = resp.getExitCode();
@@ -333,7 +334,7 @@
retCode,
stdoutBytes,
stderrBytes);
- LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+ LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
if (retCode != 0) {
throw new ContainerExecException(cmdString, containerId, null);