// blob: dfb0b9eac3ebe04c0c8d73a9f8ad7e72339a83b9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import java.io.File;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.pulsar.common.functions.Utils.BUILTIN;
import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
@Slf4j
public class FunctionConfigUtils {
// Applied by inferMissingArguments when the config leaves maxPendingAsyncRequests unset.
static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
// Applied by inferMissingArguments when forwarding is enabled and the config leaves
// forwardSourceMessageProperty unset.
static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
// Shared mapper used to parse the JSON-encoded ConsumerConfig values of custom schema inputs/outputs.
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();
/**
 * Translates a user-facing {@link FunctionConfig} into its protobuf {@link FunctionDetails} form.
 *
 * <p>The result is assembled in three parts: the source spec (input topics, subscription
 * settings), the sink spec (output topic, producer settings) and the function-level details
 * (identity, runtime, retries, user config, resources).
 *
 * <p>Defaults applied here when the config leaves a value unset: cleanupSubscription = true,
 * autoAck = true, parallelism = 1; resources go through {@code Resources.mergeWithDefault}.
 *
 * @param functionConfig the configuration to convert
 * @param classLoader    class loader used to resolve the function's input/output types for the
 *                       JAVA runtime; when {@code null} (or for other runtimes) the type class
 *                       names are simply left unset
 * @return the populated function details
 * @throws IllegalArgumentException if the function class cannot be loaded, or if a custom
 *                                  schema input/output entry is not valid JSON
 */
public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
        throws IllegalArgumentException {
    boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar())
            && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);

    // typeArgs[0] is the input type, typeArgs[1] the output type; stays null when not resolvable here.
    Class<?>[] typeArgs = null;
    if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && classLoader != null) {
        try {
            typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            throw new IllegalArgumentException(
                    String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
        }
    }

    FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();

    // Setup source
    Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
    if (functionConfig.getInputs() != null) {
        functionConfig.getInputs().forEach(topicName ->
                sourceSpecBuilder.putInputSpecs(topicName,
                        Function.ConsumerSpec.newBuilder()
                                .setIsRegexPattern(false)
                                .build()));
    }
    if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
        sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
                Function.ConsumerSpec.newBuilder()
                        .setIsRegexPattern(true)
                        .build());
    }
    if (functionConfig.getCustomSerdeInputs() != null) {
        functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) ->
                sourceSpecBuilder.putInputSpecs(topicName,
                        Function.ConsumerSpec.newBuilder()
                                .setSerdeClassName(serdeClassName)
                                .setIsRegexPattern(false)
                                .build()));
    }
    if (functionConfig.getCustomSchemaInputs() != null) {
        functionConfig.getCustomSchemaInputs().forEach((topicName, conf) -> {
            try {
                // Each value is a JSON-serialized ConsumerConfig.
                ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
                Function.ConsumerSpec.Builder specBuilder = Function.ConsumerSpec.newBuilder()
                        .setSchemaType(consumerConfig.getSchemaType())
                        .setIsRegexPattern(false);
                // Guard the maps: the JSON may set them to null explicitly.
                if (consumerConfig.getSchemaProperties() != null) {
                    specBuilder.putAllSchemaProperties(consumerConfig.getSchemaProperties());
                }
                if (consumerConfig.getConsumerProperties() != null) {
                    specBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
                }
                sourceSpecBuilder.putInputSpecs(topicName, specBuilder.build());
            } catch (JsonProcessingException e) {
                // Keep the parser error as the cause so the offending JSON position is not lost.
                throw new IllegalArgumentException(
                        String.format("Incorrect custom schema inputs,Topic %s ", topicName), e);
            }
        });
    }
    if (functionConfig.getInputSpecs() != null) {
        functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> {
            Function.ConsumerSpec.Builder consumerSpecBuilder = Function.ConsumerSpec.newBuilder()
                    .setIsRegexPattern(consumerConf.isRegexPattern());
            // Schema type takes precedence over a serde class when both are present.
            if (isNotBlank(consumerConf.getSchemaType())) {
                consumerSpecBuilder.setSchemaType(consumerConf.getSchemaType());
            } else if (isNotBlank(consumerConf.getSerdeClassName())) {
                consumerSpecBuilder.setSerdeClassName(consumerConf.getSerdeClassName());
            }
            if (consumerConf.getReceiverQueueSize() != null) {
                consumerSpecBuilder.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
                        .setValue(consumerConf.getReceiverQueueSize()).build());
            }
            if (consumerConf.getSchemaProperties() != null) {
                consumerSpecBuilder.putAllSchemaProperties(consumerConf.getSchemaProperties());
            }
            if (consumerConf.getCryptoConfig() != null) {
                consumerSpecBuilder.setCryptoSpec(CryptoUtils.convert(consumerConf.getCryptoConfig()));
            }
            // Same null guard as for schema properties above.
            if (consumerConf.getConsumerProperties() != null) {
                consumerSpecBuilder.putAllConsumerProperties(consumerConf.getConsumerProperties());
            }
            consumerSpecBuilder.setPoolMessages(consumerConf.isPoolMessages());
            sourceSpecBuilder.putInputSpecs(topicName, consumerSpecBuilder.build());
        });
    }

    // Set subscription type
    Function.SubscriptionType subType;
    if (Boolean.TRUE.equals(functionConfig.getRetainOrdering())
            || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) {
        subType = Function.SubscriptionType.FAILOVER;
    } else if (Boolean.TRUE.equals(functionConfig.getRetainKeyOrdering())) {
        subType = Function.SubscriptionType.KEY_SHARED;
    } else {
        subType = Function.SubscriptionType.SHARED;
    }
    sourceSpecBuilder.setSubscriptionType(subType);

    // Set subscription name
    if (isNotBlank(functionConfig.getSubName())) {
        sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
    }

    // Set subscription position: anything other than Earliest maps to LATEST
    if (functionConfig.getSubscriptionPosition() != null) {
        Function.SubscriptionPosition subPosition =
                SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()
                        ? Function.SubscriptionPosition.EARLIEST
                        : Function.SubscriptionPosition.LATEST;
        sourceSpecBuilder.setSubscriptionPosition(subPosition);
    }

    if (typeArgs != null) {
        sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
    }
    if (functionConfig.getTimeoutMs() != null) {
        sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
        // We use negative acks for fast tracking failures
        sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(functionConfig.getTimeoutMs());
    }
    if (functionConfig.getCleanupSubscription() != null) {
        sourceSpecBuilder.setCleanupSubscription(functionConfig.getCleanupSubscription());
    } else {
        sourceSpecBuilder.setCleanupSubscription(true);
    }
    functionDetailsBuilder.setSource(sourceSpecBuilder);

    // Setup sink
    Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
    if (functionConfig.getOutput() != null) {
        sinkSpecBuilder.setTopic(functionConfig.getOutput());
    }
    if (!StringUtils.isBlank(functionConfig.getOutputSerdeClassName())) {
        sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
    }
    if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) {
        sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType());
    }
    // Value comparison: identity (==) on a boxed Boolean is only reliable for the cached constants.
    if (Boolean.TRUE.equals(functionConfig.getForwardSourceMessageProperty())) {
        sinkSpecBuilder.setForwardSourceMessageProperty(true);
    }
    if (functionConfig.getCustomSchemaOutputs() != null && functionConfig.getOutput() != null) {
        // Only the entry keyed by the configured output topic is considered.
        String conf = functionConfig.getCustomSchemaOutputs().get(functionConfig.getOutput());
        try {
            if (StringUtils.isNotEmpty(conf)) {
                ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
                if (consumerConfig.getSchemaProperties() != null) {
                    sinkSpecBuilder.putAllSchemaProperties(consumerConfig.getSchemaProperties());
                }
                if (consumerConfig.getConsumerProperties() != null) {
                    sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
                }
            }
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(
                    String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()), e);
        }
    }
    if (typeArgs != null) {
        sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
    }
    if (functionConfig.getProducerConfig() != null) {
        ProducerConfig producerConf = functionConfig.getProducerConfig();
        Function.ProducerSpec.Builder producerSpecBuilder = Function.ProducerSpec.newBuilder();
        if (producerConf.getMaxPendingMessages() != null) {
            producerSpecBuilder.setMaxPendingMessages(producerConf.getMaxPendingMessages());
        }
        if (producerConf.getMaxPendingMessagesAcrossPartitions() != null) {
            producerSpecBuilder.setMaxPendingMessagesAcrossPartitions(
                    producerConf.getMaxPendingMessagesAcrossPartitions());
        }
        if (producerConf.getUseThreadLocalProducers() != null) {
            producerSpecBuilder.setUseThreadLocalProducers(producerConf.getUseThreadLocalProducers());
        }
        if (producerConf.getCryptoConfig() != null) {
            producerSpecBuilder.setCryptoSpec(CryptoUtils.convert(producerConf.getCryptoConfig()));
        }
        if (producerConf.getBatchBuilder() != null) {
            producerSpecBuilder.setBatchBuilder(producerConf.getBatchBuilder());
        }
        sinkSpecBuilder.setProducerSpec(producerSpecBuilder.build());
    }
    functionDetailsBuilder.setSink(sinkSpecBuilder);

    // Function-level details; protobuf setters reject null, hence the guards.
    if (functionConfig.getTenant() != null) {
        functionDetailsBuilder.setTenant(functionConfig.getTenant());
    }
    if (functionConfig.getNamespace() != null) {
        functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
    }
    if (functionConfig.getName() != null) {
        functionDetailsBuilder.setName(functionConfig.getName());
    }
    if (functionConfig.getLogTopic() != null) {
        functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
    }
    if (functionConfig.getRuntime() != null) {
        functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
    }
    if (functionConfig.getProcessingGuarantees() != null) {
        functionDetailsBuilder.setProcessingGuarantees(
                FunctionCommon.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
    }
    if (functionConfig.getRetainKeyOrdering() != null) {
        functionDetailsBuilder.setRetainKeyOrdering(functionConfig.getRetainKeyOrdering());
    }
    if (functionConfig.getRetainOrdering() != null) {
        functionDetailsBuilder.setRetainOrdering(functionConfig.getRetainOrdering());
    }
    // Retry details are only recorded for a non-negative retry count.
    if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
        Function.RetryDetails.Builder retryBuilder = Function.RetryDetails.newBuilder();
        retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
        if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
            retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
        }
        functionDetailsBuilder.setRetryDetails(retryBuilder);
    }

    Map<String, Object> configs = new HashMap<>();
    if (functionConfig.getUserConfig() != null) {
        configs.putAll(functionConfig.getUserConfig());
    }

    // windowing related
    WindowConfig windowConfig = functionConfig.getWindowConfig();
    if (windowConfig != null) {
        // The user's class name travels inside the window config (stored in the user config map);
        // the details' class name is replaced by the window executor.
        windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName());
        configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
        // set class name to window function executor
        functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor");
    } else if (functionConfig.getClassName() != null) {
        functionDetailsBuilder.setClassName(functionConfig.getClassName());
    }
    if (!configs.isEmpty()) {
        functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
    }
    if (functionConfig.getSecrets() != null && !functionConfig.getSecrets().isEmpty()) {
        functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets()));
    }

    if (functionConfig.getAutoAck() != null) {
        functionDetailsBuilder.setAutoAck(functionConfig.getAutoAck());
    } else {
        functionDetailsBuilder.setAutoAck(true);
    }
    if (functionConfig.getParallelism() != null) {
        functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
    } else {
        functionDetailsBuilder.setParallelism(1);
    }

    // use default resources if resources not set
    Resources resources = Resources.mergeWithDefault(functionConfig.getResources());
    Function.Resources.Builder resourcesBuilder = Function.Resources.newBuilder();
    resourcesBuilder.setCpu(resources.getCpu());
    resourcesBuilder.setRam(resources.getRam());
    resourcesBuilder.setDisk(resources.getDisk());
    functionDetailsBuilder.setResources(resourcesBuilder);

    if (!StringUtils.isEmpty(functionConfig.getRuntimeFlags())) {
        functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
    }
    functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
    if (!StringUtils.isEmpty(functionConfig.getCustomRuntimeOptions())) {
        functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
    }
    if (isBuiltin) {
        // Store only the part after the "builtin://" scheme.
        String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
        functionDetailsBuilder.setBuiltin(builtin);
    }
    return functionDetailsBuilder.build();
}
/**
 * Rebuilds a {@link FunctionConfig} from protobuf {@link FunctionDetails}.
 *
 * <p>All inputs are reported through {@code inputSpecs}. Optional string fields are copied only
 * when non-empty. When the stored user config carries a window config entry, that entry is
 * removed from the user config, exposed as the window config, and the class name is taken
 * from it instead of from the details.
 *
 * @param functionDetails the protobuf details to read from
 * @return a newly created function config
 */
