| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.functions.worker; |
| |
| import static org.apache.commons.lang3.StringUtils.isBlank; |
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import lombok.AccessLevel; |
| import lombok.Data; |
| import lombok.Getter; |
| import lombok.ToString; |
| import lombok.experimental.Accessors; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; |
| import org.apache.pulsar.common.configuration.Category; |
| import org.apache.pulsar.common.configuration.FieldContext; |
| import org.apache.pulsar.common.configuration.PulsarConfiguration; |
| import org.apache.pulsar.common.functions.Resources; |
| import org.apache.pulsar.common.nar.NarClassLoader; |
| import org.apache.pulsar.common.sasl.SaslConstants; |
| import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider; |
| import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl; |
| import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory; |
| import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig; |
| import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig; |
| import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; |
| |
| @Data |
| @Accessors(chain = true) |
| @JsonIgnoreProperties(ignoreUnknown = true) |
| public class WorkerConfig implements Serializable, PulsarConfiguration { |
| |
| private static final long serialVersionUID = 1L; |
| |
| @Category |
| private static final String CATEGORY_WORKER = "Worker Settings"; |
| @Category |
| private static final String CATEGORY_FUNC_PKG = "Function Package Management"; |
| @Category |
| private static final String CATEGORY_FUNC_METADATA_MNG = "Function Metadata Management"; |
| @Category |
| private static final String CATEGORY_FUNC_RUNTIME_MNG = "Function Runtime Management"; |
| @Category |
| private static final String CATEGORY_FUNC_SCHEDULE_MNG = "Function Scheduling Management"; |
| @Category |
| private static final String CATEGORY_SECURITY = "Common Security Settings (applied for both worker and client)"; |
| @Category |
| private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS"; |
| @Category |
| private static final String CATEGORY_WORKER_SECURITY = "Worker Security Settings"; |
| @Category |
| private static final String CATEGORY_CLIENT_SECURITY = "Security settings for clients talking to brokers"; |
| @Category |
| private static final String CATEGORY_STATE = "State Management"; |
| @Category |
| private static final String CATEGORY_CONNECTORS = "Connectors"; |
| @Category |
| private static final String CATEGORY_FUNCTIONS = "Functions"; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Id to identify a worker instance" |
| ) |
| private String workerId; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Hostname of the worker instance" |
| ) |
| private String workerHostname; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "The port for serving worker http requests. Set to null to disable serving on the http port." |
| ) |
| private Integer workerPort; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "The port for serving worker https requests" |
| ) |
| private Integer workerPortTls; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true." |
| + "'authenticationEnabled' must also be set for this to take effect." |
| ) |
| private boolean authenticateMetricsEndpoint = true; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Whether the '/metrics' endpoint should return default prometheus metrics. Defaults to false." |
| ) |
| private boolean includeStandardPrometheusMetrics = false; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics") |
| private String jvmGCMetricsLoggerClassName; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Number of threads to use for HTTP requests processing" |
| ) |
| private int numHttpServerThreads = 8; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Enable the enforcement of limits on the incoming HTTP requests" |
| ) |
| private boolean httpRequestsLimitEnabled = false; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Max HTTP requests per seconds allowed. The excess " |
| + "of requests will be rejected with HTTP code 429 (Too many requests)" |
| ) |
| private double httpRequestsMaxPerSecond = 100.0; |
| |
| @FieldContext(category = CATEGORY_WORKER, doc = "Max concurrent web requests") |
| private int maxConcurrentHttpRequests = 1024; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Capacity for thread pool queue in the HTTP server" |
| + " Default is set to 8192." |
| ) |
| private int httpServerThreadPoolQueueSize = 8192; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Capacity for accept queue in the HTTP server" |
| + " Default is set to 8192." |
| ) |
| private int httpServerAcceptQueueSize = 8192; |
| |
| @FieldContext(category = CATEGORY_WORKER, doc = "Maximum number of inbound http connections. " |
| + "(0 to disable limiting)") |
| private int maxHttpServerConnections = 2048; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| required = false, |
| deprecated = true, |
| doc = "Configuration store connection string (as a comma-separated list). Deprecated in favor of " |
| + "`configurationMetadataStoreUrl`" |
| ) |
| @Deprecated |
| private String configurationStoreServers; |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| required = false, |
| doc = "Configuration store connection string (as a comma-separated list)" |
| ) |
| private String configurationMetadataStoreUrl; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Metadata store session timeout in milliseconds." |
| ) |
| private long metadataStoreSessionTimeoutMillis = 30_000; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Metadata store operation timeout in seconds." |
| ) |
| private int metadataStoreOperationTimeoutSeconds = 30; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Metadata store cache expiry time in seconds." |
| ) |
| private int metadataStoreCacheExpirySeconds = 300; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Is metadata store read-only operations." |
| ) |
| private boolean metadataStoreAllowReadOnlyOperations; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| deprecated = true, |
| doc = "ZooKeeper session timeout in milliseconds. " |
| + "@deprecated - Use metadataStoreSessionTimeoutMillis instead." |
| ) |
| private long zooKeeperSessionTimeoutMillis = -1; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| deprecated = true, |
| doc = "ZooKeeper operation timeout in seconds. " |
| + "@deprecated - Use metadataStoreOperationTimeoutSeconds instead." |
| ) |
| private int zooKeeperOperationTimeoutSeconds = -1; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| deprecated = true, |
| doc = "ZooKeeper cache expiry time in seconds. " |
| + "@deprecated - Use metadataStoreCacheExpirySeconds instead." |
| ) |
| private int zooKeeperCacheExpirySeconds = -1; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| deprecated = true, |
| doc = "Is zooKeeper allow read-only operations." |
| ) |
| private boolean zooKeeperAllowReadOnlyOperations; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Specifies if the function worker should use classloading for validating submissions for built-in " |
| + "connectors and functions. This is required for validateConnectorConfig to take effect. " |
| + "Default is false." |
| ) |
| private Boolean enableClassloadingOfBuiltinFiles = false; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Specifies if the function worker should use classloading for validating submissions for external " |
| + "connectors and functions. This is required for validateConnectorConfig to take effect. " |
| + "Default is false." |
| ) |
| private Boolean enableClassloadingOfExternalFiles = false; |
| |
| @FieldContext( |
| category = CATEGORY_CONNECTORS, |
| doc = "The path to the location to locate builtin connectors" |
| ) |
| private String connectorsDirectory = "./connectors"; |
| @FieldContext( |
| category = CATEGORY_CONNECTORS, |
| doc = "The directory where nar packages are extractors" |
| ) |
| private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; |
| @FieldContext( |
| category = CATEGORY_CONNECTORS, |
| doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) " |
| + "creation. Default is true." |
| ) |
| private Boolean enableReferencingConnectorDirectoryFiles = true; |
| @FieldContext( |
| category = CATEGORY_FUNCTIONS, |
| doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https " |
| + "urls." |
| ) |
| private List<String> additionalEnabledConnectorUrlPatterns = new ArrayList<>(); |
| @FieldContext( |
| category = CATEGORY_CONNECTORS, |
| doc = "Enables extended validation for connector config with fine-grain annotation based validation " |
| + "during submission. Classloading with either enableClassloadingOfExternalFiles or " |
| + "enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect. " |
| + "Default is false." |
| ) |
| private Boolean validateConnectorConfig = false; |
| @FieldContext( |
| category = CATEGORY_FUNCTIONS, |
| doc = "Should the builtin sources/sinks/functions be uploaded for the externally managed runtimes?" |
| ) |
| private Boolean uploadBuiltinSinksSources = true; |
| @FieldContext( |
| category = CATEGORY_FUNCTIONS, |
| doc = "The path to the location to locate builtin functions" |
| ) |
| private String functionsDirectory = "./functions"; |
| @FieldContext( |
| category = CATEGORY_FUNCTIONS, |
| doc = "Whether to enable referencing functions directory files by file url in functions creation. " |
| + "Default is true." |
| ) |
| private Boolean enableReferencingFunctionsDirectoryFiles = true; |
| @FieldContext( |
| category = CATEGORY_FUNCTIONS, |
| doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https " |
| + "urls." |
| ) |
| private List<String> additionalEnabledFunctionsUrlPatterns = new ArrayList<>(); |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "The Pulsar topic used for storing function metadata" |
| ) |
| private String functionMetadataTopicName; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "Should the metadata topic be compacted?" |
| ) |
| private Boolean useCompactedMetadataTopic = false; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "The web service URL for function workers" |
| ) |
| private String functionWebServiceUrl; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "The Pulsar binary service URL that function metadata manager talks to" |
| ) |
| private String pulsarServiceUrl; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "The Pulsar web service URL that function metadata manager talks to" |
| ) |
| private String pulsarWebServiceUrl; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "The Pulsar topic used for cluster coordination" |
| ) |
| private String clusterCoordinationTopicName; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "The Pulsar namespace for storing metadata topics" |
| ) |
| private String pulsarFunctionsNamespace; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "The Pulsar cluster name. Used for creating Pulsar namespace during worker initialization" |
| ) |
| private String pulsarFunctionsCluster; |
| @FieldContext( |
| category = CATEGORY_FUNC_PKG, |
| doc = "The number of replicas for storing functions" |
| ) |
| private int numFunctionPackageReplicas; |
| @FieldContext( |
| category = CATEGORY_FUNC_PKG, |
| doc = "Flag indicates enabling or disabling function worker using unified PackageManagement service." |
| ) |
| private boolean functionsWorkerEnablePackageManagement = false; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The directory to download functions by runtime manager" |
| ) |
| private String downloadDirectory; |
| @FieldContext( |
| category = CATEGORY_STATE, |
| doc = "The service URL of state storage" |
| ) |
| private String stateStorageServiceUrl; |
| |
| @FieldContext( |
| category = CATEGORY_STATE, |
| doc = "The implementation class for the state store" |
| ) |
| private String stateStorageProviderImplementation = BKStateStoreProviderImpl.class.getName(); |
| |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The Pulsar topic used for storing function assignment informations" |
| ) |
| private String functionAssignmentTopicName; |
| @FieldContext( |
| category = CATEGORY_FUNC_SCHEDULE_MNG, |
| doc = "The scheduler class used by assigning functions to workers" |
| ) |
| private String schedulerClassName; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The frequency of failure checks, in milliseconds" |
| ) |
| private long failureCheckFreqMs; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The reschedule timeout of function assignment, in milliseconds" |
| ) |
| private long rescheduleTimeoutMs; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The frequency to check whether the cluster needs rebalancing" |
| ) |
| private long rebalanceCheckFreqSec; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "Interval to probe for changes in list of workers, in seconds" |
| ) |
| private int workerListProbeIntervalSec = 60; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The max number of retries for initial broker reconnects when function metadata manager" |
| + " tries to create producer on metadata topics" |
| ) |
| private int initialBrokerReconnectMaxRetries; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The max number of retries for writing assignment to assignment topic" |
| ) |
| private int assignmentWriteMaxRetries; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The frequency of instance liveness check, in milliseconds" |
| ) |
| private long instanceLivenessCheckFreqMs; |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "Whether to enable the broker client authentication used by function workers to talk to brokers" |
| ) |
| private Boolean brokerClientAuthenticationEnabled = null; |
| public boolean isBrokerClientAuthenticationEnabled() { |
| if (brokerClientAuthenticationEnabled != null) { |
| return brokerClientAuthenticationEnabled; |
| } else { |
| return authenticationEnabled; |
| } |
| } |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "The authentication plugin used by function workers to talk to brokers" |
| ) |
| private String brokerClientAuthenticationPlugin; |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "The parameters of the authentication plugin used by function workers to talk to brokers" |
| ) |
| private String brokerClientAuthenticationParameters; |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "Authentication plugin to use when connecting to bookies" |
| ) |
| private String bookkeeperClientAuthenticationPlugin; |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "BookKeeper auth plugin implementation specifics parameters name and values" |
| ) |
| private String bookkeeperClientAuthenticationParametersName; |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "Parameters for bookkeeper auth plugin" |
| ) |
| private String bookkeeperClientAuthenticationParameters; |
| @FieldContext( |
| category = CATEGORY_FUNC_METADATA_MNG, |
| doc = "Frequency how often worker performs compaction on function-topics, in seconds" |
| ) |
| private long topicCompactionFrequencySec = 30 * 60; // 30 minutes |
| /***** --- TLS. --- ****/ |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Enable TLS" |
| ) |
| @Deprecated |
| private boolean tlsEnabled = false; |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Path for the TLS certificate file" |
| ) |
| private String tlsCertificateFilePath; |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Path for the TLS private key file" |
| ) |
| private String tlsKeyFilePath; |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Path for the trusted TLS certificate file" |
| ) |
| private String tlsTrustCertsFilePath = ""; |
| @FieldContext( |
| category = CATEGORY_SECURITY, |
| doc = "Accept untrusted TLS certificate from client" |
| ) |
| private boolean tlsAllowInsecureConnection = false; |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Require trusted client cert on connect" |
| ) |
| private boolean tlsRequireTrustedClientCertOnConnect = false; |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "Whether to enable TLS when clients connect to broker", |
| deprecated = true |
| ) |
| // TLS for Functions -> Broker |
| // @deprecated use "pulsar+ssl://" in serviceUrl to enable |
| @Deprecated |
| private boolean useTls = false; |
| @FieldContext( |
| category = CATEGORY_SECURITY, |
| doc = "Whether to enable hostname verification on TLS connections" |
| ) |
| private boolean tlsEnableHostnameVerification = false; |
| @FieldContext( |
| category = CATEGORY_SECURITY, |
| doc = "Tls cert refresh duration in seconds (set 0 to check on every new connection)" |
| ) |
| private long tlsCertRefreshCheckDurationSec = 300; |
| |
| /**** --- KeyStore TLS config variables. --- ****/ |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "Enable TLS with KeyStore type configuration in function worker" |
| ) |
| private boolean tlsEnabledWithKeyStore = false; |
| |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "Specify the TLS provider for the function worker service: \n" |
| + "When using TLS authentication with CACert, the valid value is either OPENSSL or JDK.\n" |
| + "When using TLS authentication with KeyStore, available values can be SunJSSE, Conscrypt and etc." |
| ) |
| private String tlsProvider = null; |
| |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "TLS KeyStore type configuration in function worker: JKS, PKCS12" |
| ) |
| private String tlsKeyStoreType = "JKS"; |
| |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "TLS KeyStore path in function worker" |
| ) |
| private String tlsKeyStore = null; |
| |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "TLS KeyStore password for function worker" |
| ) |
| @ToString.Exclude |
| private String tlsKeyStorePassword = null; |
| |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "TLS TrustStore type configuration in function worker: JKS, PKCS12" |
| ) |
| private String tlsTrustStoreType = "JKS"; |
| |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "TLS TrustStore path in function worker" |
| ) |
| private String tlsTrustStore = null; |
| |
| @FieldContext( |
| category = CATEGORY_KEYSTORE_TLS, |
| doc = "TLS TrustStore password for function worker, null means empty password." |
| ) |
| @ToString.Exclude |
| private String tlsTrustStorePassword = null; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Specify the tls protocols the proxy's web service will use to negotiate during TLS Handshake.\n\n" |
| + "Example:- [TLSv1.3, TLSv1.2]" |
| ) |
| private Set<String> webServiceTlsProtocols = new TreeSet<>(); |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Specify the tls cipher the proxy's web service will use to negotiate during TLS Handshake.\n\n" |
| + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]" |
| ) |
| private Set<String> webServiceTlsCiphers = new TreeSet<>(); |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Enforce authentication" |
| ) |
| private boolean authenticationEnabled = false; |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Authentication provider name list, which is a list of class names" |
| ) |
| private Set<String> authenticationProviders = new TreeSet<>(); |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Enforce authorization on accessing functions admin-api" |
| ) |
| private boolean authorizationEnabled = false; |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Authorization provider fully qualified class-name" |
| ) |
| private String authorizationProvider = PulsarAuthorizationProvider.class.getName(); |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Role names that are treated as `super-user`, meaning they will be able to access any admin-api" |
| ) |
| private Set<String> superUserRoles = new TreeSet<>(); |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Role names that are treated as `proxy roles`. These are the only roles that can supply the " |
| + "originalPrincipal.") |
| private Set<String> proxyRoles = new TreeSet<>(); |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL." |
| + "\n Default value is: \".*pulsar.*\", so only clients whose id contains 'pulsar' are allowed to" |
| + " connect." |
| ) |
| private String saslJaasClientAllowedIds = SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Service Principal, for login context name. Default value is \"PulsarFunction\"." |
| ) |
| private String saslJaasServerSectionName = SaslConstants.JAAS_DEFAULT_FUNCTION_SECTION_NAME; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "Path to file containing the secret to be used to SaslRoleTokenSigner\n" |
| + "The secret can be specified like:\n" |
| + "saslJaasServerRoleTokenSignerSecretPath=file:///my/saslRoleTokenSignerSecret.key." |
| ) |
| private String saslJaasServerRoleTokenSignerSecretPath; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER_SECURITY, |
| doc = "kerberos kinit command." |
| ) |
| private String kinitCommand = "/usr/bin/kinit"; |
| |
| private Properties properties = new Properties(); |
| |
| public boolean getTlsEnabled() { |
| return tlsEnabled && workerPortTls != null; |
| } |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Whether to initialize distributed log metadata in runtime" |
| ) |
| private Boolean initializedDlogMetadata = false; |
| |
| public Boolean isInitializedDlogMetadata() { |
| if (this.initializedDlogMetadata == null){ |
| return false; |
| } |
| return this.initializedDlogMetadata; |
| }; |
| |
| /******** security settings for Pulsar broker client. **********/ |
| |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "The path to trusted certificates used by the Pulsar client to authenticate with Pulsar brokers" |
| ) |
| private String brokerClientTrustCertsFilePath; |
| |
| public String getBrokerClientTrustCertsFilePath() { |
| // for compatible, if user do not define brokerClientTrustCertsFilePath, we will use tlsTrustCertsFilePath, |
| // otherwise we will use brokerClientTrustCertsFilePath |
| if (StringUtils.isNotBlank(brokerClientTrustCertsFilePath)) { |
| return brokerClientTrustCertsFilePath; |
| } else { |
| return tlsTrustCertsFilePath; |
| } |
| } |
| |
| /******** Function Runtime configurations. **********/ |
| |
| |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The classname of the function runtime factory." |
| ) |
| private String functionRuntimeFactoryClassName; |
| |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "A map of configs for function runtime factory." |
| ) |
| private Map<String, Object> functionRuntimeFactoryConfigs; |
| |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The classname of the secrets provider configurator." |
| ) |
| private String secretsProviderConfiguratorClassName; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "Any config the secret provider configurator might need. \n\nThis is passed on" |
| + " to the init method of the SecretsProviderConfigurator" |
| ) |
| private Map<String, String> secretsProviderConfiguratorConfig; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "A set of the minimum amount of resources functions must request. " |
| + "Support for this depends on function runtime." |
| ) |
| private Resources functionInstanceMinResources; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "A set of the maximum amount of resources functions may request. " |
| + "Support for this depends on function runtime." |
| ) |
| private Resources functionInstanceMaxResources; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "Granularities of requested resources. If the granularity of any type of resource is set," |
| + " the requested resource of the type must be a multiple of the granularity." |
| ) |
| private Resources functionInstanceResourceGranularities; |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "If this configuration is set to be true, the amount of requested resources of all type of resources" |
| + " that have the granularity set must be the same multiples of their granularities." |
| ) |
| private boolean functionInstanceResourceChangeInLockStep = false; |
| |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "The class name of the Function Authentication Provider to use." |
| + " The Function Authentication Provider is responsible to distributing the necessary" |
| + " authentication information to individual functions e.g. user tokens" |
| ) |
| @Getter(AccessLevel.NONE) |
| private String functionAuthProviderClassName; |
| |
| public String getFunctionAuthProviderClassName() { |
| // if we haven't set a value and are running kubernetes, we default to the SecretsTokenAuthProvider |
| // as that matches behavior before this property could be overridden |
| if (!StringUtils.isEmpty(functionAuthProviderClassName)) { |
| return functionAuthProviderClassName; |
| } else { |
| if (StringUtils.equals(this.getFunctionRuntimeFactoryClassName(), KubernetesRuntimeFactory.class.getName()) |
| || getKubernetesContainerFactory() != null) { |
| return KubernetesSecretsTokenAuthProvider.class.getName(); |
| } |
| return null; |
| } |
| } |
| |
| @FieldContext( |
| doc = "The full class-name of an instance of RuntimeCustomizer." |
| + " This class receives the 'customRuntimeOptions string and can customize" |
| + " details of how the runtime operates" |
| ) |
| protected String runtimeCustomizerClassName; |
| |
| @FieldContext( |
| doc = "A map of config passed to the RuntimeCustomizer." |
| + " This config is distinct from the `customRuntimeOptions` provided by functions" |
| + " as this config is the the same across all functions" |
| ) |
| private Map<String, Object> runtimeCustomizerConfig = Collections.emptyMap(); |
| |
| @FieldContext( |
| doc = "Max pending async requests per instance to avoid large number of concurrent requests." |
| + "Only used in AsyncFunction. Default: 1000" |
| ) |
| private int maxPendingAsyncRequests = 1000; |
| |
| @FieldContext( |
| doc = "Whether to forward the source message properties to the output message" |
| ) |
| private boolean forwardSourceMessageProperty = true; |
| |
| @FieldContext( |
| doc = "Additional arguments to pass to the Java command line for Java functions" |
| ) |
| private List<String> additionalJavaRuntimeArguments = new ArrayList<>(); |
| |
| @FieldContext( |
| category = CATEGORY_CONNECTORS, |
| doc = "Whether to ignore unknown properties when deserializing the connector configuration. " |
| + "After upgrading a connector to a new version with a new configuration, " |
| + "the new configuration may not be compatible with the old connector. " |
| + "In case of rollback, it's required to also rollback the connector configuration. " |
| + "Ignoring unknown fields makes possible to keep the new configuration and " |
| + "only rollback the connector." |
| ) |
| private boolean ignoreUnknownConfigFields = false; |
| |
| public String getFunctionMetadataTopic() { |
| return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName); |
| } |
| |
| public String getClusterCoordinationTopic() { |
| return String.format("persistent://%s/%s", pulsarFunctionsNamespace, clusterCoordinationTopicName); |
| } |
| |
| public String getFunctionAssignmentTopic() { |
| return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionAssignmentTopicName); |
| } |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "The nar package for the function worker service" |
| ) |
| private String functionsWorkerServiceNarPackage = ""; |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "The additional configs for the function worker service if functionsWorkerServiceNarPackage provided" |
| ) |
| private Map<String, Object> functionsWorkerServiceCustomConfigs = Collections.emptyMap(); |
| |
| @FieldContext( |
| category = CATEGORY_WORKER, |
| doc = "Enable to expose Pulsar Admin Client from Function Context, default is disabled" |
| ) |
| private boolean exposeAdminClientEnabled = false; |
| |
| public static WorkerConfig load(String yamlFile) throws IOException { |
| if (isBlank(yamlFile)) { |
| return new WorkerConfig(); |
| } |
| ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); |
| return mapper.readValue(new File(yamlFile), WorkerConfig.class); |
| } |
| |
| public String getWorkerId() { |
| if (isBlank(this.workerId)) { |
| this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort() != null |
| ? this.getWorkerPort() : this.getWorkerPortTls()); |
| } |
| return this.workerId; |
| } |
| |
| public String getWorkerHostname() { |
| if (isBlank(this.workerHostname)) { |
| this.workerHostname = unsafeLocalhostResolve(); |
| } |
| return this.workerHostname; |
| } |
| |
| public byte[] getTlsTrustChainBytes() { |
| if (StringUtils.isNotEmpty(getBrokerClientTrustCertsFilePath()) |
| && Files.exists(Paths.get(getBrokerClientTrustCertsFilePath()))) { |
| try { |
| return Files.readAllBytes(Paths.get(getBrokerClientTrustCertsFilePath())); |
| } catch (IOException e) { |
| throw new IllegalStateException("Failed to read CA bytes", e); |
| } |
| } else { |
| return null; |
| } |
| } |
| |
| public String getWorkerWebAddress() { |
| return String.format("http://%s:%d", this.getWorkerHostname(), this.getWorkerPort()); |
| } |
| |
| public String getWorkerWebAddressTls() { |
| return String.format("https://%s:%d", this.getWorkerHostname(), this.getWorkerPortTls()); |
| } |
| |
| public static String unsafeLocalhostResolve() { |
| try { |
| // Get the fully qualified hostname |
| return InetAddress.getLocalHost().getCanonicalHostName(); |
| } catch (UnknownHostException ex) { |
| throw new IllegalStateException("Failed to resolve localhost name.", ex); |
| } |
| } |
| |
| public String getConfigurationMetadataStoreUrl() { |
| if (StringUtils.isNotBlank(configurationMetadataStoreUrl)) { |
| return configurationMetadataStoreUrl; |
| } else { |
| return configurationStoreServers; |
| } |
| } |
| |
| @Override |
| public void setProperties(Properties properties) { |
| this.properties = properties; |
| } |
| |
| /********* DEPRECATED CONFIGS. *********/ |
| |
| @Deprecated |
| @Data |
| /** |
| * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs |
| * for specifying the function runtime and configs to use |
| */ |
| public static class ThreadContainerFactory extends ThreadRuntimeFactoryConfig { |
| |
| } |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "Thread based runtime settings" |
| ) |
| @Deprecated |
| private ThreadContainerFactory threadContainerFactory; |
| |
| @Deprecated |
| @Data |
| /** |
| * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs |
| * for specifying the function runtime and configs to use |
| */ |
| public static class ProcessContainerFactory extends ProcessRuntimeFactoryConfig { |
| |
| } |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "Process based runtime settings" |
| ) |
| @Deprecated |
| private ProcessContainerFactory processContainerFactory; |
| |
| @Deprecated |
| @Data |
| /** |
| * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs |
| * for specifying the function runtime and configs to use |
| */ |
| public static class KubernetesContainerFactory extends KubernetesRuntimeFactoryConfig { |
| |
| } |
| @FieldContext( |
| category = CATEGORY_FUNC_RUNTIME_MNG, |
| doc = "Kubernetes based runtime settings" |
| ) |
| @Deprecated |
| private KubernetesContainerFactory kubernetesContainerFactory; |
| |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "The parameters of the authentication plugin used by function workers to talk to brokers" |
| ) |
| @Deprecated |
| private String clientAuthenticationParameters; |
| @FieldContext( |
| category = CATEGORY_CLIENT_SECURITY, |
| doc = "The authentication plugin used by function workers to talk to brokers" |
| ) |
| @Deprecated |
| private String clientAuthenticationPlugin; |
| |
| public String getBrokerClientAuthenticationPlugin() { |
| if (null == brokerClientAuthenticationPlugin) { |
| return clientAuthenticationPlugin; |
| } else { |
| return brokerClientAuthenticationPlugin; |
| } |
| } |
| |
| public String getBrokerClientAuthenticationParameters() { |
| if (null == brokerClientAuthenticationParameters) { |
| return clientAuthenticationParameters; |
| } else { |
| return brokerClientAuthenticationParameters; |
| } |
| } |
| |
| public long getMetadataStoreSessionTimeoutMillis() { |
| return zooKeeperSessionTimeoutMillis > 0 ? zooKeeperSessionTimeoutMillis : metadataStoreSessionTimeoutMillis; |
| } |
| |
| public int getMetadataStoreOperationTimeoutSeconds() { |
| return zooKeeperOperationTimeoutSeconds > 0 ? zooKeeperOperationTimeoutSeconds |
| : metadataStoreOperationTimeoutSeconds; |
| } |
| |
| public int getMetadataStoreCacheExpirySeconds() { |
| return zooKeeperCacheExpirySeconds > 0 ? zooKeeperCacheExpirySeconds : metadataStoreCacheExpirySeconds; |
| } |
| |
| public boolean isMetadataStoreAllowReadOnlyOperations() { |
| return zooKeeperAllowReadOnlyOperations || metadataStoreAllowReadOnlyOperations; |
| } |
| } |