[functions][Issue:5350]Fix pulsar can't load the customized SerDe (#5357)
Fixes #5350
### Motivation
When using the `--output-serde-classname` option, `functionClassLoader` is not set correctly.
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index e92371d..b83eb4b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -44,6 +44,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
+import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -1393,6 +1394,48 @@
}
@Test
+ public void testSerdeFunction() throws Exception {
+ testCustomSerdeFunction();
+ }
+
+ private void testCustomSerdeFunction() throws Exception {
+ if (functionRuntimeType == FunctionRuntimeType.THREAD) {
+ return;
+ }
+
+ String inputTopicName = "persistent://public/default/test-serde-java-input-" + randomName(8);
+ String outputTopicName = "test-publish-serde-output-" + randomName(8);
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+ admin.topics().createNonPartitionedTopic(inputTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+ }
+
+ String functionName = "test-serde-fn-" + randomName(8);
+ submitFunction(
+ Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, Serde_JAVA_CLASS,
+ Serde_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
+ );
+
+ // get function info
+ getFunctionInfoSuccess(functionName);
+ // get function stats
+ getFunctionStatsEmpty(functionName);
+
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+
+ FunctionStatus functionStatus = FunctionStatus.decode(result.getStdout());
+ assertEquals(functionStatus.getNumInstances(), 1);
+ assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
+ }
+
+ @Test
public void testPythonExclamationFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, false, false);
}
@@ -1600,6 +1643,49 @@
ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
}
+ private static <T> void submitFunction(Runtime runtime,
+ String inputTopicName,
+ String outputTopicName,
+ String functionName,
+ String functionFile,
+ String functionClass,
+ String outputSerdeClassName,
+ Map<String, String> userConfigs) throws Exception {
+
+ CommandGenerator generator;
+ log.info("------- INPUT TOPIC: '{}'", inputTopicName);
+ if (inputTopicName.endsWith(".*")) {
+ log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
+ generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
+ } else {
+ log.info("----- CREATING REGULAR FUNCTION --- ");
+ generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+ }
+ generator.setSinkTopic(outputTopicName);
+ generator.setFunctionName(functionName);
+ generator.setOutputSerDe(outputSerdeClassName);
+ if (userConfigs != null) {
+ generator.setUserConfig(userConfigs);
+ }
+ String command;
+ if (Runtime.JAVA == runtime) {
+ command = generator.generateCreateFunctionCommand();
+ } else if (Runtime.PYTHON == runtime) {
+ generator.setRuntime(runtime);
+ command = generator.generateCreateFunctionCommand(functionFile);
+ } else {
+ throw new IllegalArgumentException("Unsupported runtime : " + runtime);
+ }
+
+ log.info("---------- Function command: {}", command);
+ String[] commands = {
+ "sh", "-c", command
+ };
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ commands);
+ assertTrue(result.getStdout().contains("\"Created successfully\""));
+ }
+
private static <T> void ensureSubscriptionCreated(String inputTopicName,
String subscriptionName,
Schema<T> inputTopicSchema)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 3dc9484..4ba741e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -79,6 +79,11 @@
public static final String EXCEPTION_JAVA_CLASS =
"org.apache.pulsar.tests.integration.functions.ExceptionFunction";
+ public static final String Serde_JAVA_CLASS =
+ "org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
+
+ public static final String Serde_OUTPUT_CLASS =
+ "org.apache.pulsar.functions.api.examples.CustomBaseSerde";
public static final String EXCLAMATION_PYTHON_CLASS =
"exclamation_function.ExclamationFunction";