public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) {
    FunctionConfig functionConfig = new FunctionConfig();
    functionConfig.setTenant(functionDetails.getTenant());
    functionConfig.setNamespace(functionDetails.getNamespace());
    functionConfig.setName(functionDetails.getName());
    functionConfig.setParallelism(functionDetails.getParallelism());
    functionConfig.setProcessingGuarantees(
            FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));

    // One ConsumerConfig per input topic (or topic pattern).
    Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
    for (Map.Entry<String, Function.ConsumerSpec> entry
            : functionDetails.getSource().getInputSpecsMap().entrySet()) {
        Function.ConsumerSpec inputSpec = entry.getValue();
        ConsumerConfig consumerConfig = new ConsumerConfig();
        if (isNotEmpty(inputSpec.getSerdeClassName())) {
            consumerConfig.setSerdeClassName(inputSpec.getSerdeClassName());
        }
        if (isNotEmpty(inputSpec.getSchemaType())) {
            consumerConfig.setSchemaType(inputSpec.getSchemaType());
        }
        if (inputSpec.hasReceiverQueueSize()) {
            consumerConfig.setReceiverQueueSize(inputSpec.getReceiverQueueSize().getValue());
        }
        if (inputSpec.hasCryptoSpec()) {
            consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(inputSpec.getCryptoSpec()));
        }
        consumerConfig.setRegexPattern(inputSpec.getIsRegexPattern());
        consumerConfig.setSchemaProperties(inputSpec.getSchemaPropertiesMap());
        consumerConfig.setPoolMessages(inputSpec.getPoolMessages());
        consumerConfigMap.put(entry.getKey(), consumerConfig);
    }
    functionConfig.setInputSpecs(consumerConfigMap);

    String subscriptionName = functionDetails.getSource().getSubscriptionName();
    if (!isEmpty(subscriptionName)) {
        functionConfig.setSubName(subscriptionName);
    }
    functionConfig.setRetainOrdering(functionDetails.getRetainOrdering());
    functionConfig.setRetainKeyOrdering(functionDetails.getRetainKeyOrdering());
    functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
    functionConfig.setAutoAck(functionDetails.getAutoAck());

    // Set subscription position
    functionConfig.setSubscriptionPosition(
            convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));

    // A timeout of 0 is treated as "not set".
    if (functionDetails.getSource().getTimeoutMs() != 0) {
        functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
    }

    String outputTopic = functionDetails.getSink().getTopic();
    if (!isEmpty(outputTopic)) {
        functionConfig.setOutput(outputTopic);
    }
    String outputSerde = functionDetails.getSink().getSerDeClassName();
    if (!isEmpty(outputSerde)) {
        functionConfig.setOutputSerdeClassName(outputSerde);
    }
    String outputSchemaType = functionDetails.getSink().getSchemaType();
    if (!isEmpty(outputSchemaType)) {
        functionConfig.setOutputSchemaType(outputSchemaType);
    }

    if (functionDetails.getSink().getProducerSpec() != null) {
        Function.ProducerSpec producerSpec = functionDetails.getSink().getProducerSpec();
        ProducerConfig producerConfig = new ProducerConfig();
        // Zero-valued limits are treated as "not set".
        if (producerSpec.getMaxPendingMessages() != 0) {
            producerConfig.setMaxPendingMessages(producerSpec.getMaxPendingMessages());
        }
        if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
            producerConfig.setMaxPendingMessagesAcrossPartitions(
                    producerSpec.getMaxPendingMessagesAcrossPartitions());
        }
        if (producerSpec.hasCryptoSpec()) {
            producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(producerSpec.getCryptoSpec()));
        }
        if (producerSpec.getBatchBuilder() != null) {
            producerConfig.setBatchBuilder(producerSpec.getBatchBuilder());
        }
        producerConfig.setUseThreadLocalProducers(producerSpec.getUseThreadLocalProducers());
        functionConfig.setProducerConfig(producerConfig);
    }

    String logTopic = functionDetails.getLogTopic();
    if (!isEmpty(logTopic)) {
        functionConfig.setLogTopic(logTopic);
    }
    // Only a true flag is propagated; false leaves the config value unset.
    if (functionDetails.getSink().getForwardSourceMessageProperty()) {
        functionConfig.setForwardSourceMessageProperty(
                functionDetails.getSink().getForwardSourceMessageProperty());
    }
    functionConfig.setRuntime(FunctionCommon.convertRuntime(functionDetails.getRuntime()));

    if (functionDetails.hasRetryDetails()) {
        functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
        String deadLetterTopic = functionDetails.getRetryDetails().getDeadLetterTopic();
        if (!isEmpty(deadLetterTopic)) {
            functionConfig.setDeadLetterTopic(deadLetterTopic);
        }
    }

    // The user config is stored as a JSON object; no stored value means an empty map.
    Map<String, Object> userConfig;
    if (isEmpty(functionDetails.getUserConfig())) {
        userConfig = new HashMap<>();
    } else {
        Type userConfigType = new TypeToken<Map<String, Object>>() {
        }.getType();
        userConfig = new Gson().fromJson(functionDetails.getUserConfig(), userConfigType);
    }

    if (userConfig.containsKey(WindowConfig.WINDOW_CONFIG_KEY)) {
        // The entry was parsed as a generic value, so round-trip it through JSON to get a typed
        // WindowConfig, and drop it from the user-visible config.
        Gson gson = new Gson();
        Object rawWindowConfig = userConfig.remove(WindowConfig.WINDOW_CONFIG_KEY);
        WindowConfig windowConfig = gson.fromJson(gson.toJson(rawWindowConfig), WindowConfig.class);
        functionConfig.setClassName(windowConfig.getActualWindowFunctionClassName());
        functionConfig.setWindowConfig(windowConfig);
    } else {
        functionConfig.setClassName(functionDetails.getClassName());
    }
    functionConfig.setUserConfig(userConfig);

    if (!isEmpty(functionDetails.getSecretsMap())) {
        Type secretsType = new TypeToken<Map<String, Object>>() {
        }.getType();
        Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), secretsType);
        functionConfig.setSecrets(secretsMap);
    }

    if (functionDetails.hasResources()) {
        Resources resources = new Resources();
        resources.setCpu(functionDetails.getResources().getCpu());
        resources.setRam(functionDetails.getResources().getRam());
        resources.setDisk(functionDetails.getResources().getDisk());
        functionConfig.setResources(resources);
    }

    String runtimeFlags = functionDetails.getRuntimeFlags();
    if (!isEmpty(runtimeFlags)) {
        functionConfig.setRuntimeFlags(runtimeFlags);
    }
    String customRuntimeOptions = functionDetails.getCustomRuntimeOptions();
    if (!isEmpty(customRuntimeOptions)) {
        functionConfig.setCustomRuntimeOptions(customRuntimeOptions);
    }
    return functionConfig;
}
/**
 * Fills in values the caller left out of {@code functionConfig}, mutating it in place.
 *
 * <p>Name, tenant and namespace are inferred when empty; parallelism defaults to 1 and
 * maxPendingAsyncRequests to {@code MAX_PENDING_ASYNC_REQUESTS_DEFAULT}. The runtime is derived
 * from whichever of jar / py / go is set (checked in that order) and overrides any existing
 * value. When a window config is present, auto-ack is forced to {@code false}.
 *
 * @param functionConfig                      the config to complete
 * @param forwardSourceMessagePropertyEnabled whether forwarding of source message properties is
 *                                            enabled; when {@code false} the config value is
 *                                            cleared to {@code null}
 */
