| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.admin.cli; |
| |
| import static org.apache.commons.lang.StringUtils.isBlank; |
| import static org.apache.commons.lang.StringUtils.isNotBlank; |
| import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; |
| import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; |
| import com.beust.jcommander.Parameter; |
| import com.beust.jcommander.ParameterException; |
| import com.beust.jcommander.Parameters; |
| import com.beust.jcommander.converters.StringConverter; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.gson.Gson; |
| import com.google.gson.GsonBuilder; |
| import com.google.gson.reflect.TypeToken; |
| import java.io.File; |
| import java.lang.reflect.Field; |
| import java.lang.reflect.Type; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.function.Supplier; |
| import lombok.Getter; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.pulsar.admin.cli.utils.CmdUtils; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.SubscriptionInitialPosition; |
| import org.apache.pulsar.common.functions.ConsumerConfig; |
| import org.apache.pulsar.common.functions.FunctionConfig; |
| import org.apache.pulsar.common.functions.FunctionState; |
| import org.apache.pulsar.common.functions.ProducerConfig; |
| import org.apache.pulsar.common.functions.Resources; |
| import org.apache.pulsar.common.functions.UpdateOptionsImpl; |
| import org.apache.pulsar.common.functions.Utils; |
| import org.apache.pulsar.common.functions.WindowConfig; |
| import org.apache.pulsar.common.util.ObjectMapperFactory; |
| |
| @Slf4j |
| @Parameters(commandDescription = "Interface for managing Pulsar Functions " |
| + "(lightweight, Lambda-style compute processes that work with Pulsar)") |
| public class CmdFunctions extends CmdBase { |
| private final LocalRunner localRunner; |
| private final CreateFunction creater; |
| private final DeleteFunction deleter; |
| private final UpdateFunction updater; |
| private final GetFunction getter; |
| private final GetFunctionStatus functionStatus; |
| @Getter |
| private final GetFunctionStats functionStats; |
| private final RestartFunction restart; |
| private final StopFunction stop; |
| private final StartFunction start; |
| private final ListFunctions lister; |
| private final StateGetter stateGetter; |
| private final StatePutter statePutter; |
| private final TriggerFunction triggerer; |
| private final UploadFunction uploader; |
| private final DownloadFunction downloader; |
| |
| /** |
| * Base command. |
| */ |
| @Getter |
| abstract class BaseCommand extends CliCommand { |
| @Override |
| void run() throws Exception { |
| try { |
| processArguments(); |
| } catch (Exception e) { |
| System.err.println(e.getMessage()); |
| System.err.println(); |
| String chosenCommand = jcommander.getParsedCommand(); |
| getUsageFormatter().usage(chosenCommand); |
| return; |
| } |
| runCmd(); |
| } |
| |
| void processArguments() throws Exception {} |
| |
| abstract void runCmd() throws Exception; |
| } |
| |
| /** |
| * Namespace level command. |
| */ |
| @Getter |
| abstract class NamespaceCommand extends BaseCommand { |
| @Parameter(names = "--tenant", description = "The tenant of a Pulsar Function") |
| protected String tenant; |
| |
| @Parameter(names = "--namespace", description = "The namespace of a Pulsar Function") |
| protected String namespace; |
| |
| @Override |
| public void processArguments() { |
| if (tenant == null) { |
| tenant = PUBLIC_TENANT; |
| } |
| if (namespace == null) { |
| namespace = DEFAULT_NAMESPACE; |
| } |
| } |
| } |
| |
| /** |
| * Function level command. |
| */ |
| @Getter |
| abstract class FunctionCommand extends BaseCommand { |
| @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function") |
| protected String fqfn; |
| |
| @Parameter(names = "--tenant", description = "The tenant of a Pulsar Function") |
| protected String tenant; |
| |
| @Parameter(names = "--namespace", description = "The namespace of a Pulsar Function") |
| protected String namespace; |
| |
| @Parameter(names = "--name", description = "The name of a Pulsar Function") |
| protected String functionName; |
| |
| @Override |
| void processArguments() throws Exception { |
| super.processArguments(); |
| |
| boolean usesSetters = (null != tenant || null != namespace || null != functionName); |
| boolean usesFqfn = (null != fqfn); |
| |
| // Throw an exception if --fqfn is set alongside any combination of --tenant, --namespace, and --name |
| if (usesFqfn && usesSetters) { |
| throw new RuntimeException("You must specify either a Fully Qualified Function Name (FQFN) " |
| + "or tenant, namespace, and function name"); |
| } else if (usesFqfn) { |
| // If the --fqfn flag is used, parse tenant, namespace, and name using that flag |
| String[] fqfnParts = fqfn.split("/"); |
| if (fqfnParts.length != 3) { |
| throw new RuntimeException( |
| "Fully qualified function names (FQFNs) must be of the form tenant/namespace/name"); |
| } |
| tenant = fqfnParts[0]; |
| namespace = fqfnParts[1]; |
| functionName = fqfnParts[2]; |
| } else { |
| if (tenant == null) { |
| tenant = PUBLIC_TENANT; |
| } |
| if (namespace == null) { |
| namespace = DEFAULT_NAMESPACE; |
| } |
| if (null == functionName) { |
| throw new RuntimeException( |
| "You must specify a name for the function or a Fully Qualified Function Name (FQFN)"); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Commands that require a function config. |
| */ |
| @Getter |
| abstract class FunctionDetailsCommand extends BaseCommand { |
| @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function") |
| protected String fqfn; |
| @Parameter(names = "--tenant", description = "The tenant of a Pulsar Function") |
| protected String tenant; |
| @Parameter(names = "--namespace", description = "The namespace of a Pulsar Function") |
| protected String namespace; |
| @Parameter(names = "--name", description = "The name of a Pulsar Function") |
| protected String functionName; |
| // for backwards compatibility purposes |
| @Parameter(names = "--className", description = "The class name of a Pulsar Function", hidden = true) |
| protected String deprecatedClassName; |
| @Parameter(names = "--classname", description = "The class name of a Pulsar Function") |
| protected String className; |
| @Parameter(names = "--jar", description = "Path to the JAR file for the function " |
| + "(if the function is written in Java). It also supports URL path [http/https/file " |
| + "(file protocol assumes that file already exists on worker host)/function " |
| + "(package URL from packages management service)] from which worker can download the package.", |
| listConverter = StringConverter.class) |
| protected String jarFile; |
| @Parameter(names = "--py", description = "Path to the main Python file/Python Wheel file for the function " |
| + "(if the function is written in Python). It also supports URL path [http/https/file " |
| + "(file protocol assumes that file already exists on worker host)/function " |
| + "(package URL from packages management service)] from which worker can download the package.", |
| listConverter = StringConverter.class) |
| protected String pyFile; |
| @Parameter(names = "--go", description = "Path to the main Go executable binary for the function " |
| + "(if the function is written in Go). It also supports URL path [http/https/file " |
| + "(file protocol assumes that file already exists on worker host)/function " |
| + "(package URL from packages management service)] from which worker can download the package.") |
| protected String goFile; |
| @Parameter(names = {"-i", "--inputs"}, description = "The input topic or " |
| + "topics (multiple topics can be specified as a comma-separated list) of a Pulsar Function") |
| protected String inputs; |
| // for backwards compatibility purposes |
| @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics " |
| + "under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. " |
| + "Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)", |
| hidden = true) |
| protected String deprecatedTopicsPattern; |
| @Parameter(names = "--topics-pattern", description = "The topic pattern to consume from list of topics " |
| + "under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. " |
| + "Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)") |
| protected String topicsPattern; |
| |
| @Parameter(names = {"-o", "--output"}, |
| description = "The output topic of a Pulsar Function (If none is specified, no output is written)") |
| protected String output; |
| @Parameter(names = "--producer-config", description = "The custom producer configuration (as a JSON string)") |
| protected String producerConfig; |
| // for backwards compatibility purposes |
| @Parameter(names = "--logTopic", |
| description = "The topic to which the logs of a Pulsar Function are produced", hidden = true) |
| protected String deprecatedLogTopic; |
| @Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Function are produced") |
| protected String logTopic; |
| |
| @Parameter(names = {"-st", "--schema-type"}, description = "The builtin schema type or " |
| + "custom schema class name to be used for messages output by the function") |
| protected String schemaType = ""; |
| |
| // for backwards compatibility purposes |
| @Parameter(names = "--customSerdeInputs", |
| description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true) |
| protected String deprecatedCustomSerdeInputString; |
| @Parameter(names = "--custom-serde-inputs", |
| description = "The map of input topics to SerDe class names (as a JSON string)") |
| protected String customSerdeInputString; |
| @Parameter(names = "--custom-schema-inputs", |
| description = "The map of input topics to Schema properties (as a JSON string)") |
| protected String customSchemaInputString; |
| @Parameter(names = "--custom-schema-outputs", |
| description = "The map of input topics to Schema properties (as a JSON string)") |
| protected String customSchemaOutputString; |
| @Parameter(names = "--input-specs", |
| description = "The map of inputs to custom configuration (as a JSON string)") |
| protected String inputSpecs; |
| // for backwards compatibility purposes |
| @Parameter(names = "--outputSerdeClassName", |
| description = "The SerDe class to be used for messages output by the function", hidden = true) |
| protected String deprecatedOutputSerdeClassName; |
| @Parameter(names = "--output-serde-classname", |
| description = "The SerDe class to be used for messages output by the function") |
| protected String outputSerdeClassName; |
| // for backwards compatibility purposes |
| @Parameter(names = "--functionConfigFile", description = "The path to a YAML config file that specifies " |
| + "the configuration of a Pulsar Function", hidden = true) |
| protected String deprecatedFnConfigFile; |
| @Parameter(names = "--function-config-file", |
| description = "The path to a YAML config file that specifies the configuration of a Pulsar Function") |
| protected String fnConfigFile; |
| // for backwards compatibility purposes |
| @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) " |
| + "applied to the function", hidden = true) |
| protected FunctionConfig.ProcessingGuarantees deprecatedProcessingGuarantees; |
| @Parameter(names = "--processing-guarantees", |
| description = "The processing guarantees (aka delivery semantics) applied to the function") |
| protected FunctionConfig.ProcessingGuarantees processingGuarantees; |
| // for backwards compatibility purposes |
| @Parameter(names = "--userConfig", description = "User-defined config key/values", hidden = true) |
| protected String deprecatedUserConfigString; |
| @Parameter(names = "--user-config", description = "User-defined config key/values") |
| protected String userConfigString; |
| @Parameter(names = "--retainOrdering", |
| description = "Function consumes and processes messages in order", hidden = true) |
| protected Boolean deprecatedRetainOrdering; |
| @Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") |
| protected Boolean retainOrdering; |
| @Parameter(names = "--retain-key-ordering", |
| description = "Function consumes and processes messages in key order") |
| protected Boolean retainKeyOrdering; |
| @Parameter(names = "--batch-builder", description = "BatcherBuilder provides two types of " |
| + "batch construction methods, DEFAULT and KEY_BASED. The default value is: DEFAULT") |
| protected String batchBuilder; |
| @Parameter(names = "--forward-source-message-property", description = "Forwarding input message's properties " |
| + "to output topic when processing (use false to disable it)", arity = 1) |
| protected Boolean forwardSourceMessageProperty = true; |
| @Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific " |
| + "subscription-name for input-topic consumer") |
| protected String subsName; |
| @Parameter(names = "--subs-position", description = "Pulsar source subscription position if user wants to " |
| + "consume messages from the specified location") |
| protected SubscriptionInitialPosition subsPosition; |
| @Parameter(names = "--parallelism", description = "The parallelism factor of a Pulsar Function " |
| + "(i.e. the number of function instances to run)") |
| protected Integer parallelism; |
| @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated " |
| + "per function instance(applicable only to docker runtime)") |
| protected Double cpu; |
| @Parameter(names = "--ram", description = "The ram in bytes that need to be allocated " |
| + "per function instance(applicable only to process/docker runtime)") |
| protected Long ram; |
| @Parameter(names = "--disk", description = "The disk in bytes that need to be allocated " |
| + "per function instance(applicable only to docker runtime)") |
| protected Long disk; |
| // for backwards compatibility purposes |
| @Parameter(names = "--windowLengthCount", description = "The number of messages per window", hidden = true) |
| protected Integer deprecatedWindowLengthCount; |
| @Parameter(names = "--window-length-count", description = "The number of messages per window") |
| protected Integer windowLengthCount; |
| // for backwards compatibility purposes |
| @Parameter(names = "--windowLengthDurationMs", |
| description = "The time duration of the window in milliseconds", hidden = true) |
| protected Long deprecatedWindowLengthDurationMs; |
| @Parameter(names = "--window-length-duration-ms", |
| description = "The time duration of the window in milliseconds") |
| protected Long windowLengthDurationMs; |
| // for backwards compatibility purposes |
| @Parameter(names = "--slidingIntervalCount", |
| description = "The number of messages after which the window slides", hidden = true) |
| protected Integer deprecatedSlidingIntervalCount; |
| @Parameter(names = "--sliding-interval-count", |
| description = "The number of messages after which the window slides") |
| protected Integer slidingIntervalCount; |
| // for backwards compatibility purposes |
| @Parameter(names = "--slidingIntervalDurationMs", |
| description = "The time duration after which the window slides", hidden = true) |
| protected Long deprecatedSlidingIntervalDurationMs; |
| @Parameter(names = "--sliding-interval-duration-ms", |
| description = "The time duration after which the window slides") |
| protected Long slidingIntervalDurationMs; |
| // for backwards compatibility purposes |
| @Parameter(names = "--autoAck", |
| description = "Whether or not the framework acknowledges messages automatically", hidden = true) |
| protected Boolean deprecatedAutoAck = null; |
| @Parameter(names = "--auto-ack", |
| description = "Whether or not the framework acknowledges messages automatically", arity = 1) |
| protected Boolean autoAck; |
| // for backwards compatibility purposes |
| @Parameter(names = "--timeoutMs", description = "The message timeout in milliseconds", hidden = true) |
| protected Long deprecatedTimeoutMs; |
| @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") |
| protected Long timeoutMs; |
| @Parameter(names = "--max-message-retries", |
| description = "How many times should we try to process a message before giving up") |
| protected Integer maxMessageRetries; |
| @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to " |
| + "customize the runtime, see docs for configured runtime for details") |
| protected String customRuntimeOptions; |
| @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates " |
| + "how the secret is fetched by the underlying secrets provider") |
| protected String secretsString; |
| @Parameter(names = "--dead-letter-topic", |
| description = "The topic where messages that are not processed successfully are sent to") |
| protected String deadLetterTopic; |
| protected FunctionConfig functionConfig; |
| protected String userCodeFile; |
| |
| private void mergeArgs() { |
| if (isBlank(className) && !isBlank(deprecatedClassName)) { |
| className = deprecatedClassName; |
| } |
| if (isBlank(topicsPattern) && !isBlank(deprecatedTopicsPattern)) { |
| topicsPattern = deprecatedTopicsPattern; |
| } |
| if (isBlank(logTopic) && !isBlank(deprecatedLogTopic)) { |
| logTopic = deprecatedLogTopic; |
| } |
| if (isBlank(outputSerdeClassName) && !isBlank(deprecatedOutputSerdeClassName)) { |
| outputSerdeClassName = deprecatedOutputSerdeClassName; |
| } |
| if (isBlank(customSerdeInputString) && !isBlank(deprecatedCustomSerdeInputString)) { |
| customSerdeInputString = deprecatedCustomSerdeInputString; |
| } |
| |
| if (isBlank(fnConfigFile) && !isBlank(deprecatedFnConfigFile)) { |
| fnConfigFile = deprecatedFnConfigFile; |
| } |
| if (processingGuarantees == null && deprecatedProcessingGuarantees != null) { |
| processingGuarantees = deprecatedProcessingGuarantees; |
| } |
| if (isBlank(userConfigString) && !isBlank(deprecatedUserConfigString)) { |
| userConfigString = deprecatedUserConfigString; |
| } |
| if (retainOrdering == null && deprecatedRetainOrdering != null) { |
| retainOrdering = deprecatedRetainOrdering; |
| } |
| if (windowLengthCount == null && deprecatedWindowLengthCount != null) { |
| windowLengthCount = deprecatedWindowLengthCount; |
| } |
| if (windowLengthDurationMs == null && deprecatedWindowLengthDurationMs != null) { |
| windowLengthDurationMs = deprecatedWindowLengthDurationMs; |
| } |
| if (slidingIntervalCount == null && deprecatedSlidingIntervalCount != null) { |
| slidingIntervalCount = deprecatedSlidingIntervalCount; |
| } |
| if (slidingIntervalDurationMs == null && deprecatedSlidingIntervalDurationMs != null) { |
| slidingIntervalDurationMs = deprecatedSlidingIntervalDurationMs; |
| } |
| if (autoAck == null && deprecatedAutoAck != null) { |
| autoAck = deprecatedAutoAck; |
| } |
| if (timeoutMs == null && deprecatedTimeoutMs != null) { |
| timeoutMs = deprecatedTimeoutMs; |
| } |
| } |
| |
| @Override |
| void processArguments() throws Exception { |
| super.processArguments(); |
| // merge deprecated args with new args |
| mergeArgs(); |
| |
| // Initialize config builder either from a supplied YAML config file or from scratch |
| if (null != fnConfigFile) { |
| functionConfig = CmdUtils.loadConfig(fnConfigFile, FunctionConfig.class); |
| } else { |
| functionConfig = new FunctionConfig(); |
| } |
| |
| if (null != fqfn) { |
| parseFullyQualifiedFunctionName(fqfn, functionConfig); |
| } else { |
| if (null != tenant) { |
| functionConfig.setTenant(tenant); |
| } |
| if (null != namespace) { |
| functionConfig.setNamespace(namespace); |
| } |
| if (null != functionName) { |
| functionConfig.setName(functionName); |
| } |
| } |
| |
| if (null != inputs) { |
| List<String> inputTopics = Arrays.asList(inputs.split(",")); |
| functionConfig.setInputs(inputTopics); |
| } |
| if (null != customSerdeInputString) { |
| Type type = new TypeToken<Map<String, String>>() {}.getType(); |
| Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type); |
| functionConfig.setCustomSerdeInputs(customSerdeInputMap); |
| } |
| if (null != customSchemaInputString) { |
| Type type = new TypeToken<Map<String, String>>() {}.getType(); |
| Map<String, String> customschemaInputMap = new Gson().fromJson(customSchemaInputString, type); |
| functionConfig.setCustomSchemaInputs(customschemaInputMap); |
| } |
| if (null != customSchemaOutputString) { |
| Type type = new TypeToken<Map<String, String>>() {}.getType(); |
| Map<String, String> customSchemaOutputMap = new Gson().fromJson(customSchemaOutputString, type); |
| functionConfig.setCustomSchemaOutputs(customSchemaOutputMap); |
| } |
| if (null != inputSpecs) { |
| Type type = new TypeToken<Map<String, ConsumerConfig>>() {}.getType(); |
| functionConfig.setInputSpecs(new Gson().fromJson(inputSpecs, type)); |
| } |
| if (null != topicsPattern) { |
| functionConfig.setTopicsPattern(topicsPattern); |
| } |
| if (null != output) { |
| functionConfig.setOutput(output); |
| } |
| if (null != producerConfig) { |
| Type type = new TypeToken<ProducerConfig>() {}.getType(); |
| functionConfig.setProducerConfig(new Gson().fromJson(producerConfig, type)); |
| } |
| if (null != logTopic) { |
| functionConfig.setLogTopic(logTopic); |
| } |
| if (null != className) { |
| functionConfig.setClassName(className); |
| } |
| if (null != outputSerdeClassName) { |
| functionConfig.setOutputSerdeClassName(outputSerdeClassName); |
| } |
| |
| if (null != schemaType) { |
| functionConfig.setOutputSchemaType(schemaType); |
| } |
| if (null != processingGuarantees) { |
| functionConfig.setProcessingGuarantees(processingGuarantees); |
| } |
| |
| if (null != retainOrdering) { |
| functionConfig.setRetainOrdering(retainOrdering); |
| } |
| |
| if (null != retainKeyOrdering) { |
| functionConfig.setRetainKeyOrdering(retainKeyOrdering); |
| } |
| |
| if (isNotBlank(batchBuilder)) { |
| functionConfig.setBatchBuilder(batchBuilder); |
| } |
| |
| if (null != forwardSourceMessageProperty) { |
| functionConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty); |
| } |
| |
| if (isNotBlank(subsName)) { |
| functionConfig.setSubName(subsName); |
| } |
| |
| if (null != subsPosition) { |
| functionConfig.setSubscriptionPosition(subsPosition); |
| } |
| |
| if (null != userConfigString) { |
| Type type = new TypeToken<Map<String, String>>() {}.getType(); |
| Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type); |
| if (userConfigMap == null) { |
| userConfigMap = new HashMap<>(); |
| } |
| functionConfig.setUserConfig(userConfigMap); |
| } |
| |
| if (parallelism != null) { |
| functionConfig.setParallelism(parallelism); |
| } |
| |
| Resources resources = functionConfig.getResources(); |
| if (cpu != null) { |
| if (resources == null) { |
| resources = new Resources(); |
| } |
| resources.setCpu(cpu); |
| } |
| |
| if (ram != null) { |
| if (resources == null) { |
| resources = new Resources(); |
| } |
| resources.setRam(ram); |
| } |
| |
| if (disk != null) { |
| if (resources == null) { |
| resources = new Resources(); |
| } |
| resources.setDisk(disk); |
| } |
| if (resources != null) { |
| functionConfig.setResources(resources); |
| } |
| |
| if (timeoutMs != null) { |
| functionConfig.setTimeoutMs(timeoutMs); |
| } |
| |
| if (customRuntimeOptions != null) { |
| functionConfig.setCustomRuntimeOptions(customRuntimeOptions); |
| } |
| |
| if (secretsString != null) { |
| Type type = new TypeToken<Map<String, Object>>() {}.getType(); |
| Map<String, Object> secretsMap = new Gson().fromJson(secretsString, type); |
| if (secretsMap == null) { |
| secretsMap = Collections.emptyMap(); |
| } |
| functionConfig.setSecrets(secretsMap); |
| } |
| |
| // window configs |
| WindowConfig windowConfig = functionConfig.getWindowConfig(); |
| if (null != windowLengthCount) { |
| if (windowConfig == null) { |
| windowConfig = new WindowConfig(); |
| } |
| windowConfig.setWindowLengthCount(windowLengthCount); |
| } |
| if (null != windowLengthDurationMs) { |
| if (windowConfig == null) { |
| windowConfig = new WindowConfig(); |
| } |
| windowConfig.setWindowLengthDurationMs(windowLengthDurationMs); |
| } |
| if (null != slidingIntervalCount) { |
| if (windowConfig == null) { |
| windowConfig = new WindowConfig(); |
| } |
| windowConfig.setSlidingIntervalCount(slidingIntervalCount); |
| } |
| if (null != slidingIntervalDurationMs) { |
| if (windowConfig == null) { |
| windowConfig = new WindowConfig(); |
| } |
| windowConfig.setSlidingIntervalDurationMs(slidingIntervalDurationMs); |
| } |
| |
| functionConfig.setWindowConfig(windowConfig); |
| |
| if (autoAck != null) { |
| functionConfig.setAutoAck(autoAck); |
| } |
| |
| if (null != maxMessageRetries) { |
| functionConfig.setMaxMessageRetries(maxMessageRetries); |
| } |
| if (null != deadLetterTopic) { |
| functionConfig.setDeadLetterTopic(deadLetterTopic); |
| } |
| |
| if (null != jarFile) { |
| functionConfig.setJar(jarFile); |
| } |
| |
| if (null != pyFile) { |
| functionConfig.setPy(pyFile); |
| } |
| |
| if (null != goFile) { |
| functionConfig.setGo(goFile); |
| } |
| |
| if (functionConfig.getJar() != null) { |
| userCodeFile = functionConfig.getJar(); |
| } else if (functionConfig.getPy() != null) { |
| userCodeFile = functionConfig.getPy(); |
| } else if (functionConfig.getGo() != null) { |
| userCodeFile = functionConfig.getGo(); |
| } |
| |
| // check if configs are valid |
| validateFunctionConfigs(functionConfig); |
| } |
| |
| protected void validateFunctionConfigs(FunctionConfig functionConfig) { |
| // go doesn't need className |
| if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON |
| || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA){ |
| if (StringUtils.isEmpty(functionConfig.getClassName())) { |
| throw new IllegalArgumentException("No Function Classname specified"); |
| } |
| } |
| if (StringUtils.isEmpty(functionConfig.getName())) { |
| org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig); |
| } |
| if (StringUtils.isEmpty(functionConfig.getTenant())) { |
| org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig); |
| } |
| if (StringUtils.isEmpty(functionConfig.getNamespace())) { |
| org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig); |
| } |
| |
| if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy()) |
| && isNotBlank(functionConfig.getGo())) { |
| throw new ParameterException("Either a Java jar or a Python file or a Go executable binary needs to" |
| + " be specified for the function. Cannot specify both."); |
| } |
| |
| if (isBlank(functionConfig.getJar()) && isBlank(functionConfig.getPy()) |
| && isBlank(functionConfig.getGo())) { |
| throw new ParameterException("Either a Java jar or a Python file or a Go executable binary needs to" |
| + " be specified for the function. Please specify one."); |
| } |
| |
| if (!isBlank(functionConfig.getJar()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) |
| && !new File(functionConfig.getJar()).exists()) { |
| throw new ParameterException("The specified jar file does not exist"); |
| } |
| if (!isBlank(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) |
| && !new File(functionConfig.getPy()).exists()) { |
| throw new ParameterException("The specified python file does not exist"); |
| } |
| if (!isBlank(functionConfig.getGo()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getGo()) |
| && !new File(functionConfig.getGo()).exists()) { |
| throw new ParameterException("The specified go executable binary does not exist"); |
| } |
| } |
| } |
| |
| @Parameters(commandDescription = "Run a Pulsar Function locally, rather than deploy to a Pulsar cluster)") |
| class LocalRunner extends FunctionDetailsCommand { |
| |
| // TODO: this should become BookKeeper URL and it should be fetched from Pulsar client. |
| // for backwards compatibility purposes |
| @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service " |
| + "(the default is Apache BookKeeper)", hidden = true) |
| protected String deprecatedStateStorageServiceUrl; |
| @Parameter(names = "--state-storage-service-url", description = "The URL for the state storage service " |
| + "(the default is Apache BookKeeper)") |
| protected String stateStorageServiceUrl; |
| // for backwards compatibility purposes |
| @Parameter(names = "--brokerServiceUrl", description = "The URL for Pulsar broker", hidden = true) |
| protected String deprecatedBrokerServiceUrl; |
| @Parameter(names = "--broker-service-url", description = "The URL for Pulsar broker") |
| protected String brokerServiceUrl; |
| @Parameter(names = "--web-service-url", description = "The URL for Pulsar web service") |
| protected String webServiceUrl = null; |
| // for backwards compatibility purposes |
| @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using " |
| + "which function-process can connect to broker", hidden = true) |
| protected String deprecatedClientAuthPlugin; |
| @Parameter(names = "--client-auth-plugin", |
| description = "Client authentication plugin using which function-process can connect to broker") |
| protected String clientAuthPlugin; |
| // for backwards compatibility purposes |
| @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true) |
| protected String deprecatedClientAuthParams; |
| @Parameter(names = "--client-auth-params", description = "Client authentication param") |
| protected String clientAuthParams; |
| // for backwards compatibility purposes |
| @Parameter(names = "--use_tls", description = "Use tls connection", hidden = true) |
| protected Boolean deprecatedUseTls = null; |
| @Parameter(names = "--use-tls", description = "Use tls connection") |
| protected boolean useTls; |
| // for backwards compatibility purposes |
| @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection", hidden = true) |
| protected Boolean deprecatedTlsAllowInsecureConnection = null; |
| @Parameter(names = "--tls-allow-insecure", description = "Allow insecure tls connection") |
| protected boolean tlsAllowInsecureConnection; |
| // for backwards compatibility purposes |
| @Parameter(names = "--hostname_verification_enabled", |
| description = "Enable hostname verification", hidden = true) |
| protected Boolean deprecatedTlsHostNameVerificationEnabled = null; |
| @Parameter(names = "--hostname-verification-enabled", description = "Enable hostname verification") |
| protected boolean tlsHostNameVerificationEnabled; |
| // for backwards compatibility purposes |
| @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path", hidden = true) |
| protected String deprecatedTlsTrustCertFilePath; |
| @Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path") |
| protected String tlsTrustCertFilePath; |
| // for backwards compatibility purposes |
| @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true) |
| protected Integer deprecatedInstanceIdOffset = null; |
| @Parameter(names = "--instance-id-offset", description = "Start the instanceIds from this offset") |
| protected Integer instanceIdOffset = 0; |
| @Parameter(names = "--runtime", description = "either THREAD or PROCESS. Only applies for Java functions") |
| protected String runtime; |
| @Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider") |
| protected String secretsProviderClassName; |
| @Parameter(names = "--secrets-provider-config", |
| description = "Config that needs to be passed to secrets provider") |
| protected String secretsProviderConfig; |
| @Parameter(names = "--metrics-port-start", description = "The starting port range for metrics server") |
| protected String metricsPortStart; |
| |
| private void mergeArgs() { |
| if (isBlank(stateStorageServiceUrl) && !isBlank(deprecatedStateStorageServiceUrl)) { |
| stateStorageServiceUrl = deprecatedStateStorageServiceUrl; |
| } |
| if (isBlank(brokerServiceUrl) && !isBlank(deprecatedBrokerServiceUrl)) { |
| brokerServiceUrl = deprecatedBrokerServiceUrl; |
| } |
| if (isBlank(clientAuthPlugin) && !isBlank(deprecatedClientAuthPlugin)) { |
| clientAuthPlugin = deprecatedClientAuthPlugin; |
| } |
| if (isBlank(clientAuthParams) && !isBlank(deprecatedClientAuthParams)) { |
| clientAuthParams = deprecatedClientAuthParams; |
| } |
| if (!useTls && deprecatedUseTls != null) { |
| useTls = deprecatedUseTls; |
| } |
| if (!tlsAllowInsecureConnection && deprecatedTlsAllowInsecureConnection != null) { |
| tlsAllowInsecureConnection = deprecatedTlsAllowInsecureConnection; |
| } |
| if (!tlsHostNameVerificationEnabled && deprecatedTlsHostNameVerificationEnabled != null) { |
| tlsHostNameVerificationEnabled = deprecatedTlsHostNameVerificationEnabled; |
| } |
| if (isBlank(tlsTrustCertFilePath) && !isBlank(deprecatedTlsTrustCertFilePath)) { |
| tlsTrustCertFilePath = deprecatedTlsTrustCertFilePath; |
| } |
| if (instanceIdOffset == null && deprecatedInstanceIdOffset != null) { |
| instanceIdOffset = deprecatedInstanceIdOffset; |
| } |
| } |
| |
| @Override |
| void runCmd() throws Exception { |
| // merge deprecated args with new args |
| mergeArgs(); |
| List<String> localRunArgs = new LinkedList<>(); |
| localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner"); |
| localRunArgs.add("--functionConfig"); |
| localRunArgs.add(new Gson().toJson(functionConfig)); |
| for (Field field : this.getClass().getDeclaredFields()) { |
| if (field.getName().startsWith("DEPRECATED")) { |
| continue; |
| } |
| if (field.getName().contains("$")) { |
| continue; |
| } |
| Object value = field.get(this); |
| if (value != null) { |
| localRunArgs.add("--" + field.getName()); |
| localRunArgs.add(value.toString()); |
| } |
| } |
| ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO(); |
| Process process = processBuilder.start(); |
| process.waitFor(); |
| } |
| } |
| |
| @Parameters(commandDescription = "Create a Pulsar Function in cluster mode (deploy it on a Pulsar cluster)") |
| class CreateFunction extends FunctionDetailsCommand { |
| @Override |
| void runCmd() throws Exception { |
| if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) { |
| getAdmin().functions().createFunctionWithUrl(functionConfig, functionConfig.getJar()); |
| } else if (Utils.isFunctionPackageUrlSupported(functionConfig.getPy())) { |
| getAdmin().functions().createFunctionWithUrl(functionConfig, functionConfig.getPy()); |
| } else if (Utils.isFunctionPackageUrlSupported(functionConfig.getGo())) { |
| getAdmin().functions().createFunctionWithUrl(functionConfig, functionConfig.getGo()); |
| } else { |
| getAdmin().functions().createFunction(functionConfig, userCodeFile); |
| } |
| |
| print("Created successfully"); |
| } |
| } |
| |
| @Parameters(commandDescription = "Fetch information about a Pulsar Function") |
| class GetFunction extends FunctionCommand { |
| @Override |
| void runCmd() throws Exception { |
| FunctionConfig functionConfig = getAdmin().functions().getFunction(tenant, namespace, functionName); |
| Gson gson = new GsonBuilder().setPrettyPrinting().create(); |
| System.out.println(gson.toJson(functionConfig)); |
| } |
| } |
| |
| @Parameters(commandDescription = "Check the current status of a Pulsar Function") |
| class GetFunctionStatus extends FunctionCommand { |
| |
| @Parameter(names = "--instance-id", description = "The function instanceId " |
| + "(Get-status of all instances if instance-id is not provided)") |
| protected String instanceId; |
| |
| @Override |
| void runCmd() throws Exception { |
| if (isBlank(instanceId)) { |
| print(getAdmin().functions().getFunctionStatus(tenant, namespace, functionName)); |
| } else { |
| print(getAdmin().functions() |
| .getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId))); |
| } |
| } |
| } |
| |
| @Parameters(commandDescription = "Get the current stats of a Pulsar Function") |
| class GetFunctionStats extends FunctionCommand { |
| |
| @Parameter(names = "--instance-id", description = "The function instanceId " |
| + "(Get-stats of all instances if instance-id is not provided)") |
| protected String instanceId; |
| |
| @Override |
| void runCmd() throws Exception { |
| |
| if (isBlank(instanceId)) { |
| print(getAdmin().functions().getFunctionStats(tenant, namespace, functionName)); |
| } else { |
| print(getAdmin().functions() |
| .getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId))); |
| } |
| } |
| } |
| |
| @Parameters(commandDescription = "Restart function instance") |
| class RestartFunction extends FunctionCommand { |
| |
| @Parameter(names = "--instance-id", description = "The function instanceId " |
| + "(restart all instances if instance-id is not provided)") |
| protected String instanceId; |
| |
| @Override |
| void runCmd() throws Exception { |
| if (isNotBlank(instanceId)) { |
| try { |
| getAdmin().functions() |
| .restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId)); |
| } catch (NumberFormatException e) { |
| System.err.println("instance-id must be a number"); |
| } |
| } else { |
| getAdmin().functions().restartFunction(tenant, namespace, functionName); |
| } |
| System.out.println("Restarted successfully"); |
| } |
| } |
| |
| @Parameters(commandDescription = "Stops function instance") |
| class StopFunction extends FunctionCommand { |
| |
| @Parameter(names = "--instance-id", description = "The function instanceId " |
| + "(stop all instances if instance-id is not provided)") |
| protected String instanceId; |
| |
| @Override |
| void runCmd() throws Exception { |
| if (isNotBlank(instanceId)) { |
| try { |
| getAdmin().functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId)); |
| } catch (NumberFormatException e) { |
| System.err.println("instance-id must be a number"); |
| } |
| } else { |
| getAdmin().functions().stopFunction(tenant, namespace, functionName); |
| } |
| System.out.println("Stopped successfully"); |
| } |
| } |
| |
| @Parameters(commandDescription = "Starts a stopped function instance") |
| class StartFunction extends FunctionCommand { |
| |
| @Parameter(names = "--instance-id", description = "The function instanceId " |
| + "(start all instances if instance-id is not provided)") |
| protected String instanceId; |
| |
| @Override |
| void runCmd() throws Exception { |
| if (isNotBlank(instanceId)) { |
| try { |
| getAdmin().functions().startFunction(tenant, namespace, functionName, Integer.parseInt(instanceId)); |
| } catch (NumberFormatException e) { |
| System.err.println("instance-id must be a number"); |
| } |
| } else { |
| getAdmin().functions().startFunction(tenant, namespace, functionName); |
| } |
| System.out.println("Started successfully"); |
| } |
| } |
| |
| @Parameters(commandDescription = "Delete a Pulsar Function that is running on a Pulsar cluster") |
| class DeleteFunction extends FunctionCommand { |
| @Override |
| void runCmd() throws Exception { |
| getAdmin().functions().deleteFunction(tenant, namespace, functionName); |
| print("Deleted successfully"); |
| } |
| } |
| |
| @Parameters(commandDescription = "Update a Pulsar Function that has been deployed to a Pulsar cluster") |
| class UpdateFunction extends FunctionDetailsCommand { |
| |
| @Parameter(names = "--update-auth-data", description = "Whether or not to update the auth data") |
| protected boolean updateAuthData; |
| |
| @Override |
| protected void validateFunctionConfigs(FunctionConfig functionConfig) { |
| if (StringUtils.isEmpty(functionConfig.getClassName())) { |
| if (StringUtils.isEmpty(functionConfig.getName())) { |
| throw new IllegalArgumentException("Function Name not provided"); |
| } |
| } else if (StringUtils.isEmpty(functionConfig.getName())) { |
| org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig); |
| } |
| if (StringUtils.isEmpty(functionConfig.getTenant())) { |
| org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig); |
| } |
| if (StringUtils.isEmpty(functionConfig.getNamespace())) { |
| org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig); |
| } |
| } |
| |
| @Override |
| void runCmd() throws Exception { |
| |
| UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); |
| updateOptions.setUpdateAuthData(updateAuthData); |
| if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) { |
| getAdmin().functions().updateFunctionWithUrl(functionConfig, functionConfig.getJar(), updateOptions); |
| } else { |
| getAdmin().functions().updateFunction(functionConfig, userCodeFile, updateOptions); |
| } |
| print("Updated successfully"); |
| } |
| } |
| |
| @Parameters(commandDescription = "List all Pulsar Functions running under a specific tenant and namespace") |
| class ListFunctions extends NamespaceCommand { |
| @Override |
| void runCmd() throws Exception { |
| print(getAdmin().functions().getFunctions(tenant, namespace)); |
| } |
| } |
| |
| @Parameters(commandDescription = "Fetch the current state associated with a Pulsar Function") |
| class StateGetter extends FunctionCommand { |
| |
| @Parameter(names = { "-k", "--key" }, description = "Key name of State") |
| private String key = null; |
| |
| @Parameter(names = { "-w", "--watch" }, description = "Watch for changes in the value associated with a key " |
| + "for a Pulsar Function") |
| private boolean watch = false; |
| |
| @Override |
| void runCmd() throws Exception { |
| if (isBlank(key)) { |
| throw new ParameterException("State key needs to be specified"); |
| } |
| do { |
| try { |
| FunctionState functionState = getAdmin().functions() |
| .getFunctionState(tenant, namespace, functionName, key); |
| Gson gson = new GsonBuilder().setPrettyPrinting().create(); |
| System.out.println(gson.toJson(functionState)); |
| } catch (PulsarAdminException pae) { |
| if (pae.getStatusCode() == 404 && watch) { |
| System.err.println(pae.getMessage()); |
| } else { |
| throw pae; |
| } |
| } |
| if (watch) { |
| Thread.sleep(1000); |
| } |
| } while (watch); |
| } |
| } |
| |
| @Parameters(commandDescription = "Put the state associated with a Pulsar Function") |
| class StatePutter extends FunctionCommand { |
| |
| @Parameter(names = { "-s", "--state" }, description = "The FunctionState that needs to be put", required = true) |
| private String state = null; |
| |
| @Override |
| void runCmd() throws Exception { |
| TypeReference<FunctionState> typeRef = new TypeReference<FunctionState>() {}; |
| FunctionState stateRepr = ObjectMapperFactory.getThreadLocal().readValue(state, typeRef); |
| getAdmin().functions() |
| .putFunctionState(tenant, namespace, functionName, stateRepr); |
| } |
| } |
| |
| @Parameters(commandDescription = "Trigger the specified Pulsar Function with a supplied value") |
| class TriggerFunction extends FunctionCommand { |
| // for backward compatibility purposes |
| @Parameter(names = "--triggerValue", |
| description = "The value with which you want to trigger the function", hidden = true) |
| protected String deprecatedTriggerValue; |
| @Parameter(names = "--trigger-value", description = "The value with which you want to trigger the function") |
| protected String triggerValue; |
| // for backward compatibility purposes |
| @Parameter(names = "--triggerFile", description = "The path to the file that contains the data with which " |
| + "you want to trigger the function", hidden = true) |
| protected String deprecatedTriggerFile; |
| @Parameter(names = "--trigger-file", description = "The path to the file that contains the data with which " |
| + "you want to trigger the function") |
| protected String triggerFile; |
| @Parameter(names = "--topic", description = "The specific topic name that the function consumes from that" |
| + " you want to inject the data to") |
| protected String topic; |
| |
| public void mergeArgs() { |
| if (isBlank(triggerValue) && !isBlank(deprecatedTriggerValue)) { |
| triggerValue = deprecatedTriggerValue; |
| } |
| if (isBlank(triggerFile) && !isBlank(deprecatedTriggerFile)) { |
| triggerFile = deprecatedTriggerFile; |
| } |
| } |
| |
| @Override |
| void runCmd() throws Exception { |
| // merge deprecated args with new args |
| mergeArgs(); |
| if (triggerFile == null && triggerValue == null) { |
| throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified"); |
| } |
| String retval = getAdmin().functions() |
| .triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile); |
| System.out.println(retval); |
| } |
| } |
| |
| @Parameters(commandDescription = "Upload File Data to Pulsar", hidden = true) |
| class UploadFunction extends BaseCommand { |
| // for backward compatibility purposes |
| @Parameter( |
| names = "--sourceFile", |
| description = "The file whose contents need to be uploaded", |
| listConverter = StringConverter.class, hidden = true) |
| protected String deprecatedSourceFile; |
| @Parameter( |
| names = "--source-file", |
| description = "The file whose contents need to be uploaded", |
| listConverter = StringConverter.class) |
| protected String sourceFile; |
| @Parameter( |
| names = "--path", |
| description = "Path where the contents need to be stored", |
| listConverter = StringConverter.class, required = true) |
| protected String path; |
| |
| private void mergeArgs() { |
| if (isBlank(sourceFile) && !isBlank(deprecatedSourceFile)) { |
| sourceFile = deprecatedSourceFile; |
| } |
| } |
| |
| @Override |
| void runCmd() throws Exception { |
| // merge deprecated args with new args |
| mergeArgs(); |
| if (StringUtils.isBlank(sourceFile)) { |
| throw new ParameterException("--source-file needs to be specified"); |
| } |
| getAdmin().functions().uploadFunction(sourceFile, path); |
| print("Uploaded successfully"); |
| } |
| } |
| |
| @Parameters(commandDescription = "Download File Data from Pulsar", hidden = true) |
| class DownloadFunction extends FunctionCommand { |
| // for backward compatibility purposes |
| @Parameter( |
| names = "--destinationFile", |
| description = "The file to store downloaded content", |
| listConverter = StringConverter.class, hidden = true) |
| protected String deprecatedDestinationFile; |
| @Parameter( |
| names = "--destination-file", |
| description = "The file to store downloaded content", |
| listConverter = StringConverter.class) |
| protected String destinationFile; |
| @Parameter( |
| names = "--path", |
| description = "Path to store the content", |
| listConverter = StringConverter.class, required = false, hidden = true) |
| protected String path; |
| |
| private void mergeArgs() { |
| if (isBlank(destinationFile) && !isBlank(deprecatedDestinationFile)) { |
| destinationFile = deprecatedDestinationFile; |
| } |
| } |
| |
| @Override |
| void processArguments() throws Exception { |
| if (path == null) { |
| super.processArguments(); |
| } |
| } |
| |
| @Override |
| void runCmd() throws Exception { |
| // merge deprecated args with new args |
| mergeArgs(); |
| if (StringUtils.isBlank(destinationFile)) { |
| throw new ParameterException("--destination-file needs to be specified"); |
| } |
| if (path != null) { |
| getAdmin().functions().downloadFunction(destinationFile, path); |
| } else { |
| getAdmin().functions().downloadFunction(destinationFile, tenant, namespace, functionName); |
| } |
| print("Downloaded successfully"); |
| } |
| } |
| |
| public CmdFunctions(Supplier<PulsarAdmin> admin) throws PulsarClientException { |
| super("functions", admin); |
| localRunner = new LocalRunner(); |
| creater = new CreateFunction(); |
| deleter = new DeleteFunction(); |
| updater = new UpdateFunction(); |
| getter = new GetFunction(); |
| functionStatus = new GetFunctionStatus(); |
| functionStats = new GetFunctionStats(); |
| lister = new ListFunctions(); |
| stateGetter = new StateGetter(); |
| statePutter = new StatePutter(); |
| triggerer = new TriggerFunction(); |
| uploader = new UploadFunction(); |
| downloader = new DownloadFunction(); |
| restart = new RestartFunction(); |
| stop = new StopFunction(); |
| start = new StartFunction(); |
| jcommander.addCommand("localrun", getLocalRunner()); |
| jcommander.addCommand("create", getCreater()); |
| jcommander.addCommand("delete", getDeleter()); |
| jcommander.addCommand("update", getUpdater()); |
| jcommander.addCommand("get", getGetter()); |
| jcommander.addCommand("restart", getRestarter()); |
| jcommander.addCommand("stop", getStopper()); |
| jcommander.addCommand("start", getStarter()); |
| // TODO depecreate getstatus |
| jcommander.addCommand("status", getStatuser(), "getstatus"); |
| jcommander.addCommand("stats", getFunctionStats()); |
| jcommander.addCommand("list", getLister()); |
| jcommander.addCommand("querystate", getStateGetter()); |
| jcommander.addCommand("putstate", getStatePutter()); |
| jcommander.addCommand("trigger", getTriggerer()); |
| jcommander.addCommand("upload", getUploader()); |
| jcommander.addCommand("download", getDownloader()); |
| } |
| |
| @VisibleForTesting |
| LocalRunner getLocalRunner() { |
| return localRunner; |
| } |
| |
| @VisibleForTesting |
| CreateFunction getCreater() { |
| return creater; |
| } |
| |
| @VisibleForTesting |
| DeleteFunction getDeleter() { |
| return deleter; |
| } |
| |
| @VisibleForTesting |
| UpdateFunction getUpdater() { |
| return updater; |
| } |
| |
| @VisibleForTesting |
| GetFunction getGetter() { |
| return getter; |
| } |
| |
| @VisibleForTesting |
| GetFunctionStatus getStatuser() { |
| return functionStatus; |
| } |
| |
| @VisibleForTesting |
| ListFunctions getLister() { |
| return lister; |
| } |
| |
| @VisibleForTesting |
| StatePutter getStatePutter() { |
| return statePutter; |
| } |
| |
| @VisibleForTesting |
| StateGetter getStateGetter() { |
| return stateGetter; |
| } |
| |
| @VisibleForTesting |
| TriggerFunction getTriggerer() { |
| return triggerer; |
| } |
| |
| @VisibleForTesting |
| UploadFunction getUploader() { |
| return uploader; |
| } |
| |
| @VisibleForTesting |
| DownloadFunction getDownloader() { |
| return downloader; |
| } |
| |
| @VisibleForTesting |
| RestartFunction getRestarter() { |
| return restart; |
| } |
| |
| @VisibleForTesting |
| StopFunction getStopper() { |
| return stop; |
| } |
| |
| @VisibleForTesting |
| StartFunction getStarter() { |
| return start; |
| } |
| |
| private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) { |
| String[] args = fqfn.split("/"); |
| if (args.length != 3) { |
| throw new ParameterException("Fully qualified function names (FQFNs) must " |
| + "be of the form tenant/namespace/name"); |
| } else { |
| functionConfig.setTenant(args[0]); |
| functionConfig.setNamespace(args[1]); |
| functionConfig.setName(args[2]); |
| } |
| } |
| |
| } |