blob: 7cd718652104e383543fc6f2f4e4ac8c3fb49b50 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.functions.Resources;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
@Data
@Accessors(chain = true)
@JsonIgnoreProperties(ignoreUnknown = true)
public class WorkerConfig implements Serializable, PulsarConfiguration {
private static final long serialVersionUID = 1L;
@Category
private static final String CATEGORY_WORKER = "Worker Settings";
@Category
private static final String CATEGORY_FUNC_PKG = "Function Package Management";
@Category
private static final String CATEGORY_FUNC_METADATA_MNG = "Function Metadata Management";
@Category
private static final String CATEGORY_FUNC_RUNTIME_MNG = "Function Runtime Management";
@Category
private static final String CATEGORY_SECURITY = "Common Security Settings (applied for both worker and client)";
@Category
private static final String CATEGORY_WORKER_SECURITY = "Worker Security Settings";
@Category
private static final String CATEGORY_CLIENT_SECURITY = "Security settings for clients talking to brokers";
@Category
private static final String CATEGORY_STATE = "State Management";
@Category
private static final String CATEGORY_CONNECTORS = "Connectors";
@FieldContext(
category = CATEGORY_WORKER,
doc = "Id to identify a worker instance"
)
private String workerId;
@FieldContext(
category = CATEGORY_WORKER,
doc = "Hostname of the worker instance"
)
private String workerHostname;
@FieldContext(
category = CATEGORY_WORKER,
doc = "The port for serving worker http requests"
)
private Integer workerPort;
@FieldContext(
category = CATEGORY_WORKER,
doc = "The port for serving worker https requests"
)
private Integer workerPortTls;
@FieldContext(
category = CATEGORY_WORKER,
doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics")
private String jvmGCMetricsLoggerClassName;
@FieldContext(
category = CATEGORY_WORKER,
doc = "Number of threads to use for HTTP requests processing"
)
private int numHttpServerThreads = 8;
@FieldContext(
category = CATEGORY_WORKER,
required = false,
doc = "Configuration store connection string (as a comma-separated list)"
)
private String configurationStoreServers;
@FieldContext(
category = CATEGORY_WORKER,
doc = "ZooKeeper session timeout in milliseconds"
)
private long zooKeeperSessionTimeoutMillis = 30000;
@FieldContext(
category = CATEGORY_WORKER,
doc = "ZooKeeper operation timeout in seconds"
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
category = CATEGORY_WORKER,
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
)
private String connectorsDirectory = "./connectors";
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
)
private String functionMetadataTopicName;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The web service url for function workers"
)
private String functionWebServiceUrl;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulser binary service url that function metadata manager talks to"
)
private String pulsarServiceUrl;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar web service url that function metadata manager talks to"
)
private String pulsarWebServiceUrl;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for cluster coordination"
)
private String clusterCoordinationTopicName;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar namespace for storing metadata topics"
)
private String pulsarFunctionsNamespace;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar cluster name. Used for creating pulsar namespace during worker initialization"
)
private String pulsarFunctionsCluster;
@FieldContext(
category = CATEGORY_FUNC_PKG,
doc = "The number of replicas for storing functions"
)
private int numFunctionPackageReplicas;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The directory to download functions by runtime manager"
)
private String downloadDirectory;
@FieldContext(
category = CATEGORY_STATE,
doc = "The service url of state storage"
)
private String stateStorageServiceUrl;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function assignment informations"
)
private String functionAssignmentTopicName;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The scheduler class used by assigning functions to workers"
)
private String schedulerClassName;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The frequency of failure checks, in milliseconds"
)
private long failureCheckFreqMs;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The reschedule timeout of function assignment, in milliseconds"
)
private long rescheduleTimeoutMs;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The max number of retries for initial broker reconnects when function metadata manager"
+ " tries to create producer on metadata topics"
)
private int initialBrokerReconnectMaxRetries;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The max number of retries for writing assignment to assignment topic"
)
private int assignmentWriteMaxRetries;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The frequency of instance liveness check, in milliseconds"
)
private long instanceLivenessCheckFreqMs;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "The authentication plugin used by function workers to talk to brokers"
)
private String clientAuthenticationPlugin;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "The parameters of the authentication plugin used by function workers to talk to brokers"
)
private String clientAuthenticationParameters;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "Authentication plugin to use when connecting to bookies"
)
private String bookkeeperClientAuthenticationPlugin;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "BookKeeper auth plugin implementatation specifics parameters name and values"
)
private String bookkeeperClientAuthenticationParametersName;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "Parameters for bookkeeper auth plugin"
)
private String bookkeeperClientAuthenticationParameters;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "Frequency how often worker performs compaction on function-topics, in seconds"
)
private long topicCompactionFrequencySec = 30 * 60; // 30 minutes
/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Enable TLS"
)
@Deprecated
private boolean tlsEnabled = false;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Path for the TLS certificate file"
)
private String tlsCertificateFilePath;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Path for the TLS private key file"
)
private String tlsKeyFilePath;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Path for the trusted TLS certificate file"
)
private String tlsTrustCertsFilePath = "";
@FieldContext(
category = CATEGORY_SECURITY,
doc = "Accept untrusted TLS certificate from client"
)
private boolean tlsAllowInsecureConnection = false;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Require trusted client cert on connect"
)
private boolean tlsRequireTrustedClientCertOnConnect = false;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "Whether to enable TLS when clients connect to broker",
deprecated = true
)
// TLS for Functions -> Broker
// @deprecated use "pulsar+ssl://" in serviceUrl to enable
@Deprecated
private boolean useTls = false;
@FieldContext(
category = CATEGORY_SECURITY,
doc = "Whether to enable hostname verification on TLS connections"
)
private boolean tlsHostnameVerificationEnable = false;
@FieldContext(
category = CATEGORY_SECURITY,
doc = "Tls cert refresh duration in seconds (set 0 to check on every new connection)"
)
private long tlsCertRefreshCheckDurationSec = 300;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Enforce authentication"
)
private boolean authenticationEnabled = false;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Authentication provider name list, which is a list of class names"
)
private Set<String> authenticationProviders = Sets.newTreeSet();
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Enforce authorization on accessing functions admin-api"
)
private boolean authorizationEnabled = false;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Authorization provider fully qualified class-name"
)
private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Role names that are treated as `super-user`, meaning they will be able to access any admin-api"
)
private Set<String> superUserRoles = Sets.newTreeSet();
private Properties properties = new Properties();
public boolean getTlsEnabled() {
return tlsEnabled || workerPortTls != null;
}
/******** security settings for pulsar broker client **********/
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "The path to trusted certificates used by the Pulsar client to authenticate with Pulsar brokers"
)
private String brokerClientTrustCertsFilePath;
/******** Function Runtime configurations **********/
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The classname of the function runtime factory."
)
private String functionRuntimeFactoryClassName;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "A map of configs for function runtime factory."
)
private Map<String, Object> functionRuntimeFactoryConfigs;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The classname of the secrets provider configurator."
)
private String secretsProviderConfiguratorClassName;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "Any config the secret provider configurator might need. \n\nThis is passed on"
+ " to the init method of the SecretsProviderConfigurator"
)
private Map<String, String> secretsProviderConfiguratorConfig;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "A set of the minimum amount of resources functions must request. Support for this depends on function runtime."
)
private Resources functionInstanceMinResources;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The class name of the Function Authentication Provider to use." +
" The Function Authentication Provider is responsible to distributing the necessary" +
" authentication information to individual functions e.g. user tokens"
)
@Getter(AccessLevel.NONE) private String functionAuthProviderClassName;
public String getFunctionAuthProviderClassName() {
// if we haven't set a value and are running kubernetes, we default to the SecretsTokenAuthProvider
// as that matches behavior before this property could be overridden
if (!StringUtils.isEmpty(functionAuthProviderClassName)) {
return functionAuthProviderClassName;
} else {
if (StringUtils.equals(this.getFunctionRuntimeFactoryClassName(), KubernetesRuntimeFactory.class.getName()) || getKubernetesContainerFactory() != null) {
return KubernetesSecretsTokenAuthProvider.class.getName();
}
return null;
}
}
@FieldContext(
doc = "The full class-name of an instance of RuntimeCustomizer." +
" This class receives the 'customRuntimeOptions string and can customize" +
" details of how the runtime operates"
)
protected String runtimeCustomizerClassName;
@FieldContext(
doc = "A map of config passed to the RuntimeCustomizer." +
" This config is distinct from the `customRuntimeOptions` provided by functions" +
" as this config is the the same across all functions"
)
private Map<String, Object> runtimeCustomizerConfig = Collections.emptyMap();
@FieldContext(
doc = "Max pending async requests per instance to avoid large number of concurrent requests."
+ "Only used in AsyncFunction. Default: 1000"
)
private int maxPendingAsyncRequests = 1000;
public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
}
public String getClusterCoordinationTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, clusterCoordinationTopicName);
}
public String getFunctionAssignmentTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionAssignmentTopicName);
}
public static WorkerConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), WorkerConfig.class);
}
public String getWorkerId() {
if (StringUtils.isBlank(this.workerId)) {
this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort());
}
return this.workerId;
}
public String getWorkerHostname() {
if (StringUtils.isBlank(this.workerHostname)) {
this.workerHostname = unsafeLocalhostResolve();
}
return this.workerHostname;
}
public byte[] getTlsTrustChainBytes() {
if (StringUtils.isNotEmpty(getTlsTrustCertsFilePath()) && Files.exists(Paths.get(getTlsTrustCertsFilePath()))) {
try {
return Files.readAllBytes(Paths.get(getTlsTrustCertsFilePath()));
} catch (IOException e) {
throw new IllegalStateException("Failed to read CA bytes", e);
}
} else {
return null;
}
}
public String getWorkerWebAddress() {
return String.format("http://%s:%d", this.getWorkerHostname(), this.getWorkerPort());
}
public static String unsafeLocalhostResolve() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException ex) {
throw new IllegalStateException("Failed to resolve localhost name.", ex);
}
}
@Override
public void setProperties(Properties properties) {
this.properties = properties;
}
/********* DEPRECATED CONFIGS *********/
@Deprecated
@Data
/**
* @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
* for specifying the function runtime and configs to use
*/
public static class ThreadContainerFactory extends ThreadRuntimeFactoryConfig {
}
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "Thread based runtime settings"
)
@Deprecated
private ThreadContainerFactory threadContainerFactory;
@Deprecated
@Data
/**
* @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
* for specifying the function runtime and configs to use
*/
public static class ProcessContainerFactory extends ProcessRuntimeFactoryConfig {
}
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "Process based runtime settings"
)
@Deprecated
private ProcessContainerFactory processContainerFactory;
@Deprecated
@Data
/**
* @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
* for specifying the function runtime and configs to use
*/
public static class KubernetesContainerFactory extends KubernetesRuntimeFactoryConfig {
}
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "Kubernetes based runtime settings"
)
@Deprecated
private KubernetesContainerFactory kubernetesContainerFactory;
}