public static void inferMissingArguments(FunctionConfig functionConfig,
                                         boolean forwardSourceMessagePropertyEnabled) {
    if (StringUtils.isEmpty(functionConfig.getName())) {
        org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
    }
    if (StringUtils.isEmpty(functionConfig.getTenant())) {
        org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
    }
    if (StringUtils.isEmpty(functionConfig.getNamespace())) {
        org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig);
    }

    if (null == functionConfig.getParallelism()) {
        functionConfig.setParallelism(1);
    }
    if (null == functionConfig.getMaxPendingAsyncRequests()) {
        functionConfig.setMaxPendingAsyncRequests(MAX_PENDING_ASYNC_REQUESTS_DEFAULT);
    }

    if (!forwardSourceMessagePropertyEnabled) {
        // if worker disables forward source message property, we don't need to set the default value.
        functionConfig.setForwardSourceMessageProperty(null);
    } else if (functionConfig.getForwardSourceMessageProperty() == null) {
        functionConfig.setForwardSourceMessageProperty(FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT);
    }

    // Pick the runtime from the first package reference that is set; leave it alone if none is.
    final FunctionConfig.Runtime inferredRuntime;
    if (functionConfig.getJar() != null) {
        inferredRuntime = FunctionConfig.Runtime.JAVA;
    } else if (functionConfig.getPy() != null) {
        inferredRuntime = FunctionConfig.Runtime.PYTHON;
    } else if (functionConfig.getGo() != null) {
        inferredRuntime = FunctionConfig.Runtime.GO;
    } else {
        inferredRuntime = null;
    }
    if (inferredRuntime != null) {
        functionConfig.setRuntime(inferredRuntime);
    }

    final WindowConfig windowing = functionConfig.getWindowConfig();
    if (windowing == null) {
        return;
    }
    WindowConfigUtils.inferMissingArguments(windowing);
    functionConfig.setAutoAck(false);
}
/**
 * Validates the Java-specific parts of a function config against the function's classes.
 *
 * <p>Checks that the function class loads and implements one of the supported function
 * interfaces, then validates every configured input serde/schema against the function's input
 * type and, unless the output type is {@code Void}, the output serde/schema against its
 * output type.
 *
 * @param functionConfig the config to validate
 * @param clsLoader      class loader able to load the function class and its serde/schema classes
 * @throws IllegalArgumentException if the class cannot be loaded, implements none of the
 *                                  supported interfaces, a custom schema entry is not valid
 *                                  JSON, or both a schema type and a serde class are configured
 *                                  for the same input or for the output
 */
