blob: b78a832f6093329d1f0bfc47bd96764214fc8177 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.functions;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
import org.apache.pulsar.functions.api.examples.InitializableFunction;
import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
import org.apache.pulsar.functions.api.examples.RecordFunction;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
import org.apache.pulsar.functions.api.examples.pojo.Users;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
/**
* A test base for testing functions.
*/
@Slf4j
public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
private final ObjectMapper objectMapper = new ObjectMapper();
public PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
super(functionRuntimeType);
}
protected Map<String, String> produceMessagesToInputTopic(String inputTopicName,
int numMessages) throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
String value = "value-" + i;
kvs.put(key, value);
producer.newMessage()
.key(key)
.value(value)
.send();
}
return kvs;
}
protected void testFunctionLocalRun(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD) {
return;
}
String inputTopicName =
"persistent://public/default/test-function-local-run-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-function-local-run-" + runtime + "-output-" + randomName(8);
final int numMessages = 10;
String cmd = "";
CommandGenerator commandGenerator = new CommandGenerator();
commandGenerator.setAdminUrl("pulsar://pulsar-broker-0:6650");
commandGenerator.setSourceTopic(inputTopicName);
commandGenerator.setSinkTopic(outputTopicName);
commandGenerator.setFunctionName("localRunTest-" + randomName(8));
commandGenerator.setRuntime(runtime);
switch (runtime) {
case JAVA:
commandGenerator.setFunctionClassName(EXCLAMATION_JAVA_CLASS);
cmd = commandGenerator.generateLocalRunCommand(null);
break;
case PYTHON:
commandGenerator.setFunctionClassName(EXCLAMATION_PYTHON_CLASS);
cmd = commandGenerator.generateLocalRunCommand(EXCLAMATION_PYTHON_FILE);
break;
case GO:
commandGenerator.setFunctionClassName(null);
cmd = commandGenerator.generateLocalRunCommand(EXCLAMATION_GO_FILE);
break;
}
log.info("cmd: {}", cmd);
pulsarCluster.getAnyWorker().execCmdAsync(cmd.split(" "));
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(outputTopicName);
retryStrategically((test) -> {
try {
return admin.topics().getStats(inputTopicName).getSubscriptions().size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 30, 200);
assertEquals(admin.topics().getStats(inputTopicName).getSubscriptions().size(), 1);
// publish and consume result
if (Runtime.JAVA == runtime || Runtime.PYTHON == runtime) {
// java and python supports schema
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
log.info("Received: {}", msg.getValue());
assertTrue(expectedMessages.contains(msg.getValue()));
expectedMessages.remove(msg.getValue());
}
assertEquals(expectedMessages.size(), 0);
} else {
// golang doesn't support schema
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(inputTopicName)
.enableBatching(true)
.batcherBuilder(BatcherBuilder.DEFAULT)
.create();
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("message-" + i).getBytes(UTF_8)).send();
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
String msgValue = new String(msg.getValue(), UTF_8);
log.info("Received: {}", msgValue);
assertTrue(expectedMessages.contains(msgValue));
expectedMessages.remove(msgValue);
}
assertEquals(expectedMessages.size(), 0);
}
}
}
protected void testWindowFunction(String type, String[] expectedResults) throws Exception {
int NUM_OF_MESSAGES = 100;
int windowLengthCount = 10;
int slidingIntervalCount = 5;
String functionName = "test-" + type + "-window-fn-" + randomName(8);
String inputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-input-" + randomName(8);
String outputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-output-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(outputTopicName);
}
CommandGenerator generator = CommandGenerator.createDefaultGenerator(
inputTopicName,
"org.apache.pulsar.functions.api.examples.WindowDurationFunction");
generator.setFunctionName(functionName);
generator.setSinkTopic(outputTopicName);
generator.setWindowLengthCount(windowLengthCount);
if (type.equals("sliding")) {
generator.setSlidingIntervalCount(slidingIntervalCount);
}
String[] commands = {
"sh", "-c", generator.generateCreateFunctionCommand()
};
ContainerExecResult containerExecResult = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(containerExecResult.getStdout().contains("Created successfully"));
// get function info
getFunctionInfoSuccess(functionName);
containerExecResult = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
FunctionStatus functionStatus = FunctionStatusUtil.decode(containerExecResult.getStdout());
assertEquals(functionStatus.getNumInstances(), 1);
assertEquals(functionStatus.getNumRunning(), 1);
assertEquals(functionStatus.getInstances().size(), 1);
assertEquals(functionStatus.getInstances().get(0).getInstanceId(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup
Reader<byte[]> reader = client.newReader().startMessageId(MessageId.earliest)
.topic(outputTopicName)
.create();
@Cleanup
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(inputTopicName)
.enableBatching(false)
.create();
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
producer.send(String.format("%d", i).getBytes());
}
int i = 0;
while (true) {
if (i > expectedResults.length) {
Assertions.fail("More results than expected");
}
Message<byte[]> msg = reader.readNext(30, TimeUnit.SECONDS);
if (msg == null) {
break;
}
String msgStr = new String(msg.getData());
log.info("[testWindowFunction] i: {} RECV: {}", i, msgStr);
String result = msgStr.split(":")[0];
assertThat(result).contains(expectedResults[i]);
i++;
}
getFunctionStatus(functionName, NUM_OF_MESSAGES, true);
// in case last commit is not updated
assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);
}
protected void testFunctionNegAck(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD) {
return;
}
Schema<?> schema;
if (Runtime.JAVA == runtime || Runtime.PYTHON == runtime) {
schema = Schema.STRING;
} else {
schema = Schema.BYTES;
}
String inputTopicName = "persistent://public/default/test-neg-ack-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-neg-ack-" + runtime + "-output-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(outputTopicName);
}
String functionName = "test-neg-ack-fn-" + randomName(8);
final int numMessages = 20;
// submit the exclamation function
if (runtime == Runtime.PYTHON) {
submitFunction(
runtime, inputTopicName, outputTopicName, functionName, EXCEPTION_FUNCTION_PYTHON_FILE,
EXCEPTION_PYTHON_CLASS, schema, null);
} else {
submitFunction(
runtime, inputTopicName, outputTopicName, functionName, null, EXCEPTION_JAVA_CLASS, schema, null);
}
// get function info
getFunctionInfoSuccess(functionName);
// get function stats
getFunctionStatsEmpty(functionName);
// publish and consume result
if (Runtime.JAVA == runtime || Runtime.PYTHON == runtime) {
// java and python supports schema
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
log.info("Received: {}", msg.getValue());
assertTrue(expectedMessages.contains(msg.getValue()));
expectedMessages.remove(msg.getValue());
}
assertEquals(expectedMessages.size(), 0);
} else {
// golang doesn't support schema
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(inputTopicName)
.create();
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("message-" + i).getBytes(UTF_8)).send();
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
String msgValue = new String(msg.getValue(), UTF_8);
log.info("Received: {}", msgValue);
assertTrue(expectedMessages.contains(msgValue));
expectedMessages.remove(msgValue);
}
assertEquals(expectedMessages.size(), 0);
}
// get function status
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
FunctionStatus functionStatus = FunctionStatusUtil.decode(result.getStdout());
assertEquals(functionStatus.getNumInstances(), 1);
assertEquals(functionStatus.getNumRunning(), 1);
assertEquals(functionStatus.getInstances().size(), 1);
assertEquals(functionStatus.getInstances().get(0).getInstanceId(), 0);
assertTrue(functionStatus.getInstances().get(0).getStatus().getAverageLatency() > 0.0);
assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
assertTrue(functionStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0);
// going to receive two more tuples because of delivery
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages + 2);
// only going to successfully process 20
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(), numMessages);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(), 2);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
// get function stats
result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"stats",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
log.info("FUNCTION STATS: {}", result.getStdout());
FunctionStatsImpl functionStats = FunctionStatsImpl.decode(result.getStdout());
assertEquals(functionStats.getReceivedTotal(), numMessages + 2);
assertEquals(functionStats.getProcessedSuccessfullyTotal(), numMessages);
assertEquals(functionStats.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.getUserExceptionsTotal(), 2);
assertTrue(functionStats.avgProcessLatency > 0);
assertTrue(functionStats.getLastInvocation() > 0);
assertEquals(functionStats.instances.size(), 1);
assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), numMessages + 2);
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(), numMessages);
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(), 2);
assertTrue(functionStats.instances.get(0).getMetrics().getAvgProcessLatency() > 0);
// delete function
deleteFunction(functionName);
// get function info
getFunctionInfoNotFound(functionName);
// make sure subscriptions are cleanup
checkSubscriptionsCleanup(inputTopicName);
}
public void testGoPublishFunction() throws Exception {
testPublishFunction(Runtime.GO);
}
protected void testPublishFunction(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD) {
return;
}
Schema<?> schema;
if (Runtime.JAVA == runtime || Runtime.PYTHON == runtime) {
schema = Schema.STRING;
} else {
schema = Schema.BYTES;
}
String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(outputTopicName);
}
String functionName = "test-publish-fn-" + randomName(8);
final int numMessages = 10;
// submit the exclamation function
switch (runtime) {
case JAVA:
submitFunction(
runtime,
inputTopicName,
outputTopicName,
functionName,
null,
PUBLISH_JAVA_CLASS,
schema,
Collections.singletonMap("publish-topic", outputTopicName),
null, null, null, null, null, null);
break;
case PYTHON:
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setSchemaType("string");
Map<String, String> inputSpecs = new HashMap<>() {{
put(inputTopicName, objectMapper.writeValueAsString(consumerConfig));
}};
submitFunction(
runtime,
inputTopicName,
outputTopicName,
functionName,
PUBLISH_FUNCTION_PYTHON_FILE,
PUBLISH_PYTHON_CLASS,
schema,
Collections.singletonMap("publish-topic", outputTopicName),
objectMapper.writeValueAsString(inputSpecs), "string", null, null, null, null);
break;
case GO:
submitFunction(
runtime,
inputTopicName,
outputTopicName,
functionName,
PUBLISH_FUNCTION_GO_FILE,
null,
schema,
Collections.singletonMap("publish-topic", outputTopicName),
null, null, null, null, null, null);
}
// get function info
getFunctionInfoSuccess(functionName);
// get function stats
getFunctionStatsEmpty(functionName);
// publish and consume result
if (Runtime.JAVA == runtime || Runtime.PYTHON == runtime) {
// java and python supports schema
publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
} else {
// Does Go support schema? Maybe we need a switch instead for the Go case.
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(inputTopicName)
.create();
for (int i = 0; i < numMessages; i++) {
producer.newMessage().key(String.valueOf(i)).property("count", String.valueOf(i))
.value(("message-" + i).getBytes(UTF_8)).send();
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
String msgValue = new String(msg.getValue(), UTF_8);
log.info("Received: {}", msgValue);
assertEquals(msg.getKey(), String.valueOf(i));
assertEquals(msg.getProperties().get("count"), String.valueOf(i));
assertEquals(msg.getProperties().get("input_topic"), inputTopicName);
assertTrue(msg.getEventTime() > 0);
assertTrue(expectedMessages.contains(msgValue));
expectedMessages.remove(msgValue);
}
}
// get function status
getFunctionStatus(functionName, numMessages, true);
// get function stats
getFunctionStats(functionName, numMessages);
// delete function
deleteFunction(functionName);
// get function info
getFunctionInfoNotFound(functionName);
// make sure subscriptions are cleanup
checkSubscriptionsCleanup(inputTopicName);
}
protected void testExclamationFunction(Runtime runtime,
boolean isTopicPattern,
boolean pyZip,
boolean multipleInput,
boolean withExtraDeps) throws Exception {
testExclamationFunction(runtime, isTopicPattern, pyZip, multipleInput, withExtraDeps, null);
}
protected void testExclamationFunction(Runtime runtime,
boolean isTopicPattern,
boolean pyZip,
boolean multipleInput,
boolean withExtraDeps,
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && (runtime == Runtime.PYTHON || runtime == Runtime.GO)) {
// python&go can only run on process mode
return;
}
Schema<?> schema;
if (Runtime.JAVA == runtime || Runtime.PYTHON == runtime) {
schema = Schema.STRING;
} else {
schema = Schema.BYTES;
}
String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(outputTopicName);
}
if (isTopicPattern) {
inputTopicName = inputTopicName + ".*";
} else if (multipleInput) {
inputTopicName = inputTopicName + "1," + inputTopicName + "2";
}
String functionName = "test-exclamation-fn-" + randomName(8);
final int numMessages = 10;
// submit the exclamation function
submitExclamationFunction(
runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps, schema, commandGeneratorConsumer);
// get function info
final String info = getFunctionInfoSuccess(functionName);
// get function stats
getFunctionStatsEmpty(functionName);
// publish and consume result
if (Runtime.JAVA == runtime || Runtime.PYTHON == runtime) {
// java supports schema
publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
} else {
// golang doesn't support schema
publishAndConsumeMessagesBytes(inputTopicName, outputTopicName, numMessages);
}
// get function status
getFunctionStatus(functionName, numMessages, true);
if (Runtime.GO != runtime) {
// TODO: Go runtime doesn't collect `process_latency_ms_1min` metric
// get function stats
getFunctionStats(functionName, numMessages);
}
// update parallelism
updateFunctionParallelism(functionName, 2);
//get function status
getFunctionStatus(functionName, 0, true, 2);
// update code file
switch (runtime) {
case JAVA:
updateFunctionCodeFile(functionName, Runtime.JAVA, "test");
break;
case PYTHON:
updateFunctionCodeFile(functionName, Runtime.PYTHON, EXCLAMATION_PYTHON_FILE);
break;
case GO:
updateFunctionCodeFile(functionName, Runtime.GO, EXCLAMATION_GO_FILE);
break;
}
checkSubscriptionType(inputTopicName,
ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, FunctionConfig.class));
// delete function
deleteFunction(functionName);
// get function info
getFunctionInfoNotFound(functionName);
// make sure subscriptions are cleanup
checkSubscriptionsCleanup(inputTopicName);
}
private void checkSubscriptionType(String topic, FunctionConfig config) {
List<String> topics = new ArrayList<>();
if (topic.endsWith(".*")) {
topics.add(topic.substring(0, topic.length() - 2) + "1");
topics.add(topic.substring(0, topic.length() - 2) + "2");
} else if (topic.contains(",")) {
topics.addAll(Arrays.asList(topic.split(",")));
} else {
topics.add(topic);
}
topics.stream().forEach(t -> {
try {
ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"topics",
"stats",
t);
TopicStats topicStats = ObjectMapperFactory.getMapper().reader()
.readValue(result.getStdout(), TopicStats.class);
assertEquals(topicStats.getSubscriptions().size(), 1);
final SubscriptionStats sub = topicStats.getSubscriptions().values().iterator()
.next();
if (config.getRetainOrdering()) {
assertEquals(sub.getType(), "Failover");
} else if (config.getRetainKeyOrdering()) {
assertEquals(sub.getType(), "Key_Shared");
} else {
assertEquals(sub.getType(), "Shared");
}
} catch (Exception e) {
fail("Command should have exited with non-zero");
}
});
}
private void submitExclamationFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
String functionName,
boolean pyZip,
boolean withExtraDeps,
Schema<?> schema) throws Exception {
submitExclamationFunction(runtime, inputTopicName, outputTopicName, functionName, pyZip,
withExtraDeps, schema, null);
}
private void submitExclamationFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
String functionName,
boolean pyZip,
boolean withExtraDeps,
Schema<?> schema,
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception {
submitFunction(
runtime,
inputTopicName,
outputTopicName,
functionName,
pyZip,
withExtraDeps,
false,
getExclamationClass(runtime, pyZip, withExtraDeps),
schema,
commandGeneratorConsumer);
}
private <T> void submitFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
String functionName,
boolean pyZip,
boolean withExtraDeps,
boolean isPublishFunction,
String functionClass,
Schema<T> inputTopicSchema,
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception {
String file = null;
if (Runtime.JAVA == runtime) {
file = null;
} else if (Runtime.PYTHON == runtime) {
if (isPublishFunction) {
file = PUBLISH_FUNCTION_PYTHON_FILE;
} else if (pyZip) {
file = EXCLAMATION_PYTHON_ZIP_FILE;
} else if (withExtraDeps) {
file = EXCLAMATION_WITH_DEPS_PYTHON_FILE;
} else {
file = EXCLAMATION_PYTHON_FILE;
}
} else if (Runtime.GO == runtime) {
if (isPublishFunction) {
file = PUBLISH_FUNCTION_GO_FILE;
} else {
file = EXCLAMATION_GO_FILE;
}
}
submitFunction(runtime, inputTopicName, outputTopicName, functionName, file, functionClass, inputTopicSchema,
commandGeneratorConsumer);
}
private <T> void submitFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
String functionName,
String functionFile,
String functionClass,
Schema<T> inputTopicSchema,
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception {
submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass,
inputTopicSchema, null, null, null, null, null, null,
commandGeneratorConsumer);
}
private <T> void submitFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
String functionName,
String functionFile,
String functionClass,
Schema<T> inputTopicSchema,
Map<String, String> userConfigs,
String customSchemaInputs,
String outputSchemaType,
SubscriptionInitialPosition subscriptionInitialPosition,
String inputTypeClassName,
String outputTypeClassName,
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception {
if (StringUtils.isNotEmpty(inputTopicName)) {
ensureSubscriptionCreated(
inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
}
CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}', customSchemaInputs: {}", inputTopicName, customSchemaInputs);
if (inputTopicName.endsWith(".*")) {
log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
} else {
log.info("----- CREATING REGULAR FUNCTION --- ");
generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
}
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
if (userConfigs != null) {
generator.setUserConfig(userConfigs);
}
if (customSchemaInputs != null) {
generator.setCustomSchemaInputs(customSchemaInputs);
}
if (outputSchemaType != null) {
generator.setSchemaType(outputSchemaType);
}
if (subscriptionInitialPosition != null) {
generator.setSubscriptionInitialPosition(subscriptionInitialPosition);
}
if (inputTypeClassName != null) {
generator.setInputTypeClassName(inputTypeClassName);
}
if (outputTypeClassName != null) {
generator.setOutputTypeClassName(outputTypeClassName);
}
if (commandGeneratorConsumer != null) {
commandGeneratorConsumer.accept(generator);
}
String command = "";
switch (runtime) {
case JAVA:
command = generator.generateCreateFunctionCommand();
break;
case PYTHON:
case GO:
generator.setRuntime(runtime);
command = generator.generateCreateFunctionCommand(functionFile);
break;
default:
throw new IllegalArgumentException("Unsupported runtime : " + runtime);
}
log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
log.info("---------- stdout is: {}", result.getStdout());
log.info("---------- stderr is: {}", result.getStderr());
assertTrue(result.getStdout().contains("Created successfully"));
}
private void updateFunctionParallelism(String functionName, int parallelism) throws Exception {
CommandGenerator generator = new CommandGenerator();
generator.setFunctionName(functionName);
generator.setParallelism(parallelism);
String command = generator.generateUpdateFunctionCommand();
log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("Updated successfully"));
}
private void updateFunctionCodeFile(String functionName, Runtime runtime, String codeFile) throws Exception {
CommandGenerator generator = new CommandGenerator();
generator.setFunctionName(functionName);
generator.setRuntime(runtime);
String command = generator.generateUpdateFunctionCommand(codeFile);
log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("Updated successfully"));
}
protected <T> void submitFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
String functionName,
String functionFile,
String functionClass,
Map<String, String> inputSerdeClassNames,
String outputSerdeClassName,
Map<String, String> userConfigs) throws Exception {
CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}'", inputTopicName);
if (inputTopicName.endsWith(".*")) {
log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
} else {
log.info("----- CREATING REGULAR FUNCTION --- ");
generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
}
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
generator.setCustomSerDeSourceTopics(inputSerdeClassNames);
generator.setOutputSerDe(outputSerdeClassName);
if (userConfigs != null) {
generator.setUserConfig(userConfigs);
}
String command;
if (Runtime.JAVA == runtime) {
command = generator.generateCreateFunctionCommand();
} else if (Runtime.PYTHON == runtime) {
generator.setRuntime(runtime);
command = generator.generateCreateFunctionCommand(functionFile);
} else {
throw new IllegalArgumentException("Unsupported runtime : " + runtime);
}
log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("Created successfully"));
}
@SuppressWarnings("try")
private <T> void ensureSubscriptionCreated(String inputTopicName,
String subscriptionName,
Schema<T> inputTopicSchema)
throws Exception {
// ensure the function subscription exists before we start producing messages
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build()) {
List<String> topics = new ArrayList<>();
if (inputTopicName.endsWith(".*")) {
topics.add(inputTopicName.substring(0, inputTopicName.length() - 2) + "1");
topics.add(inputTopicName.substring(0, inputTopicName.length() - 2) + "2");
} else if (inputTopicName.contains(",")) {
topics.addAll(Arrays.asList(inputTopicName.split(",")));
} else {
topics.add(inputTopicName);
}
try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
.topic(topics.toArray(new String[0]))
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subscriptionName)
.subscribe()) {
}
}
}
protected String getFunctionInfoSuccess(String functionName) throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
log.info("FUNCTION STATE: {}", result.getStdout());
assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
return result.getStdout();
}
protected void getFunctionStatsEmpty(String functionName) throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"stats",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
log.info("FUNCTION STATS: {}", result.getStdout());
FunctionStatsImpl functionStats = FunctionStatsImpl.decode(result.getStdout());
assertEquals(functionStats.getReceivedTotal(), 0);
assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.getUserExceptionsTotal(), 0);
assertEquals(functionStats.avgProcessLatency, null);
assertEquals(functionStats.oneMin.getReceivedTotal(), 0);
assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
assertEquals(functionStats.oneMin.getAvgProcessLatency(), null);
assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
assertEquals(functionStats.getLastInvocation(), null);
assertEquals(functionStats.instances.size(), 1);
assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), null);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getReceivedTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getProcessedSuccessfullyTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getUserExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getAvgProcessLatency(), null);
}
private void getFunctionStats(String functionName, int numMessages) throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"stats",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
log.info("FUNCTION STATS: {}", result.getStdout());
FunctionStatsImpl functionStats = FunctionStatsImpl.decode(result.getStdout());
assertEquals(functionStats.getReceivedTotal(), numMessages);
assertEquals(functionStats.getProcessedSuccessfullyTotal(), numMessages);
assertEquals(functionStats.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.getUserExceptionsTotal(), 0);
assertTrue(functionStats.avgProcessLatency > 0);
assertEquals(functionStats.oneMin.getReceivedTotal(), numMessages);
assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), numMessages);
assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
assertTrue(functionStats.getLastInvocation() > 0);
assertEquals(functionStats.instances.size(), 1);
assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), numMessages);
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(), numMessages);
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(), 0);
assertTrue(functionStats.instances.get(0).getMetrics().getAvgProcessLatency() > 0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getReceivedTotal(), numMessages);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getProcessedSuccessfullyTotal(),
numMessages);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getUserExceptionsTotal(), 0);
assertTrue(functionStats.instances.get(0).getMetrics().getOneMin().getAvgProcessLatency() > 0);
}
private void getFunctionInfoNotFound(String functionName) throws Exception {
retryStrategically(aVoid -> {
try {
pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", functionName);
} catch (ContainerExecException e) {
if (e.getResult().getStderr().contains("Reason: Function " + functionName + " doesn't exist")) {
return true;
}
} catch (Exception e) {
}
return false;
}, 5, 100, true);
}
private void checkSubscriptionsCleanup(String topic) throws Exception {
List<String> topics = new ArrayList<>();
if (topic.endsWith(".*")) {
topics.add(topic.substring(0, topic.length() - 2) + "1");
topics.add(topic.substring(0, topic.length() - 2) + "2");
} else if (topic.contains(",")) {
topics.addAll(Arrays.asList(topic.split(",")));
} else {
topics.add(topic);
}
topics.stream().forEach(t -> {
try {
ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"topics",
"stats",
t);
TopicStats topicStats = ObjectMapperFactory.getMapper().reader()
.readValue(result.getStdout(), TopicStats.class);
assertEquals(topicStats.getSubscriptions().size(), 0);
} catch (Exception e) {
fail("Command should have exited with non-zero");
}
});
}
private void checkPublisherCleanup(String topic) throws Exception {
try {
ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"topics",
"stats",
topic);
TopicStats topicStats = ObjectMapperFactory.getMapper().reader()
.readValue(result.getStdout(), TopicStats.class);
assertEquals(topicStats.getPublishers().size(), 0);
} catch (ContainerExecException e) {
fail("Command should have exited with non-zero");
}
}
private void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
getFunctionStatus(functionName, numMessages, checkRestarts, 1);
}
private void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts, int parallelism)
throws Exception {
Awaitility.await()
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(15))
.ignoreExceptions()
.untilAsserted(() ->
doGetFunctionStatus(functionName, numMessages, checkRestarts, parallelism));
}
private void doGetFunctionStatus(String functionName, int numMessages, boolean checkRestarts, int parallelism)
throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
FunctionStatus functionStatus = FunctionStatusUtil.decode(result.getStdout());
assertEquals(functionStatus.getNumInstances(), parallelism);
assertEquals(functionStatus.getNumRunning(), parallelism);
assertEquals(functionStatus.getInstances().size(), parallelism);
boolean avgLatencyGreaterThanZero = false;
int totalMessagesProcessed = 0;
int totalMessagesSuccessfullyProcessed = 0;
boolean lastInvocationTimeGreaterThanZero = false;
for (int i = 0; i < parallelism; ++i) {
assertEquals(functionStatus.getInstances().get(i).getStatus().isRunning(), true);
assertTrue(functionStatus.getInstances().get(i).getInstanceId() >= 0);
assertTrue(functionStatus.getInstances().get(i).getInstanceId() < parallelism);
avgLatencyGreaterThanZero = avgLatencyGreaterThanZero
|| functionStatus.getInstances().get(i).getStatus().getAverageLatency() > 0.0;
lastInvocationTimeGreaterThanZero = lastInvocationTimeGreaterThanZero
|| functionStatus.getInstances().get(i).getStatus().getLastInvocationTime() > 0;
totalMessagesProcessed += functionStatus.getInstances().get(i).getStatus().getNumReceived();
totalMessagesSuccessfullyProcessed +=
functionStatus.getInstances().get(i).getStatus().getNumSuccessfullyProcessed();
if (checkRestarts) {
assertEquals(functionStatus.getInstances().get(i).getStatus().getNumRestarts(), 0);
}
assertEquals(functionStatus.getInstances().get(i).getStatus().getLatestUserExceptions().size(), 0);
assertEquals(functionStatus.getInstances().get(i).getStatus().getLatestSystemExceptions().size(), 0);
}
if (numMessages > 0) {
assertTrue(avgLatencyGreaterThanZero);
assertTrue(lastInvocationTimeGreaterThanZero);
}
assertEquals(totalMessagesProcessed, numMessages);
assertEquals(totalMessagesSuccessfullyProcessed, numMessages);
}
private void publishAndConsumeMessages(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
if (inputTopic.endsWith(".*")) {
@Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
.create();
@Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
.create();
for (int i = 0; i < numMessages / 2; i++) {
producer1.send("message-" + i);
}
for (int i = numMessages / 2; i < numMessages; i++) {
producer2.send("message-" + i);
}
} else {
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
}
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
log.info("Trying to receive message.. {}/{}", i, numMessages);
Message<String> msg = consumer.receive(30, TimeUnit.MINUTES);
log.info("Received: {}", msg.getValue());
assertTrue(expectedMessages.contains(msg.getValue()));
expectedMessages.remove(msg.getValue());
}
}
private void publishAndConsumeMessagesBytes(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
if (inputTopic.endsWith(".*")) {
@Cleanup Producer<byte[]> producer1 = client.newProducer(Schema.BYTES)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
.create();
@Cleanup Producer<byte[]> producer2 = client.newProducer(Schema.BYTES)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
.create();
for (int i = 0; i < numMessages / 2; i++) {
producer1.send(("message-" + i).getBytes(UTF_8));
}
for (int i = numMessages / 2; i < numMessages; i++) {
producer2.send(("message-" + i).getBytes(UTF_8));
}
} else if (inputTopic.contains(",")) {
String[] topics = inputTopic.split(",");
@Cleanup Producer<byte[]> producer1 = client.newProducer(Schema.BYTES)
.topic(topics[0])
.create();
@Cleanup Producer<byte[]> producer2 = client.newProducer(Schema.BYTES)
.topic(topics[1])
.create();
for (int i = 0; i < numMessages / 2; i++) {
producer1.send(("message-" + i).getBytes(UTF_8));
}
for (int i = numMessages / 2; i < numMessages; i++) {
producer2.send(("message-" + i).getBytes(UTF_8));
}
} else {
@Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send(("message-" + i).getBytes(UTF_8));
}
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
String msgValue = new String(msg.getValue(), UTF_8);
log.info("Received: {}", msgValue);
assertTrue(expectedMessages.contains(msgValue));
expectedMessages.remove(msgValue);
}
}
private void deleteFunction(String functionName) throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"delete",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
assertTrue(result.getStdout().contains("Deleted successfully"));
result.assertNoStderr();
}
protected void testAutoSchemaFunction() throws Exception {
String inputTopicName = "test-autoschema-input-" + randomName(8);
String outputTopicName = "test-autoschema-output-" + randomName(8);
String functionName = "test-autoschema-fn-" + randomName(8);
final int numMessages = 10;
// submit the exclamation function
submitFunction(
Runtime.JAVA,
inputTopicName,
outputTopicName,
functionName,
false,
false,
false,
AutoSchemaFunction.class.getName(),
Schema.AVRO(CustomObject.class),
null);
// get function info
getFunctionInfoSuccess(functionName);
// publish and consume result
publishAndConsumeAvroMessages(inputTopicName, outputTopicName, numMessages);
// get function status. Note that this function might restart a few times until
// the producer above writes the messages.
getFunctionStatus(functionName, numMessages, false);
// delete function
deleteFunction(functionName);
// get function info
getFunctionInfoNotFound(functionName);
}
private void publishAndConsumeAvroMessages(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<CustomObject> producer = client.newProducer(Schema.AVRO(CustomObject.class))
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
CustomObject co = new CustomObject(i);
producer.send(co);
}
for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive();
assertEquals("value-" + i, msg.getValue());
}
}
protected void testAvroSchemaFunction(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
// python can only run on process mode
return;
}
log.info("testAvroSchemaFunction start ...");
final String inputTopic = "test-avroschema-input-" + randomName(8);
final String outputTopic = "test-avroschema-output-" + randomName(8);
final String functionName = "test-avroschema-fn-" + randomName(8);
final int numMessages = 10;
@Cleanup PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
log.info("pulsar client init - input: {}, output: {}", inputTopic, outputTopic);
@Cleanup Producer<AvroTestObject> producer = pulsarClient
.newProducer(Schema.AVRO(AvroTestObject.class))
.topic(inputTopic).create();
log.info("pulsar producer init - {}", inputTopic);
@Cleanup Consumer<AvroTestObject> consumer = pulsarClient
.newConsumer(Schema.AVRO(AvroTestObject.class))
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-avro-schema")
.topic(outputTopic)
.subscribe();
log.info("pulsar consumer init - {}", outputTopic);
CompletableFuture<Optional<SchemaInfo>> inputSchemaFuture =
((PulsarClientImpl) pulsarClient).getSchema(inputTopic);
inputSchemaFuture.whenComplete((schemaInfo, throwable) -> {
if (schemaInfo.isPresent()) {
log.info("inputSchemaInfo: {}", schemaInfo.get().toString());
} else {
log.error("input schema is not present!");
}
});
CompletableFuture<Optional<SchemaInfo>> outputSchemaFuture =
((PulsarClientImpl) pulsarClient).getSchema(outputTopic);
outputSchemaFuture.whenComplete((schemaInfo, throwable) -> {
if (throwable != null) {
log.error("get output schemaInfo error", throwable);
throwable.printStackTrace();
return;
}
if (schemaInfo.isPresent()) {
log.info("outputSchemaInfo: {}", schemaInfo.get().toString());
} else {
log.error("output schema is not present!");
}
});
if (runtime == Runtime.JAVA) {
submitFunction(
Runtime.JAVA,
inputTopic,
outputTopic,
functionName,
null,
AvroSchemaTestFunction.class.getName(),
Schema.AVRO(AvroTestObject.class),
null);
} else if (runtime == Runtime.PYTHON) {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setSchemaType("avro");
Map<String, String> inputSpecs = new HashMap<>() {{
put(inputTopic, objectMapper.writeValueAsString(consumerConfig));
}};
submitFunction(
Runtime.PYTHON,
inputTopic,
outputTopic,
functionName,
AVRO_SCHEMA_FUNCTION_PYTHON_FILE,
AVRO_SCHEMA_PYTHON_CLASS,
Schema.AVRO(AvroTestObject.class),
null, objectMapper.writeValueAsString(inputSpecs), "avro", null,
"avro_schema_test_function.AvroTestObject", "avro_schema_test_function.AvroTestObject",
null);
}
log.info("pulsar submitFunction");
getFunctionInfoSuccess(functionName);
AvroSchemaTestFunction function = new AvroSchemaTestFunction();
Set<Object> expectedSet = new HashSet<>();
log.info("test-avro-schema producer connected: " + producer.isConnected());
for (int i = 0; i < numMessages; i++) {
AvroTestObject inputObject = new AvroTestObject();
inputObject.setBaseValue(i);
MessageId messageId = producer.send(inputObject);
log.info("test-avro-schema messageId: {}", messageId.toString());
expectedSet.add(function.process(inputObject, null));
log.info("test-avro-schema expectedSet size: {}", expectedSet.size());
}
getFunctionStatus(functionName, numMessages, false);
log.info("test-avro-schema producer send message finish");
CompletableFuture<Optional<SchemaInfo>> outputSchemaFuture2 =
((PulsarClientImpl) pulsarClient).getSchema(outputTopic);
outputSchemaFuture2.whenComplete((schemaInfo, throwable) -> {
if (throwable != null) {
log.error("get output schemaInfo error", throwable);
throwable.printStackTrace();
return;
}
if (schemaInfo.isPresent()) {
log.info("outputSchemaInfo: {}", schemaInfo.get().toString());
} else {
log.error("output schema is not present!");
}
});
log.info("test-avro-schema consumer connected: " + consumer.isConnected());
for (int i = 0; i < numMessages; i++) {
log.info("test-avro-schema consumer receive [{}] start", i);
Message<AvroTestObject> message = consumer.receive();
log.info("test-avro-schema consumer receive [{}] over", i);
AvroTestObject outputObject = message.getValue();
assertTrue(expectedSet.contains(outputObject));
expectedSet.remove(outputObject);
consumer.acknowledge(message);
}
log.info("test-avro-schema consumer receive message finish");
assertEquals(expectedSet.size(), 0);
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);
}
protected void testInitFunction(Runtime runtime) throws Exception {
if (runtime != Runtime.JAVA) {
// only java support init function
return;
}
Schema<?> schema = Schema.STRING;
String inputTopicName = "persistent://public/default/test-init-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-init-" + runtime + "-output-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(outputTopicName);
}
String functionName = "test-init-fn-" + randomName(8);
final int numMessages = 10;
// submit the exclamation function
submitFunction(runtime, inputTopicName, outputTopicName, functionName, null,
InitializableFunction.class.getName(), schema,
Collections.singletonMap("publish-topic", outputTopicName), null, null, null, null, null, null);
// publish and consume result
publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
// delete function
deleteFunction(functionName);
}
protected void testLoggingFunction(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
// python can only run on process mode
return;
}
if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.GO) {
// go can only run on process mode
return;
}
Schema<?> schema;
if (Runtime.JAVA == runtime) {
schema = Schema.STRING;
} else {
schema = Schema.BYTES;
}
String inputTopicName = "persistent://public/default/test-log-" + runtime + "-input-" + randomName(8);
String logTopicName = "test-log-" + runtime + "-log-topic-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(logTopicName);
}
String functionName = "test-logging-fn-" + randomName(8);
final int numMessages = 10;
// submit the exclamation function
submitJavaLoggingFunction(
inputTopicName, logTopicName, functionName, schema);
// get function info
getFunctionInfoSuccess(functionName);
// get function stats
getFunctionStatsEmpty(functionName);
try {
// publish and consume result
publishAndConsumeMessages(inputTopicName, logTopicName, numMessages, "-log");
} finally {
// dump function logs so that it's easier to investigate failures
pulsarCluster.dumpFunctionLogs(functionName);
}
// get function status
getFunctionStatus(functionName, numMessages, true);
// get function stats
getFunctionStats(functionName, numMessages);
// delete function
deleteFunction(functionName);
// get function info
getFunctionInfoNotFound(functionName);
// make sure subscriptions are cleanup
checkSubscriptionsCleanup(inputTopicName);
checkPublisherCleanup(logTopicName);
}
private void submitJavaLoggingFunction(String inputTopicName,
String logTopicName,
String functionName,
Schema<?> schema) throws Exception {
ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}'", inputTopicName);
if (inputTopicName.endsWith(".*")) {
log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, LOGGING_JAVA_CLASS);
} else {
log.info("----- CREATING REGULAR FUNCTION --- ");
generator = CommandGenerator.createDefaultGenerator(inputTopicName, LOGGING_JAVA_CLASS);
}
generator.setLogTopic(logTopicName);
generator.setFunctionName(functionName);
String command = generator.generateCreateFunctionCommand();
log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("Created successfully"));
}
private void publishAndConsumeMessages(String inputTopic,
String outputTopic,
int numMessages,
String messagePostfix) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<byte[]> consumer = client.newConsumer()
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
}
Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + messagePostfix);
}
for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
if (msg == null) {
log.info("Input topic stats: {}",
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(
pulsarAdmin.topics().getStats(inputTopic, true)));
log.info("Output topic stats: {}",
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(
pulsarAdmin.topics().getStats(outputTopic, true)));
log.info("Input topic internal-stats: {}",
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(
pulsarAdmin.topics().getInternalStats(inputTopic, true)));
log.info("Output topic internal-stats: {}",
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(
pulsarAdmin.topics().getInternalStats(outputTopic, true)));
} else {
String logMsg = new String(msg.getValue(), UTF_8);
log.info("Received message: '{}'", logMsg);
assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg + "' not expected");
expectedMessages.remove(logMsg);
}
}
consumer.close();
producer.close();
client.close();
}
protected void testGenericObjectFunction(String function, boolean removeAgeField, boolean keyValue)
throws Exception {
log.info("start {} function test ...", function);
String ns = "public/ns-genericobject-" + randomName(8);
@Cleanup
PulsarAdmin pulsarAdmin = getPulsarAdmin();
pulsarAdmin.namespaces().createNamespace(ns);
@Cleanup
PulsarClient pulsarClient = getPulsarClient();
final int numMessages = 10;
final String inputTopic = ns + "/test-object-input-" + randomName(8);
final String outputTopic = ns + "/test-object-output" + randomName(8);
@Cleanup
Consumer<GenericRecord> consumer = pulsarClient
.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.topic(outputTopic)
.subscribe();
final String functionName = "test-generic-fn-" + randomName(8);
submitFunction(
Runtime.JAVA,
inputTopic,
outputTopic,
functionName,
null,
function,
Schema.AUTO_CONSUME(),
null,
null,
SchemaType.NONE.name(),
SubscriptionInitialPosition.Earliest, null, null, null);
try {
if (keyValue) {
@Cleanup
Producer<KeyValue<Users.UserV1, Users.UserV1>> producer = pulsarClient
.newProducer(Schema.KeyValue(
Schema.AVRO(Users.UserV1.class),
Schema.AVRO(Users.UserV1.class), KeyValueEncodingType.SEPARATED))
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send(new KeyValue<>(new Users.UserV1("foo" + i, i),
new Users.UserV1("bar" + i, i + 100)));
}
} else {
@Cleanup
Producer<Users.UserV1> producer = pulsarClient
.newProducer(Schema.AVRO(Users.UserV1.class))
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send(new Users.UserV1("bar" + i, i + 100));
}
}
getFunctionInfoSuccess(functionName);
getFunctionStatus(functionName, numMessages, true);
int i = 0;
Message<GenericRecord> message;
do {
message = consumer.receive(30, TimeUnit.SECONDS);
if (message != null) {
GenericRecord genericRecord = message.getValue();
if (keyValue) {
@SuppressWarnings("unchecked")
KeyValue<GenericRecord, GenericRecord> keyValueObject =
(KeyValue<GenericRecord, GenericRecord>) genericRecord.getNativeObject();
GenericRecord key = keyValueObject.getKey();
GenericRecord value = keyValueObject.getValue();
key.getFields().forEach(f -> {
log.info("key field {} value {}", f.getName(), key.getField(f.getName()));
});
value.getFields().forEach(f -> {
log.info("value field {} value {}", f.getName(), value.getField(f.getName()));
});
assertEquals(i, key.getField("age"));
assertEquals("foo" + i, key.getField("name"));
if (removeAgeField) {
// field "age" is removed from the schema
assertFalse(value.getFields().stream().anyMatch(f -> f.getName().equals("age")));
} else {
assertEquals(i + 100, value.getField("age"));
}
assertEquals("bar" + i, value.getField("name"));
} else {
GenericRecord value = genericRecord;
log.info("received value {}", value);
value.getFields().forEach(f -> {
log.info("value field {} value {}", f.getName(), value.getField(f.getName()));
});
if (removeAgeField) {
// field "age" is removed from the schema
assertFalse(value.getFields().stream().anyMatch(f -> f.getName().equals("age")));
} else {
assertEquals(i + 100, value.getField("age"));
}
assertEquals("bar" + i, value.getField("name"));
}
consumer.acknowledge(message);
i++;
}
} while (message != null);
} finally {
pulsarCluster.dumpFunctionLogs(functionName);
}
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);
}
protected void testRecordFunction() throws Exception {
log.info("start RecordFunction function test ...");
String ns = "public/ns-recordfunction-" + randomName(8);
@Cleanup
PulsarAdmin pulsarAdmin = getPulsarAdmin();
pulsarAdmin.namespaces().createNamespace(ns);
@Cleanup
PulsarClient pulsarClient = getPulsarClient();
final int numMessages = 10;
final String inputTopic = ns + "/test-string-input-" + randomName(8);
final String outputTopic = ns + "/test-string-output-" + randomName(8);
@Cleanup
Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING)
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.topic("publishtopic")
.subscribe();
final String functionName = "test-record-fn-" + randomName(8);
submitFunction(
Runtime.JAVA,
inputTopic,
outputTopic,
functionName,
null,
RecordFunction.class.getName(),
Schema.AUTO_CONSUME(),
null);
try {
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
.topic(inputTopic)
.create();
for (int i = 0; i < numMessages; i++) {
producer.send("message" + i);
}
getFunctionInfoSuccess(functionName);
getFunctionStatus(functionName, numMessages, true);
for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
log.info("Received: {}", msg.getValue());
assertEquals(msg.getValue(), "message" + i + "!");
assertEquals(msg.getProperty("input_topic"), "persistent://" + inputTopic);
}
} finally {
pulsarCluster.dumpFunctionLogs(functionName);
}
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);
}
protected void testMergeFunction() throws Exception {
log.info("start merge function test ...");
String ns = "public/ns-merge-" + randomName(8);
@Cleanup
PulsarAdmin pulsarAdmin = getPulsarAdmin();
pulsarAdmin.namespaces().createNamespace(ns);
pulsarAdmin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
SchemaCompatibilityStrategy strategy = pulsarAdmin.namespaces().getSchemaCompatibilityStrategy(ns);
log.info("namespace {} SchemaCompatibilityStrategy is {}", ns, strategy);
@Cleanup
PulsarClient pulsarClient = getPulsarClient();
ObjectNode inputSpecNode = objectMapper.createObjectNode();
Map<String, AtomicInteger> topicMsgCntMap = new ConcurrentHashMap<>();
int messagePerTopic = 10;
prepareDataForMergeFunction(ns, pulsarClient, inputSpecNode, messagePerTopic, topicMsgCntMap);
final String outputTopic = ns + "/test-merge-output";
@Cleanup
Consumer<GenericRecord> consumer = pulsarClient
.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test-merge-fn")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.topic(outputTopic)
.subscribe();
final String functionName = "test-merge-fn-" + randomName(8);
submitFunction(
Runtime.JAVA,
"",
outputTopic,
functionName,
null,
MergeTopicFunction.class.getName(),
null,
null,
inputSpecNode.toString(),
SchemaType.AUTO_PUBLISH.name().toUpperCase(),
SubscriptionInitialPosition.Earliest, null, null, null);
getFunctionInfoSuccess(functionName);
getFunctionStatus(functionName, topicMsgCntMap.keySet().size() * messagePerTopic, true);
try {
Message<GenericRecord> message;
do {
message = consumer.receive(30, TimeUnit.SECONDS);
if (message != null) {
String baseTopic = message.getProperty("baseTopic");
GenericRecord genericRecord = message.getValue();
log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
baseTopic,
genericRecord.getSchemaType(),
genericRecord.getNativeObject().getClass(),
genericRecord.getNativeObject());
checkSchemaForAutoSchema(message, baseTopic);
topicMsgCntMap.get(baseTopic).decrementAndGet();
consumer.acknowledge(message);
}
} while (message != null);
for (Map.Entry<String, AtomicInteger> entry : topicMsgCntMap.entrySet()) {
assertEquals(entry.getValue().get(), 0,
"topic " + entry.getKey() + " left message cnt is not 0.");
}
} finally {
pulsarCluster.dumpFunctionLogs(functionName);
}
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);
log.info("finish merge function test.");
}
private void prepareDataForMergeFunction(String ns,
PulsarClient pulsarClient,
ObjectNode inputSpecNode,
int messagePerTopic,
Map<String, AtomicInteger> topicMsgCntMap) throws PulsarClientException {
generateDataByDifferentSchema(ns, "merge-schema-bytes", pulsarClient,
Schema.BYTES, "bytes schema test".getBytes(), messagePerTopic, inputSpecNode, topicMsgCntMap);
generateDataByDifferentSchema(ns, "merge-schema-string", pulsarClient,
Schema.STRING, "string schema test", messagePerTopic, inputSpecNode, topicMsgCntMap);
generateDataByDifferentSchema(ns, "merge-schema-json-userv1", pulsarClient,
Schema.JSON(Users.UserV1.class), new Users.UserV1("ran", 33),
messagePerTopic, inputSpecNode, topicMsgCntMap);
generateDataByDifferentSchema(ns, "merge-schema-json-userv2", pulsarClient,
Schema.JSON(Users.UserV2.class), new Users.UserV2("tang", 18, "123123123"),
messagePerTopic, inputSpecNode, topicMsgCntMap);
generateDataByDifferentSchema(ns, "merge-schema-avro-userv2", pulsarClient,
Schema.AVRO(Users.UserV2.class), new Users.UserV2("tang", 20, "456456456"),
messagePerTopic, inputSpecNode, topicMsgCntMap);
generateDataByDifferentSchema(ns, "merge-schema-k-int-v-json-userv1-separate", pulsarClient,
Schema.KeyValue(Schema.INT32, Schema.JSON(Users.UserV1.class), KeyValueEncodingType.SEPARATED),
new KeyValue<>(100, new Users.UserV1("ran", 40)),
messagePerTopic, inputSpecNode, topicMsgCntMap);
generateDataByDifferentSchema(ns, "merge-schema-k-json-userv2-v-json-userv1-inline", pulsarClient,
Schema.KeyValue(Schema.JSON(Users.UserV2.class), Schema.JSON(Users.UserV1.class),
KeyValueEncodingType.INLINE),
new KeyValue<>(new Users.UserV2("tang", 20, "789789789"),
new Users.UserV1("ran", 40)),
messagePerTopic, inputSpecNode, topicMsgCntMap);
}
private <T> void generateDataByDifferentSchema(String ns,
String baseTopic,
PulsarClient pulsarClient,
Schema<T> schema,
T data,
int messageCnt,
ObjectNode inputSpecNode,
Map<String, AtomicInteger> topicMsgCntMap)
throws PulsarClientException {
String topic = ns + "/" + baseTopic;
Producer<T> producer = pulsarClient.newProducer(schema)
.topic(topic)
.create();
for (int i = 0; i < messageCnt; i++) {
producer.newMessage().value(data).property("baseTopic", baseTopic).send();
}
ObjectNode confNode = objectMapper.createObjectNode();
confNode.put("schemaType", SchemaType.AUTO_CONSUME.name().toUpperCase());
inputSpecNode.put(topic, confNode.toString());
topicMsgCntMap.put(baseTopic, new AtomicInteger(messageCnt));
producer.close();
log.info("[merge-fn] generate {} messages for schema {}", messageCnt, schema.getSchemaInfo());
}
@SuppressWarnings("unchecked")
private void checkSchemaForAutoSchema(Message<GenericRecord> message, String baseTopic) {
if (message.getReaderSchema().isEmpty()) {
fail("Failed to get reader schema for auto consume multiple schema topic.");
}
Object nativeObject = message.getValue().getNativeObject();
JsonNode jsonNode;
KeyValue<?, ?> kv;
switch (baseTopic) {
case "merge-schema-bytes":
assertEquals(new String((byte[]) nativeObject), "bytes schema test");
break;
case "merge-schema-string":
assertEquals((String) nativeObject, "string schema test");
break;
case "merge-schema-json-userv1":
jsonNode = (JsonNode) nativeObject;
assertEquals(jsonNode.get("name").textValue(), "ran");
assertEquals(jsonNode.get("age").intValue(), 33);
break;
case "merge-schema-json-userv2":
jsonNode = (JsonNode) nativeObject;
assertEquals(jsonNode.get("name").textValue(), "tang");
assertEquals(jsonNode.get("age").intValue(), 18);
assertEquals(jsonNode.get("phone").textValue(), "123123123");
break;
case "merge-schema-avro-userv2":
org.apache.avro.generic.GenericRecord genericRecord =
(org.apache.avro.generic.GenericRecord) nativeObject;
assertEquals(genericRecord.get("name").toString(), "tang");
assertEquals(genericRecord.get("age"), 20);
assertEquals(genericRecord.get("phone").toString(), "456456456");
break;
case "merge-schema-k-int-v-json-userv1-separate":
kv = (KeyValue<Integer, GenericRecord>) nativeObject;
assertEquals(kv.getKey(), 100);
jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
assertEquals(jsonNode.get("name").textValue(), "ran");
assertEquals(jsonNode.get("age").intValue(), 40);
break;
case "merge-schema-k-json-userv2-v-json-userv1-inline":
kv = (KeyValue<GenericRecord, GenericRecord>) nativeObject;
jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
assertEquals(jsonNode.get("name").textValue(), "tang");
assertEquals(jsonNode.get("age").intValue(), 20);
assertEquals(jsonNode.get("phone").textValue(), "789789789");
jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
assertEquals(jsonNode.get("name").textValue(), "ran");
assertEquals(jsonNode.get("age").intValue(), 40);
break;
default:
// nothing to do
}
}
private PulsarClient getPulsarClient() throws PulsarClientException {
return PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
}
private PulsarAdmin getPulsarAdmin() throws PulsarClientException {
return PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
}
}