private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
    // typeArgs[0] is the function's input type, typeArgs[1] its output type.
    final Class<?>[] typeArgs;
    try {
        Class<?> functionClass = clsLoader.loadClass(functionConfig.getClassName());
        if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
                && !java.util.function.Function.class.isAssignableFrom(functionClass)
                && !org.apache.pulsar.functions.api.WindowFunction.class.isAssignableFrom(functionClass)) {
            throw new IllegalArgumentException(
                    String.format("Function class %s does not implement the correct interface",
                            functionClass.getName()));
        }
        typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
    } catch (ClassNotFoundException | NoClassDefFoundError e) {
        throw new IllegalArgumentException(
                String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
    }

    // inputs use default schema, so there is no check needed there

    // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
    // implements SerDe class
    if (functionConfig.getCustomSerdeInputs() != null) {
        functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) ->
                ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], clsLoader, true));
    }

    // Each custom schema input is a JSON-serialized ConsumerConfig; validate its schema type
    // against the function's input type.
    if (functionConfig.getCustomSchemaInputs() != null) {
        functionConfig.getCustomSchemaInputs().forEach((topicName, conf) -> {
            ConsumerConfig consumerConfig;
            try {
                consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
            } catch (JsonProcessingException e) {
                // Keep the parser error as the cause so the JSON problem remains diagnosable.
                throw new IllegalArgumentException(
                        String.format("Topic %s has an incorrect schema Info", topicName), e);
            }
            ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0], clsLoader, true);
        });
    }

    // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
    // implements Schema or SerDe classes
    if (functionConfig.getInputSpecs() != null) {
        functionConfig.getInputSpecs().forEach((topicName, conf) -> {
            // At most one of schema/serde may be set (setting neither is allowed)
            if (!isEmpty(conf.getSchemaType()) && !isEmpty(conf.getSerdeClassName())) {
                throw new IllegalArgumentException(
                        "Only one of schemaType or serdeClassName should be set in inputSpec");
            }
            if (!isEmpty(conf.getSerdeClassName())) {
                ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0], clsLoader, true);
            }
            if (!isEmpty(conf.getSchemaType())) {
                ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0], clsLoader, true);
            }
            if (conf.getCryptoConfig() != null) {
                // NOTE(review): the trailing flag presumably marks producer vs. consumer usage - confirm.
                ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(), clsLoader, false);
            }
        });
    }

    // A Void output type means the remaining output-side checks are skipped entirely.
    if (Void.class.equals(typeArgs[1])) {
        return;
    }

    // At most one of outputSchemaType and outputSerdeClassName may be set
    if (!isEmpty(functionConfig.getOutputSerdeClassName()) && !isEmpty(functionConfig.getOutputSchemaType())) {
        throw new IllegalArgumentException(
                "Only one of outputSchemaType or outputSerdeClassName should be set");
    }
    if (!isEmpty(functionConfig.getOutputSchemaType())) {
        ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], clsLoader, false);
    }
    if (!isEmpty(functionConfig.getOutputSerdeClassName())) {
        ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], clsLoader, false);
    }
    if (functionConfig.getProducerConfig() != null
            && functionConfig.getProducerConfig().getCryptoConfig() != null) {
        ValidatorUtils.validateCryptoKeyReader(
                functionConfig.getProducerConfig().getCryptoConfig(), clsLoader, true);
    }
}
/**
 * Rejects configuration options that this method treats as unsupported for Python functions:
 * effectively-once processing, windowing, message retries (any non-negative retry count)
 * and key-ordered retention.
 *
 * @param functionConfig the config to validate
 * @throws IllegalArgumentException if any of those options is set
 */
private static void doPythonChecks(FunctionConfig functionConfig) {
    if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
        // IllegalArgumentException (a RuntimeException subclass) keeps this rule consistent
        // with the other validation failures below.
        throw new IllegalArgumentException("Effectively-once processing guarantees not yet supported in Python");
    }
    if (functionConfig.getWindowConfig() != null) {
        throw new IllegalArgumentException("There is currently no support windowing in python");
    }
    if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
        throw new IllegalArgumentException("Message retries not yet supported in python");
    }
    if (Boolean.TRUE.equals(functionConfig.getRetainKeyOrdering())) {
        throw new IllegalArgumentException("Retain Key Orderering not yet supported in python");
    }
}
/**
 * Rejects configuration options that this method treats as unsupported for Go functions:
 * effectively-once processing, windowing, message retries (any non-negative retry count)
 * and key-ordered retention.
 *
 * @param functionConfig the config to validate
 * @throws IllegalArgumentException if any of those options is set
 */
private static void doGolangChecks(FunctionConfig functionConfig) {
    if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
        // IllegalArgumentException (a RuntimeException subclass) keeps this rule consistent
        // with the other validation failures below.
        throw new IllegalArgumentException(
                "Effectively-once processing guarantees not yet supported in Go function");
    }
    if (functionConfig.getWindowConfig() != null) {
        throw new IllegalArgumentException("Windowing is not supported in Go function yet");
    }
    if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
        throw new IllegalArgumentException("Message retries not yet supported in Go function");
    }
    if (Boolean.TRUE.equals(functionConfig.getRetainKeyOrdering())) {
        throw new IllegalArgumentException("Retain Key Orderering not yet supported in Go function");
    }
}
/**
 * Ensures the output topic is not also one of the input topics.
 *
 * @param inputTopics all topics the function consumes from
 * @param outputTopic the topic the function publishes to (may be {@code null})
 * @throws IllegalArgumentException if {@code inputTopics} contains {@code outputTopic}
 */
private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
    final boolean clashes = inputTopics.contains(outputTopic);
    if (!clashes) {
        return;
    }
    final String message = String.format(
            "Output topic %s is also being used as an input topic (topics must be one or the other)",
            outputTopic);
    throw new IllegalArgumentException(message);
}
/**
 * Runs the runtime-independent validation rules against a function config.
 *
 * <p>Rules are evaluated in a fixed order and the first violation is reported. They cover
 * identity (tenant, namespace, name, class name), topic validity, parallelism, input/output
 * topic clashes, windowing, resources, timeout and retry settings, ordering options, and
 * consumer/producer crypto settings.
 *
 * @param functionConfig the config to validate
 * @throws IllegalArgumentException if a rule checked here is violated
 */
private static void doCommonChecks(FunctionConfig functionConfig) {
    // --- identity ---
    if (isEmpty(functionConfig.getTenant())) {
        throw new IllegalArgumentException("Function tenant cannot be null");
    }
    if (isEmpty(functionConfig.getNamespace())) {
        throw new IllegalArgumentException("Function namespace cannot be null");
    }
    if (isEmpty(functionConfig.getName())) {
        throw new IllegalArgumentException("Function name cannot be null");
    }
    // go doesn't need className
    final boolean classNameRequired = functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON
            || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA;
    if (classNameRequired && isEmpty(functionConfig.getClassName())) {
        throw new IllegalArgumentException("Function classname cannot be null");
    }

    // --- topics ---
    final Collection<String> allInputTopics = collectAllInputTopics(functionConfig);
    if (allInputTopics.isEmpty()) {
        throw new IllegalArgumentException("No input topic(s) specified for the function");
    }
    for (String inputTopic : allInputTopics) {
        if (!TopicName.isValid(inputTopic)) {
            throw new IllegalArgumentException(String.format("Input topic %s is invalid", inputTopic));
        }
    }
    final String outputTopic = functionConfig.getOutput();
    if (!isEmpty(outputTopic) && !TopicName.isValid(outputTopic)) {
        throw new IllegalArgumentException(String.format("Output topic %s is invalid", outputTopic));
    }
    final String logTopic = functionConfig.getLogTopic();
    if (!isEmpty(logTopic) && !TopicName.isValid(logTopic)) {
        throw new IllegalArgumentException(String.format("LogTopic topic %s is invalid", logTopic));
    }
    final String deadLetterTopic = functionConfig.getDeadLetterTopic();
    if (!isEmpty(deadLetterTopic) && !TopicName.isValid(deadLetterTopic)) {
        throw new IllegalArgumentException(String.format("DeadLetter topic %s is invalid", deadLetterTopic));
    }

    if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) {
        throw new IllegalArgumentException("Function parallelism must be a positive number");
    }

    // Ensure that topics aren't being used as both input and output
    verifyNoTopicClash(allInputTopics, outputTopic);

    // --- windowing and resources ---
    final WindowConfig windowConfig = functionConfig.getWindowConfig();
    if (windowConfig != null) {
        // set auto ack to false since windowing framework is responsible
        // for acking and not the function framework
        if (Boolean.TRUE.equals(functionConfig.getAutoAck())) {
            throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
        }
        WindowConfigUtils.validate(windowConfig);
    }
    if (functionConfig.getResources() != null) {
        ResourceConfigUtils.validate(functionConfig.getResources());
    }

    // --- timeout / retries / ordering versus processing guarantees ---
    final FunctionConfig.ProcessingGuarantees guarantees = functionConfig.getProcessingGuarantees();
    final boolean effectivelyOnce = guarantees == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;

    if (functionConfig.getTimeoutMs() != null && functionConfig.getTimeoutMs() <= 0) {
        throw new IllegalArgumentException("Function timeout must be a positive number");
    }
    if (functionConfig.getTimeoutMs() != null
            && guarantees != null
            && guarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
        throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is "
                + FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
    }

    // A null or negative retry count means "no retry limit configured".
    final boolean retriesConfigured = functionConfig.getMaxMessageRetries() != null
            && functionConfig.getMaxMessageRetries() >= 0;
    if (retriesConfigured && effectivelyOnce) {
        throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
    }
    if (!retriesConfigured && !org.apache.commons.lang3.StringUtils.isEmpty(deadLetterTopic)) {
        throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
    }

    final boolean retainKeyOrdering = Boolean.TRUE.equals(functionConfig.getRetainKeyOrdering());
    if (retainKeyOrdering && effectivelyOnce) {
        throw new IllegalArgumentException("When effectively once processing guarantee is specified, retain Key ordering cannot be set");
    }
    if (retainKeyOrdering && Boolean.TRUE.equals(functionConfig.getRetainOrdering())) {
        throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
    }

    // --- python / go package files ---
    // NOTE(review): the existence check only applies to paths that start with the BUILTIN prefix
    // and are not a supported package URL; this looks like it may be inverted - confirm the intent.
    final String pyFile = functionConfig.getPy();
    if (!isEmpty(pyFile)
            && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(pyFile)
            && pyFile.startsWith(BUILTIN)
            && !new File(pyFile).exists()) {
        throw new IllegalArgumentException("The supplied python file does not exist");
    }
    final String goFile = functionConfig.getGo();
    if (!isEmpty(goFile)
            && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(goFile)
            && goFile.startsWith(BUILTIN)
            && !new File(goFile).exists()) {
        throw new IllegalArgumentException("The supplied go file does not exist");
    }

    // --- per-input consumer settings ---
    if (functionConfig.getInputSpecs() != null) {
        functionConfig.getInputSpecs().forEach((topicName, conf) -> {
            // receiver queue size should be >= 0
            final boolean negativeQueueSize =
                    conf.getReceiverQueueSize() != null && conf.getReceiverQueueSize() < 0;
            if (negativeQueueSize) {
                throw new IllegalArgumentException(
                        "Receiver queue size should be >= zero");
            }
            final boolean missingKeyReader = conf.getCryptoConfig() != null
                    && isBlank(conf.getCryptoConfig().getCryptoKeyReaderClassName());
            if (missingKeyReader) {
                throw new IllegalArgumentException(
                        "CryptoKeyReader class name required");
            }
        });
    }

    // --- producer crypto settings ---
    if (functionConfig.getProducerConfig() == null
            || functionConfig.getProducerConfig().getCryptoConfig() == null) {
        return;
    }
    if (isBlank(functionConfig.getProducerConfig().getCryptoConfig().getCryptoKeyReaderClassName())) {
        throw new IllegalArgumentException("CryptoKeyReader class name required");
    }
    if (functionConfig.getProducerConfig().getCryptoConfig().getEncryptionKeys() == null
            || functionConfig.getProducerConfig().getCryptoConfig().getEncryptionKeys().length == 0) {
        throw new IllegalArgumentException("Must provide encryption key name for crypto key reader");
    }
}
/**
 * Gathers every input topic reference carried by the given configuration into one collection.
 *
 * <p>Entries are appended in a fixed order: plain inputs, the topics pattern, the keys of the
 * custom serde inputs, the keys of the custom schema inputs and finally the keys of the input
 * specs. Sources that are {@code null} are skipped. No de-duplication is performed, so a topic
 * named by more than one source appears more than once.
 *
 * @param functionConfig the configuration to read the topic references from
 * @return a new, possibly empty, collection of topic names and patterns
 */
private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
    final List<String> topics = new LinkedList<>();

    final Collection<? extends String> plainInputs = functionConfig.getInputs();
    if (plainInputs != null) {
        topics.addAll(plainInputs);
    }

    final String pattern = functionConfig.getTopicsPattern();
    if (pattern != null) {
        topics.add(pattern);
    }

    final Map<String, ?> serdeInputs = functionConfig.getCustomSerdeInputs();
    if (serdeInputs != null) {
        topics.addAll(serdeInputs.keySet());
    }

    final Map<String, ?> schemaInputs = functionConfig.getCustomSchemaInputs();
    if (schemaInputs != null) {
        topics.addAll(schemaInputs.keySet());
    }

    final Map<String, ?> inputSpecs = functionConfig.getInputSpecs();
    if (inputSpecs != null) {
        topics.addAll(inputSpecs.keySet());
    }

    return topics;
}
/**
 * Validates a function configuration and, for Java functions, loads the function package.
 *
 * <p>The common checks always run first. What happens next depends on the configured runtime:
 * <ul>
 *   <li>GO: the Go-specific checks run and {@code null} is returned;</li>
 *   <li>PYTHON: the Python-specific checks run and {@code null} is returned;</li>
 *   <li>JAVA: a class loader is created from {@code functionPackageFile} when it is non-null,
 *       otherwise from the jar path in the configuration; the Java-specific checks then run
 *       against that class loader, which is returned.</li>
 * </ul>
 *
 * @param functionConfig      the configuration to validate
 * @param functionPackageFile the function package, or {@code null} to fall back to the jar path
 *                            held by the configuration
 * @return the class loader created for a Java function, or {@code null} for Go and Python
 * @throws IllegalArgumentException if the runtime is unset or unknown, if no Java package is
 *                                  available, if the configured jar does not exist, or if the
 *                                  jar cannot be loaded
 */
public static ClassLoader validate(FunctionConfig functionConfig, File functionPackageFile) {
    doCommonChecks(functionConfig);

    final FunctionConfig.Runtime runtime = functionConfig.getRuntime();

    // Non-Java runtimes need no class loader: run their checks and leave early.
    if (runtime == FunctionConfig.Runtime.GO) {
        doGolangChecks(functionConfig);
        return null;
    }
    if (runtime == FunctionConfig.Runtime.PYTHON) {
        doPythonChecks(functionConfig);
        return null;
    }
    if (runtime != FunctionConfig.Runtime.JAVA) {
        throw new IllegalArgumentException("Function language runtime is either not set or cannot be determined");
    }

    // Java runtime: an explicitly supplied package file wins over the jar path in the config.
    ClassLoader javaLoader;
    if (functionPackageFile != null) {
        try {
            javaLoader = loadJar(functionPackageFile);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Corrupted Jar File", e);
        }
    } else {
        if (isEmpty(functionConfig.getJar())) {
            throw new IllegalArgumentException("Function Package is not provided");
        }
        final File configuredJar = new File(functionConfig.getJar());
        if (!configuredJar.exists()) {
            throw new IllegalArgumentException("Jar file does not exist");
        }
        try {
            javaLoader = loadJar(configuredJar);
        } catch (Exception e) {
            throw new IllegalArgumentException("Corrupted Jar File", e);
        }
    }

    doJavaChecks(functionConfig, javaLoader);
    return javaLoader;
}
/**
 * Validates the configuration of a Java function using a class loader supplied by the caller.
 *
 * <p>Runs the common checks first and then the Java-specific checks against
 * {@code classLoader}. Unlike {@code validate}, no jar is loaded here.
 *
 * <p>NOTE(review): the delegated checks presumably report problems by throwing
 * {@link IllegalArgumentException} - confirm against {@code doCommonChecks} and
 * {@code doJavaChecks}.
 *
 * @param functionConfig the configuration to validate
 * @param classLoader    the class loader handed to the Java-specific checks
 */
public static void validateJavaFunction(FunctionConfig functionConfig, ClassLoader classLoader) {
doCommonChecks(functionConfig);
doJavaChecks(functionConfig, classLoader);
}
/**
 * Validates an update request against an existing configuration and returns the merged result.
 *
 * <p>The result starts out as {@code existingConfig.toBuilder().build()}. Settings that may be
 * updated are then taken from {@code newConfig} whenever they are present there: class name, jar,
 * input specs of already-known topics, log topic, output, user config, secrets, max message
 * retries, dead letter topic, parallelism, resources (merged through
 * {@code ResourceConfigUtils.merge}), window config, timeout, cleanup-subscription flag, runtime
 * flags and custom runtime options.
 *
 * <p>Settings that must not change are only compared; a differing value in {@code newConfig}
 * is rejected: tenant, namespace, name, the set of input topics and their regex flag, output
 * serde class, output schema type, processing guarantees, retain ordering, retain key ordering,
 * runtime, auto-ack and subscription name.
 *
 * <p>Side effect: {@code newConfig} is modified. Its input specs map is created when
 * {@code null} and is populated from its inputs, topics pattern, custom serde inputs and custom
 * schema inputs before the comparison takes place.
 *
 * @param existingConfig the configuration currently in effect
 * @param newConfig      the requested changes; unset (null or empty) settings mean "keep as is"
 * @return the merged configuration
 * @throws IllegalArgumentException if {@code newConfig} attempts to change a setting that
 *                                  cannot be altered
 */
public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
    FunctionConfig mergedConfig = existingConfig.toBuilder().build();

    // Identity of the function must match exactly.
    if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
        throw new IllegalArgumentException("Tenants differ");
    }
    if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
        throw new IllegalArgumentException("Namespaces differ");
    }
    if (!existingConfig.getName().equals(newConfig.getName())) {
        throw new IllegalArgumentException("Function Names differ");
    }

    if (!StringUtils.isEmpty(newConfig.getClassName())) {
        mergedConfig.setClassName(newConfig.getClassName());
    }
    if (!StringUtils.isEmpty(newConfig.getJar())) {
        mergedConfig.setJar(newConfig.getJar());
    }

    if (newConfig.getInputSpecs() == null) {
        newConfig.setInputSpecs(new HashMap<>());
    }
    if (mergedConfig.getInputSpecs() == null) {
        mergedConfig.setInputSpecs(new HashMap<>());
    }

    // Fold every way of naming an input topic into newConfig's input specs so that the
    // comparison below only has to look at a single map.
    if (newConfig.getInputs() != null) {
        newConfig.getInputs().forEach(topicName -> {
            newConfig.getInputSpecs().put(topicName,
                    ConsumerConfig.builder().isRegexPattern(false).build());
        });
    }
    if (newConfig.getTopicsPattern() != null && !newConfig.getTopicsPattern().isEmpty()) {
        newConfig.getInputSpecs().put(newConfig.getTopicsPattern(),
                ConsumerConfig.builder()
                        .isRegexPattern(true)
                        .build());
    }
    if (newConfig.getCustomSerdeInputs() != null) {
        newConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> {
            newConfig.getInputSpecs().put(topicName,
                    ConsumerConfig.builder()
                            .serdeClassName(serdeClassName)
                            .isRegexPattern(false)
                            .build());
        });
    }
    if (newConfig.getCustomSchemaInputs() != null) {
        newConfig.getCustomSchemaInputs().forEach((topicName, schemaClassname) -> {
            newConfig.getInputSpecs().put(topicName,
                    ConsumerConfig.builder()
                            .schemaType(schemaClassname)
                            .isRegexPattern(false)
                            .build());
        });
    }

    if (!newConfig.getInputSpecs().isEmpty()) {
        // An existing configuration without any input specs knows no topics, so every topic
        // named by the update would be a new one. Reject it with the same error as any other
        // topic change instead of failing with a NullPointerException on the lookup below.
        if (existingConfig.getInputSpecs() == null) {
            throw new IllegalArgumentException("Input Topics cannot be altered");
        }
        newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> {
            if (!existingConfig.getInputSpecs().containsKey(topicName)) {
                throw new IllegalArgumentException("Input Topics cannot be altered");
            }
            if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
                throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
            }
            mergedConfig.getInputSpecs().put(topicName, consumerConfig);
        });
    }

    if (!StringUtils.isEmpty(newConfig.getOutputSerdeClassName()) && !newConfig.getOutputSerdeClassName().equals(existingConfig.getOutputSerdeClassName())) {
        throw new IllegalArgumentException("Output Serde mismatch");
    }
    if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
        throw new IllegalArgumentException("Output Schema mismatch");
    }
    if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
        mergedConfig.setLogTopic(newConfig.getLogTopic());
    }
    if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
        throw new IllegalArgumentException("Processing Guarantees cannot be altered");
    }
    if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
        throw new IllegalArgumentException("Retain Ordering cannot be altered");
    }
    if (newConfig.getRetainKeyOrdering() != null && !newConfig.getRetainKeyOrdering().equals(existingConfig.getRetainKeyOrdering())) {
        throw new IllegalArgumentException("Retain Key Ordering cannot be altered");
    }
    if (!StringUtils.isEmpty(newConfig.getOutput())) {
        mergedConfig.setOutput(newConfig.getOutput());
    }
    if (newConfig.getUserConfig() != null) {
        mergedConfig.setUserConfig(newConfig.getUserConfig());
    }
    if (newConfig.getSecrets() != null) {
        mergedConfig.setSecrets(newConfig.getSecrets());
    }
    if (newConfig.getRuntime() != null && !newConfig.getRuntime().equals(existingConfig.getRuntime())) {
        throw new IllegalArgumentException("Runtime cannot be altered");
    }
    if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
        throw new IllegalArgumentException("AutoAck cannot be altered");
    }
    if (newConfig.getMaxMessageRetries() != null) {
        mergedConfig.setMaxMessageRetries(newConfig.getMaxMessageRetries());
    }
    if (!StringUtils.isEmpty(newConfig.getDeadLetterTopic())) {
        mergedConfig.setDeadLetterTopic(newConfig.getDeadLetterTopic());
    }
    if (!StringUtils.isEmpty(newConfig.getSubName()) && !newConfig.getSubName().equals(existingConfig.getSubName())) {
        throw new IllegalArgumentException("Subscription Name cannot be altered");
    }
    if (newConfig.getParallelism() != null) {
        mergedConfig.setParallelism(newConfig.getParallelism());
    }
    if (newConfig.getResources() != null) {
        mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
    }
    if (newConfig.getWindowConfig() != null) {
        mergedConfig.setWindowConfig(newConfig.getWindowConfig());
    }
    if (newConfig.getTimeoutMs() != null) {
        mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
    }
    if (newConfig.getCleanupSubscription() != null) {
        mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
    }
    if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) {
        mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags());
    }
    if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
        mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
    }
    return mergedConfig;
}
}