| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.TimeUnit; |
| import lombok.AccessLevel; |
| import lombok.Getter; |
| import lombok.Setter; |
| import lombok.ToString; |
| import org.apache.bookkeeper.client.api.DigestType; |
| import org.apache.bookkeeper.util.BookKeeperConstants; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; |
| import org.apache.pulsar.common.configuration.Category; |
| import org.apache.pulsar.common.configuration.FieldContext; |
| import org.apache.pulsar.common.configuration.PulsarConfiguration; |
| import org.apache.pulsar.common.nar.NarClassLoader; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; |
| import org.apache.pulsar.common.policies.data.OffloadedReadPriority; |
| import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; |
| import org.apache.pulsar.common.policies.data.TopicType; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.sasl.SaslConstants; |
| import org.apache.pulsar.common.util.DirectMemoryUtils; |
| import org.apache.pulsar.metadata.api.MetadataStoreFactory; |
| import org.apache.pulsar.metadata.impl.ZKMetadataStore; |
| |
| /** |
| * Pulsar service configuration object. |
| */ |
| @Getter |
| @Setter |
| @ToString |
| public class ServiceConfiguration implements PulsarConfiguration { |
| |
| @Category |
| private static final String CATEGORY_SERVER = "Server"; |
| @Category |
| private static final String CATEGORY_PROTOCOLS = "Protocols"; |
| @Category |
| private static final String CATEGORY_STORAGE_BK = "Storage (BookKeeper)"; |
| @Category |
| private static final String CATEGORY_STORAGE_ML = "Storage (Managed Ledger)"; |
| @Category |
| private static final String CATEGORY_STORAGE_OFFLOADING = "Storage (Ledger Offloading)"; |
| @Category |
| private static final String CATEGORY_POLICIES = "Policies"; |
| @Category |
| private static final String CATEGORY_WEBSOCKET = "WebSocket"; |
| @Category |
| private static final String CATEGORY_SCHEMA = "Schema"; |
| @Category |
| private static final String CATEGORY_METRICS = "Metrics"; |
| @Category |
| private static final String CATEGORY_REPLICATION = "Replication"; |
| @Category |
| private static final String CATEGORY_LOAD_BALANCER = "Load Balancer"; |
| @Category |
| private static final String CATEGORY_FUNCTIONS = "Functions"; |
| @Category |
| private static final String CATEGORY_TLS = "TLS"; |
| @Category |
| private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS"; |
| @Category |
| private static final String CATEGORY_AUTHENTICATION = "Authentication"; |
| @Category |
| private static final String CATEGORY_AUTHORIZATION = "Authorization"; |
| @Category |
| private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider"; |
| @Category |
| private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider"; |
| @Category |
| private static final String CATEGORY_HTTP = "HTTP"; |
| @Category |
| private static final String CATEGORY_TRANSACTION = "Transaction"; |
| @Category |
| private static final String CATEGORY_PACKAGES_MANAGEMENT = "Packages Management"; |
| @Category |
| private static final String CATEGORY_PLUGIN = "Broker Plugin"; |
| |
| private static final double MIN_ML_CACHE_EVICTION_FREQUENCY = 0.001; |
| private static final double MAX_ML_CACHE_EVICTION_FREQUENCY = 1000.0; |
| private static final long MAX_ML_CACHE_EVICTION_INTERVAL_MS = 1000000L; |
| |
| /***** --- pulsar configuration. --- ****/ |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| required = false, |
| deprecated = true, |
| doc = "The Zookeeper quorum connection string (as a comma-separated list). Deprecated in favour of " |
| + "metadataStoreUrl" |
| ) |
| @Getter(AccessLevel.NONE) |
| @Deprecated |
| private String zookeeperServers; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| required = false, |
| doc = "The metadata store URL. \n" |
| + " Examples: \n" |
| + " * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181\n" |
| + " * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not " |
| + "specified)\n" |
| + " * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)\n" |
| ) |
| private String metadataStoreUrl; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| required = false, |
| deprecated = true, |
| doc = "Global Zookeeper quorum connection string (as a comma-separated list)." |
| + " Deprecated in favor of using `configurationStoreServers`" |
| ) |
| @Getter(AccessLevel.NONE) |
| @Deprecated |
| private String globalZookeeperServers; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| required = false, |
| deprecated = true, |
| doc = "Configuration store connection string (as a comma-separated list). Deprecated in favor of " |
| + "`configurationMetadataStoreUrl`" |
| ) |
| @Getter(AccessLevel.NONE) |
| @Deprecated |
| private String configurationStoreServers; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| required = false, |
| doc = "The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl" |
| ) |
| private String configurationMetadataStoreUrl; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "The port for serving binary protobuf requests." |
| + " If set, defines a server binding for bindAddress:brokerServicePort." |
| + " The Default value is 6650." |
| ) |
| |
| private Optional<Integer> brokerServicePort = Optional.of(6650); |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "The port for serving TLS-secured binary protobuf requests." |
| + " If set, defines a server binding for bindAddress:brokerServicePortTls." |
| ) |
| private Optional<Integer> brokerServicePortTls = Optional.empty(); |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "The port for serving http requests" |
| ) |
| private Optional<Integer> webServicePort = Optional.of(8080); |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "The port for serving https requests" |
| ) |
| private Optional<Integer> webServicePortTls = Optional.empty(); |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Specify the TLS provider for the web service: SunJSSE, Conscrypt and etc." |
| ) |
| private String webServiceTlsProvider = "Conscrypt"; |
| |
| @FieldContext( |
| category = CATEGORY_TLS, |
| doc = "Specify the tls protocols the proxy's web service will use to negotiate during TLS Handshake.\n\n" |
| + "Example:- [TLSv1.3, TLSv1.2]" |
| ) |
| private Set<String> webServiceTlsProtocols = new TreeSet<>(); |
| |
| @FieldContext( |
| category = CATEGORY_TLS, |
| doc = "Specify the tls cipher the proxy's web service will use to negotiate during TLS Handshake.\n\n" |
| + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]" |
| ) |
| private Set<String> webServiceTlsCiphers = new TreeSet<>(); |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Hostname or IP address the service binds on" |
| ) |
| private String bindAddress = "0.0.0.0"; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Hostname or IP address the service advertises to the outside world." |
| + " If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used." |
| ) |
| private String advertisedAddress; |
| |
| @FieldContext(category = CATEGORY_SERVER, |
| doc = "Used to specify multiple advertised listeners for the broker." |
| + " The value must format as <listener_name>:pulsar://<host>:<port>," |
| + "multiple listeners should separate with commas." |
| + "Do not use this configuration with advertisedAddress and brokerServicePort." |
| + "The Default value is absent means use advertisedAddress and brokerServicePort.") |
| private String advertisedListeners; |
| |
| @FieldContext(category = CATEGORY_SERVER, |
| doc = "Used to specify the internal listener name for the broker." |
| + "The listener name must contain in the advertisedListeners." |
| + "The Default value is absent, the broker uses the first listener as the internal listener.") |
| private String internalListenerName; |
| |
| @FieldContext(category = CATEGORY_SERVER, |
| doc = "Used to specify additional bind addresses for the broker." |
| + " The value must format as <listener_name>:<scheme>://<host>:<port>," |
| + " multiple bind addresses should be separated with commas." |
| + " Associates each bind address with an advertised listener and protocol handler." |
| + " Note that the brokerServicePort, brokerServicePortTls, webServicePort, and" |
| + " webServicePortTls properties define additional bindings.") |
| private String bindAddresses; |
| |
| @FieldContext(category = CATEGORY_SERVER, |
| doc = "Enable or disable the proxy protocol." |
| + " If true, the real IP addresses of consumers and producers can be obtained" |
| + " when getting topic statistics data.") |
| private boolean haProxyProtocolEnabled; |
| |
| @FieldContext(category = CATEGORY_SERVER, |
| doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https " |
| + "requests. Default is false.") |
| private boolean webServiceHaProxyProtocolEnabled = false; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = |
| "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n" |
| + "Default is false.") |
| private boolean webServiceTrustXForwardedFor = false; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = |
| "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n" |
| + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor " |
| + "is enabled.") |
| private Boolean webServiceLogDetailedAddresses; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Number of threads to use for Netty Acceptor." |
| + " Default is set to `1`" |
| ) |
| private int numAcceptorThreads = 1; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Number of threads to use for Netty IO." |
| + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`" |
| ) |
| private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors(); |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Number of threads to use for orderedExecutor." |
| + " The ordered executor is used to operate with zookeeper, such as init zookeeper client," |
| + " get namespace policies from zookeeper etc. It also used to split bundle. Default is 8" |
| ) |
| private int numOrderedExecutorThreads = 8; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Number of threads to use for HTTP requests processing" |
| + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`" |
| ) |
| // Use at least 8 threads to avoid having Jetty go into threads starving and |
| // having the possibility of getting into a deadlock where a Jetty thread is |
| // waiting for another HTTP call to complete in same thread. |
| private int numHttpServerThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors()); |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Number of threads to use for pulsar broker service." |
| + " The executor in thread pool will do basic broker operation like load/unload bundle," |
| + " update managedLedgerConfig, update topic/subscription/replicator message dispatch rate," |
| + " do leader election etc. Default is set to 20 " |
| ) |
| private int numExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors(); |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Number of thread pool size to use for pulsar zookeeper callback service." |
| + "The cache executor thread pool is used for restarting global zookeeper session. " |
| + "Default is 10" |
| ) |
| @Deprecated |
| private int numCacheExecutorThreadPoolSize = 10; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Option to enable busy-wait settings. Default is false. " |
| + "WARNING: This option will enable spin-waiting on executors and IO threads in order " |
| + "to reduce latency during context switches. The spinning will consume 100% CPU even " |
| + "when the broker is not doing any work. It is recommended to reduce the number of IO threads " |
| + "and BK client threads to only have few CPU cores busy." |
| ) |
| private boolean enableBusyWait = false; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests") |
| private int maxConcurrentHttpRequests = 1024; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Capacity for thread pool queue in the HTTP server" |
| + " Default is set to 8192." |
| ) |
| private int httpServerThreadPoolQueueSize = 8192; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Capacity for accept queue in the HTTP server" |
| + " Default is set to 8192." |
| ) |
| private int httpServerAcceptQueueSize = 8192; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound http connections. " |
| + "(0 to disable limiting)") |
| private int maxHttpServerConnections = 2048; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = |
| "Gzip compression is enabled by default. Specific paths can be excluded from compression.\n" |
| + "There are 2 syntaxes supported, Servlet url-pattern based, and Regex based.\n" |
| + "If the spec starts with '^' the spec is assumed to be a regex based path spec and will match " |
| + "with normal Java regex rules.\n" |
| + "If the spec starts with '/' then spec is assumed to be a Servlet url-pattern rules path spec " |
| + "for either an exact match or prefix based match.\n" |
| + "If the spec starts with '*.' then spec is assumed to be a Servlet url-pattern rules path spec " |
| + "for a suffix based match.\n" |
| + "All other syntaxes are unsupported.\n" |
| + "Disable all compression with ^.* or ^.*$") |
| private List<String> httpServerGzipCompressionExcludedPaths = new ArrayList<>(); |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.") |
| private boolean delayedDeliveryEnabled = true; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = """ |
| Class name of the factory that implements the delayed deliver tracker. |
| If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", \ |
| will create bucket based delayed message index tracker. |
| """) |
| private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed" |
| + ".InMemoryDelayedDeliveryTrackerFactory"; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, " |
| + "affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second. " |
| + "Note that this time is used to configure the HashedWheelTimer's tick time.") |
| private long delayedDeliveryTickTimeMillis = 1000; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = "Whether the deliverAt time is strictly followed. " |
| + "When false (default), messages may be sent to consumers before the deliverAt time by as much " |
| + "as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index " |
| + "for a potentially very short time period. When true, messages will not be sent to consumer until the " |
| + "deliverAt time has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for " |
| + "the topic plus the delayedDeliveryTickTimeMillis.") |
| private boolean isDelayedDeliveryDeliverAtTimeStrict = false; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = """ |
| The delayed message index bucket min index count. When the index count of the current bucket is more than \ |
| this value and all message indexes of current ledger have already been added to the tracker \ |
| we will seal the bucket.""") |
| private long delayedDeliveryMinIndexCountPerBucket = 50000; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = """ |
| The delayed message index time step(in seconds) in per bucket snapshot segment, \ |
| after reaching the max time step limitation, the snapshot segment will be cut off.""") |
| private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = """ |
| The max number of delayed message index in per bucket snapshot segment, -1 means no limitation, \ |
| after reaching the max number limitation, the snapshot segment will be cut off.""") |
| private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = """ |
| The max number of delayed message index bucket, \ |
| after reaching the max buckets limitation, the adjacent buckets will be merged.\ |
| (disable with value -1)""") |
| private int delayedDeliveryMaxNumBuckets = -1; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use " |
| + "when detecting if all the messages in the topic have a fixed delay for " |
| + "InMemoryDelayedDeliveryTracker (the default DelayedDeliverTracker). " |
| + "Default is 50,000. Setting the lookahead window to 0 will disable the " |
| + "logic to handle fixed delays in messages in a different way.") |
| private long delayedDeliveryFixedDelayDetectionLookahead = 50_000; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = """ |
| The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which \ |
| exceeds this max delay, then it will return an error to the producer. \ |
| The default value is 0 which means there is no limit on the max delivery delay.""") |
| private long delayedDeliveryMaxDelayInMillis = 0; |
| |
| @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index") |
| private boolean acknowledgmentAtBatchIndexLevelEnabled = false; |
| |
| @FieldContext( |
| category = CATEGORY_WEBSOCKET, |
| doc = "Enable the WebSocket API service in broker" |
| ) |
| private boolean webSocketServiceEnabled = false; |
| |
| @FieldContext( |
| category = CATEGORY_WEBSOCKET, |
| doc = "Flag indicates whether to run broker in standalone mode" |
| ) |
| private boolean isRunningStandalone = false; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| required = true, |
| doc = "Name of the cluster to which this broker belongs to" |
| ) |
| private String clusterName; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| dynamic = true, |
| doc = "The maximum number of tenants that each pulsar cluster can create." |
| + "This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded." |
| ) |
| private int maxTenants = 0; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| dynamic = true, |
| doc = "Enable cluster's failure-domain which can distribute brokers into logical region" |
| ) |
| private boolean failureDomainsEnabled = false; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Metadata store session timeout in milliseconds." |
| ) |
| private long metadataStoreSessionTimeoutMillis = 30_000; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Metadata store operation timeout in seconds." |
| ) |
| private int metadataStoreOperationTimeoutSeconds = 30; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Metadata store cache expiry time in seconds." |
| ) |
| private int metadataStoreCacheExpirySeconds = 300; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Is metadata store read-only operations." |
| ) |
| private boolean metadataStoreAllowReadOnlyOperations; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| deprecated = true, |
| doc = "ZooKeeper session timeout in milliseconds. " |
| + "@deprecated - Use metadataStoreSessionTimeoutMillis instead." |
| ) |
| private long zooKeeperSessionTimeoutMillis = -1; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| deprecated = true, |
| doc = "ZooKeeper operation timeout in seconds. " |
| + "@deprecated - Use metadataStoreOperationTimeoutSeconds instead." |
| ) |
| private int zooKeeperOperationTimeoutSeconds = -1; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| deprecated = true, |
| doc = "ZooKeeper cache expiry time in seconds. " |
| + "@deprecated - Use metadataStoreCacheExpirySeconds instead." |
| ) |
| private int zooKeeperCacheExpirySeconds = -1; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| deprecated = true, |
| doc = "Is zookeeper allow read-only operations." |
| ) |
| private boolean zooKeeperAllowReadOnlyOperations; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| dynamic = true, |
| doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed" |
| ) |
| private long brokerShutdownTimeoutMs = 60000; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| dynamic = true, |
| doc = "Flag to skip broker shutdown when broker handles Out of memory error" |
| ) |
| private boolean skipBrokerShutdownOnOOM = false; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Amount of seconds to timeout when loading a topic. In situations with many geo-replicated clusters, " |
| + "this may need raised." |
| ) |
| private long topicLoadTimeoutSeconds = 60; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Whether we should enable metadata operations batching" |
| ) |
| private boolean metadataStoreBatchingEnabled = true; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Maximum delay to impose on batching grouping" |
| ) |
| private int metadataStoreBatchingMaxDelayMillis = 5; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Maximum number of operations to include in a singular batch" |
| ) |
| private int metadataStoreBatchingMaxOperations = 1_000; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Maximum size of a batch" |
| ) |
| private int metadataStoreBatchingMaxSizeKb = 128; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Configuration file path for local metadata store. It's supported by RocksdbMetadataStore for now." |
| ) |
| private String metadataStoreConfigPath = null; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_SERVER, |
| doc = "Event topic to sync metadata between separate pulsar " |
| + "clusters on different cloud platforms." |
| ) |
| private String metadataSyncEventTopic = null; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_SERVER, |
| doc = "Event topic to sync configuration-metadata between separate pulsar " |
| + "clusters on different cloud platforms." |
| ) |
| private String configurationMetadataSyncEventTopic = null; |
| |
| @FieldContext( |
| dynamic = true, |
| doc = "Factory class-name to create topic with custom workflow" |
| ) |
| private String topicFactoryClassName; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Enable backlog quota check. Enforces actions on topic when the quota is reached" |
| ) |
| private boolean backlogQuotaCheckEnabled = true; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Whether to enable precise time based backlog quota check. " |
| + "Enabling precise time based backlog quota check will cause broker to read first entry in backlog " |
| + "of the slowest cursor on a ledger which will mostly result in reading entry from BookKeeper's " |
| + "disk which can have negative impact on overall performance. " |
| + "Disabling precise time based backlog quota check will just use the timestamp indicating when a " |
| + "ledger was closed, which is of coarser granularity." |
| ) |
| private boolean preciseTimeBasedBacklogQuotaCheck = false; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| minValue = 1, |
| doc = "How often to check for topics that have reached the quota." |
| + " It only takes effects when `backlogQuotaCheckEnabled` is true" |
| ) |
| private int backlogQuotaCheckIntervalInSeconds = 60; |
| |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead." |
| ) |
| private double backlogQuotaDefaultLimitGB = -1; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Default per-topic backlog quota limit by size, less than 0 means no limitation. default is -1." |
| + " Increase it if you want to allow larger msg backlog" |
| ) |
| private long backlogQuotaDefaultLimitBytes = -1; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Default per-topic backlog quota limit by time in second, less than 0 means no limitation. " |
| + "default is -1. Increase it if you want to allow larger msg backlog" |
| ) |
| private int backlogQuotaDefaultLimitSecond = -1; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Default backlog quota retention policy. Default is producer_request_hold\n\n" |
| + "'producer_request_hold' Policy which holds producer's send request until the" |
| + "resource becomes available (or holding times out)\n" |
| + "'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer\n" |
| + "'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog" |
| ) |
| private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy |
| .producer_request_hold; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Default ttl for namespaces if ttl is not already configured at namespace policies. " |
| + "(disable default-ttl with value 0)" |
| ) |
| private int ttlDurationDefaultInSeconds = 0; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Enable the deletion of inactive topics.\n" |
| + "If only enable this option, will not clean the metadata of partitioned topic." |
| ) |
| private boolean brokerDeleteInactiveTopicsEnabled = true; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Metadata of inactive partitioned topic will not be automatically cleaned up by default.\n" |
| + "Note: If `allowAutoTopicCreation` and this option are enabled at the same time,\n" |
| + "it may appear that a partitioned topic has just been deleted but is automatically created as a " |
| + "non-partitioned topic." |
| ) |
| private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled = false; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| minValue = 1, |
| dynamic = true, |
| doc = "How often to check for inactive topics" |
| ) |
| private int brokerDeleteInactiveTopicsFrequencySeconds = 60; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Set the inactive topic delete mode. Default is delete_when_no_subscriptions\n" |
| + "'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active " |
| + "producers\n" |
| + "'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no " |
| + "backlogs(caught up) and no active producers/consumers" |
| ) |
| private InactiveTopicDeleteMode brokerDeleteInactiveTopicsMode = InactiveTopicDeleteMode. |
| delete_when_no_subscriptions; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Max duration of topic inactivity in seconds, default is not present\n" |
| + "If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used\n" |
| + "Topics that are inactive for longer than this value will be deleted" |
| ) |
| private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Allow forced deletion of tenants. Default is false." |
| ) |
| private boolean forceDeleteTenantAllowed = false; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Allow forced deletion of namespaces. Default is false." |
| ) |
| private boolean forceDeleteNamespaceAllowed = false; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Max pending publish requests per connection to avoid keeping large number of pending " |
| + "requests in memory. Default: 1000" |
| ) |
| private int maxPendingPublishRequestsPerConnection = 1000; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| minValue = 1, |
| doc = "How frequently to proactively check and purge expired messages" |
| ) |
| private int messageExpiryCheckIntervalInMinutes = 5; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed" |
| ) |
| private int activeConsumerFailoverDelayTimeMillis = 1000; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog" |
| ) |
| private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog" |
| ) |
| private long subscriptionBacklogScanMaxEntries = 10_000; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "How long to delete inactive subscriptions from last consuming." |
| + " When it is 0, inactive subscriptions are not deleted automatically" |
| ) |
| private int subscriptionExpirationTimeMinutes = 0; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Enable subscription message redelivery tracker to send redelivery " |
| + "count to consumer (default is enabled)" |
| ) |
| private boolean subscriptionRedeliveryTrackerEnabled = true; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "How frequently to proactively check and purge expired subscription" |
| ) |
| private int subscriptionExpiryCheckIntervalInMinutes = 5; |
| |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| dynamic = true, |
| doc = "Enable subscription types (default is all type enabled)" |
| ) |
| private Set<String> subscriptionTypesEnabled = |
| Sets.newHashSet("Exclusive", "Shared", "Failover", "Key_Shared"); |
| |
    // --- Key_Shared subscription tuning and broker-side message deduplication. ---
    @Deprecated
    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "Enable Key_Shared subscription (default is enabled).\n"
            + "@deprecated - use subscriptionTypesEnabled instead."
    )
    private boolean subscriptionKeySharedEnable = true;

    @FieldContext(category = CATEGORY_POLICIES,
            doc = "On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or "
                    + "consistent hashing to reassign keys to new consumers (default is consistent hashing)")
    private boolean subscriptionKeySharedUseConsistentHashing = true;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "On KeyShared subscriptions, number of points in the consistent-hashing ring. "
            + "The higher the number, the more equal the assignment of keys to consumers")
    private int subscriptionKeySharedConsistentHashingReplicaPoints = 100;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Set the default behavior for message deduplication in the broker.\n\n"
            + "This can be overridden per-namespace. If enabled, broker will reject"
            + " messages that were already stored in the topic"
    )
    private boolean brokerDeduplicationEnabled = false;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum number of producer information that it's going to be persisted for deduplication purposes"
    )
    private int brokerDeduplicationMaxNumberOfProducers = 10000;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "How often is the thread pool scheduled to check whether a snapshot needs to be taken."
            + "(disable with value 0)"
    )
    private int brokerDeduplicationSnapshotFrequencyInSeconds = 120;
    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "If this time interval is exceeded, a snapshot will be taken."
            + "It will run simultaneously with `brokerDeduplicationEntriesInterval`"
    )
    // Boxed Integer (unlike the surrounding int fields) — presumably so that an
    // unset/null value can be distinguished; TODO confirm against the config loader.
    private Integer brokerDeduplicationSnapshotIntervalSeconds = 120;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Number of entries after which a dedup info snapshot is taken.\n\n"
            + "A bigger interval will lead to less snapshots being taken though it would"
            + " increase the topic recovery time, when the entries published after the"
            + " snapshot need to be replayed"
    )
    private int brokerDeduplicationEntriesInterval = 1000;

    @FieldContext(
        category = CATEGORY_POLICIES,
        minValue = 1,
        doc = "Time of inactivity after which the broker will discard the deduplication information"
            + " relative to a disconnected producer. Default is 6 hours.")
    private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;

    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "When a namespace is created without specifying the number of bundle, this"
            + " value will be used as the default")
    private int defaultNumberOfNamespaceBundles = 4;

    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "The maximum number of namespaces that each tenant can create."
            + "This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded")
    private int maxNamespacesPerTenant = 0;
| |
    // --- Resource limits (topics per namespace, broker connections) and misc
    // server toggles. For all the 0-valued limits below, 0 means "unlimited". ---
    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "Max number of topics allowed to be created in the namespace. "
            + "When the topics reach the max topics of the namespace, the broker should reject "
            + "the new topic request(include topic auto-created by the producer or consumer) until "
            + "the number of connected consumers decrease. "
            + " Using a value of 0, is disabling maxTopicsPerNamespace-limit check."
    )
    private int maxTopicsPerNamespace = 0;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "The maximum number of connections in the broker. If it exceeds, new connections are rejected."
    )
    private int brokerMaxConnections = 0;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "The maximum number of connections per IP. If it exceeds, new connections are rejected."
    )
    private int brokerMaxConnectionsPerIp = 0;

    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
            + " of namespace policy. This is enabled by default."
    )
    private boolean isAllowAutoUpdateSchemaEnabled = true;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Whether to enable the automatic shrink of pendingAcks map, "
            + "the default is false, which means it is not enabled. "
            + "When there are a large number of share or key share consumers in the cluster, "
            + "it can be enabled to reduce the memory consumption caused by pendingAcks.")
    private boolean autoShrinkForConsumerPendingAcksMap = false;

    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Enable check for minimum allowed client library version"
    )
    private boolean clientLibraryVersionCheckEnabled = false;
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Path for the file used to determine the rotation status for the broker"
            + " when responding to service discovery health checks")
    // No default: remains null unless configured.
    private String statusFilePath;
| |
    // --- Unacked-message backpressure limits and publish/dispatch rate limiting.
    // As documented per field, a value of 0 disables the corresponding check. ---
    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Max number of unacknowledged messages allowed to receive messages by a consumer on"
            + " a shared subscription.\n\n Broker will stop sending messages to consumer once,"
            + " this limit reaches until consumer starts acknowledging messages back and unack count"
            + " reaches to `maxUnackedMessagesPerConsumer/2`. Using a value of 0, it is disabling "
            + " unackedMessage-limit check and consumer can receive messages without any restriction")
    private int maxUnackedMessagesPerConsumer = 50000;
    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Max number of unacknowledged messages allowed per shared subscription. \n\n"
            + " Broker will stop dispatching messages to all consumers of the subscription once this "
            + " limit reaches until consumer starts acknowledging messages back and unack count reaches"
            + " to `limit/2`. Using a value of 0, is disabling unackedMessage-limit check and dispatcher"
            + " can dispatch messages without any restriction")
    // Default is 4x the per-consumer limit.
    private int maxUnackedMessagesPerSubscription = 4 * 50000;
    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Max number of unacknowledged messages allowed per broker. \n\n"
            + " Once this limit reaches, broker will stop dispatching messages to all shared subscription "
            + " which has higher number of unack messages until subscriptions start acknowledging messages "
            + " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit"
            + " check and broker doesn't block dispatchers")
    private int maxUnackedMessagesPerBroker = 0;
    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
            + " unacked messages than this percentage limit and subscription will not receive any new messages "
            + " until that subscription acks back `limit/2` messages")
    // Fraction in [0, 1]: 0.16 means subscriptions holding >16% of the broker's
    // unacked messages get blocked once the broker-wide limit is hit.
    private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum size of Consumer metadata")
    private int maxConsumerMetadataSize = 1024;
    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "Broker periodically checks if subscription is stuck and unblock if flag is enabled. "
            + "(Default is disabled)"
    )
    private boolean unblockStuckSubscriptionEnabled = false;
    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "Tick time to schedule task that checks topic publish rate limiting across all topics "
            + "Reducing to lower value can give more accuracy while throttling publish but "
            + "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
    )
    private int topicPublisherThrottlingTickTimeMillis = 10;
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable precise rate limit for topic publish"
    )
    private boolean preciseTopicPublishRateLimiterEnable = false;
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Tick time to schedule task that checks broker publish rate limiting across all topics "
            + "Reducing to lower value can give more accuracy while throttling publish but "
            + "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
    )
    private int brokerPublisherThrottlingTickTimeMillis = 50;
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Max Rate(in 1 seconds) of Message allowed to publish for a broker "
            + "when broker publish rate limiting enabled. (Disable message rate limit with value 0)"
    )
    private int brokerPublisherThrottlingMaxMessageRate = 0;
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Max Rate(in 1 seconds) of Byte allowed to publish for a broker "
            + "when broker publish rate limiting enabled. (Disable byte rate limit with value 0)"
    )
    private long brokerPublisherThrottlingMaxByteRate = 0;
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Default messages per second dispatch throttling-limit for whole broker. "
            + "Using a value of 0, is disabling default message-byte dispatch-throttling"
    )
    private int dispatchThrottlingRateInMsg = 0;
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Default bytes per second dispatch throttling-limit for whole broker. "
            + "Using a value of 0, is disabling default message-byte dispatch-throttling"
    )
    private long dispatchThrottlingRateInByte = 0;
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Max Rate(in 1 seconds) of Message allowed to publish for a topic "
            + "when topic publish rate limiting enabled. (Disable byte rate limit with value 0)"
    )
    private int maxPublishRatePerTopicInMessages = 0;
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Max Rate(in 1 seconds) of Byte allowed to publish for a topic "
            + "when topic publish rate limiting enabled. (Disable byte rate limit with value 0)"
    )
    private long maxPublishRatePerTopicInBytes = 0;
| |
    // --- Subscribe-request throttling and per-topic/subscription/replicator
    // dispatch throttling; resource-usage reporting transport; broker-side
    // subscription-pattern evaluation. ---
    @FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "Too many subscribe requests from a consumer can cause broker rewinding consumer cursors "
            + " and loading data from bookies, hence causing high network bandwidth usage When the positive"
            + " value is set, broker will throttle the subscribe requests for one consumer. Otherwise, the"
            + " throttling will be disabled. The default value of this setting is 0 - throttling is disabled.")
    private int subscribeThrottlingRatePerConsumer = 0;
    @FieldContext(
        minValue = 1,
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s."
    )
    private int subscribeRatePeriodPerConsumerInSecond = 30;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default number of message dispatching throttling-limit for every topic. \n\n"
            + "Using a value of 0, is disabling default message dispatch-throttling")
    private int dispatchThrottlingRatePerTopicInMsg = 0;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
            + "Using a value of 0, is disabling default message-byte dispatch-throttling")
    private long dispatchThrottlingRatePerTopicInByte = 0;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Apply dispatch rate limiting on batch message instead individual "
            + "messages with in batch message. (Default is disabled)")
    private boolean dispatchThrottlingOnBatchMessageEnabled = false;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default number of message dispatching throttling-limit for a subscription. \n\n"
            + "Using a value of 0, is disabling default message dispatch-throttling.")
    private int dispatchThrottlingRatePerSubscriptionInMsg = 0;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n"
            + "Using a value of 0, is disabling default message-byte dispatch-throttling.")
    private long dispatchThrottlingRatePerSubscriptionInByte = 0;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default number of message dispatching throttling-limit for every replicator in replication. \n\n"
            + "Using a value of 0, is disabling replication message dispatch-throttling")
    private int dispatchThrottlingRatePerReplicatorInMsg = 0;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default number of message-bytes dispatching throttling-limit for every replicator in replication. \n\n"
            + "Using a value of 0, is disabling replication message-byte dispatch-throttling")
    private long dispatchThrottlingRatePerReplicatorInByte = 0;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Dispatch rate-limiting relative to publish rate. (Enabling flag will make broker to dynamically "
            + "update dispatch-rate relatively to publish-rate: "
            + "throttle-dispatch-rate = (publish-rate + configured dispatch-rate) ")
    private boolean dispatchThrottlingRateRelativeToPublishRate = false;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default dispatch-throttling is disabled for consumers which already caught-up with"
            + " published messages and don't have backlog. This enables dispatch-throttling for "
            + " non-backlog consumers as well.")
    private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = true;

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Default policy for publishing usage reports to system topic is disabled."
            + "This enables publishing of usage reports"
    )
    // Empty string means no transport class configured, i.e. reporting disabled.
    private String resourceUsageTransportClassName = "";

    @FieldContext(
        dynamic = true,
        category = CATEGORY_POLICIES,
        doc = "Default interval to publish usage reports if resourceUsagePublishToTopic is enabled."
    )
    private int resourceUsageTransportPublishIntervalInSecs = 60;

    @FieldContext(
        dynamic = false,
        category = CATEGORY_POLICIES,
        doc = "Enables evaluating subscription pattern on broker side."
    )
    private boolean enableBrokerSideSubscriptionPatternEvaluation = true;

    @FieldContext(
        dynamic = false,
        category = CATEGORY_POLICIES,
        doc = "Max length of subscription pattern"
    )
    private int subscriptionPatternMaxLength = 50;
| |
    // <-- dispatcher read settings -->
    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Max number of entries to read from bookkeeper. By default it is 100 entries."
    )
    private int dispatcherMaxReadBatchSize = 100;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Dispatch messages and execute broker side filters in a per-subscription thread"
    )
    private boolean dispatcherDispatchMessagesInSubscriptionThread = true;

    @FieldContext(
        dynamic = false,
        category = CATEGORY_SERVER,
        doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, "
            + "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and "
            + "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards "
            + "each relevant rate limit."
    )
    private boolean dispatchThrottlingForFilteredEntriesEnabled = false;

    // <-- dispatcher read settings -->
    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Max size in bytes of entries to read from bookkeeper. By default it is 5MB."
    )
    private int dispatcherMaxReadSizeBytes = 5 * 1024 * 1024;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Min number of entries to read from bookkeeper. By default it is 1 entries."
            + "When there is an error occurred on reading entries from bookkeeper, the broker"
            + " will backoff the batch size to this minimum number."
    )
    private int dispatcherMinReadBatchSize = 1;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "The read failure backoff initial time in milliseconds. By default it is 15s."
    )
    private int dispatcherReadFailureBackoffInitialTimeInMs = 15000;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "The read failure backoff max time in milliseconds. By default it is 60s."
    )
    private int dispatcherReadFailureBackoffMaxTimeInMs = 60000;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "The read failure backoff mandatory stop time in milliseconds. By default it is 0s."
    )
    private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Time in milliseconds to delay the new delivery of a message when an EntryFilter returns RESCHEDULE."
    )
    private int dispatcherEntryFilterRescheduledMessageDelay = 1000;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Max number of entries to dispatch for a shared subscription. By default it is 20 entries."
    )
    private int dispatcherMaxRoundRobinBatchSize = 20;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Precise dispatcher flow control according to history message number of each entry"
    )
    private boolean preciseDispatcherFlowControl = false;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = " Class name of pluggable entry filter that decides whether the entry needs to be filtered."
            + "You can use this class to decide which entries can be sent to consumers."
            + "Multiple names need to be separated by commas."
    )
    private List<String> entryFilterNames = new ArrayList<>();

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = " The directory for all the entry filter implementations."
    )
    private String entryFiltersDirectory = "";

    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Whether allow topic level entry filters policies overrides broker configuration."
    )
    private boolean allowOverrideEntryFilters = false;
| |
    // --- Concurrency caps, worker thread pools, topic loading toggles, and
    // per-topic producer/consumer/subscription limits (0 = unlimited). ---
    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic")
    private int maxConcurrentLookupRequest = 50000;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_SERVER,
        doc = "Max number of concurrent topic loading request broker allows to control number of zk-operations"
    )
    private int maxConcurrentTopicLoadRequest = 5000;
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max concurrent non-persistent message can be processed per connection")
    private int maxConcurrentNonPersistentMessagePerConnection = 1000;

    @Deprecated
    @FieldContext(
        category = CATEGORY_SERVER,
        deprecated = true,
        doc = "Number of worker threads to serve non-persistent topic.\n"
            + "@deprecated - use topicOrderedExecutorThreadNum instead.")
    // -1 signals "not set"; the replacement setting below applies instead.
    private int numWorkerThreadsForNonPersistentTopic = -1;
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Number of worker threads to serve topic ordered executor")
    // Defaults to the number of available CPU cores.
    private int topicOrderedExecutorThreadNum = Runtime.getRuntime().availableProcessors();

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable broker to load persistent topics"
    )
    private boolean enablePersistentTopics = true;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable broker to load non-persistent topics"
    )
    private boolean enableNonPersistentTopics = true;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable to run bookie along with broker"
    )
    private boolean enableRunBookieTogether = false;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable to run bookie autorecovery along with broker"
    )
    private boolean enableRunBookieAutoRecoveryTogether = false;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of producers allowed to connect to topic. \n\nOnce this limit reaches,"
            + " Broker will reject new producers until the number of connected producers decrease."
            + " Using a value of 0, is disabling maxProducersPerTopic-limit check.")
    private int maxProducersPerTopic = 0;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of producers with the same IP address allowed to connect to topic."
            + " \n\nOnce this limit reaches, Broker will reject new producers until the number of"
            + " connected producers with the same IP address decrease."
            + " Using a value of 0, is disabling maxSameAddressProducersPerTopic-limit check.")
    private int maxSameAddressProducersPerTopic = 0;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enforce producer to publish encrypted messages.(default disable).")
    private boolean encryptionRequireOnProducer = false;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of consumers allowed to connect to topic. \n\nOnce this limit reaches,"
            + " Broker will reject new consumers until the number of connected consumers decrease."
            + " Using a value of 0, is disabling maxConsumersPerTopic-limit check.")
    private int maxConsumersPerTopic = 0;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of consumers with the same IP address allowed to connect to topic."
            + " \n\nOnce this limit reaches, Broker will reject new consumers until the number of"
            + " connected consumers with the same IP address decrease."
            + " Using a value of 0, is disabling maxSameAddressConsumersPerTopic-limit check.")
    private int maxSameAddressConsumersPerTopic = 0;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of subscriptions allowed to subscribe to topic. \n\nOnce this limit reaches, "
            + " broker will reject new subscription until the number of subscribed subscriptions decrease.\n"
            + " Using a value of 0, is disabling maxSubscriptionsPerTopic limit check."
    )
    private int maxSubscriptionsPerTopic = 0;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of consumers allowed to connect to subscription. \n\nOnce this limit reaches,"
            + " Broker will reject new consumers until the number of connected consumers decrease."
            + " Using a value of 0, is disabling maxConsumersPerSubscription-limit check.")
    private int maxConsumersPerSubscription = 0;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max size of messages.",
        maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
    // Upper bound leaves headroom for the protocol frame padding on top of payload.
    private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable tracking of replicated subscriptions state across clusters.")
    private boolean enableReplicatedSubscriptions = true;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Frequency of snapshots for replicated subscriptions tracking.")
    private int replicatedSubscriptionsSnapshotFrequencyMillis = 1_000;
| |
    // --- Replicated-subscription snapshots, publish-buffer memory protection,
    // ledger trimming, partition caps, pluggable interceptors, and metadata
    // session-expiry handling. ---
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
    private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of snapshot to be cached per subscription.")
    private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;

    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Max memory size for broker handling messages sending from producers.\n\n"
            + " If the processing message size exceed this value, broker will stop read data"
            + " from the connection. The processing messages means messages are sends to broker"
            + " but broker have not send response to client, usually waiting to write to bookies.\n\n"
            + " It's shared across all the topics running in the same broker.\n\n"
            + " Use -1 to disable the memory limitation. Default is 1/2 of direct memory.\n\n")
    // Computed default: half of the JVM's max direct memory in MB, floored at 64 MB.
    private int maxMessagePublishBufferSizeInMB = Math.max(64,
            (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 2 / (1024 * 1024)));

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Interval between checks to see if message publish buffer size is exceed the max message publish "
            + "buffer size"
    )
    private int messagePublishBufferCheckIntervalInMillis = 100;

    @FieldContext(category = CATEGORY_SERVER, doc = "Whether to recover cursors lazily when trying to recover a "
            + "managed ledger backing a persistent topic. It can improve write availability of topics.\n"
            + "The caveat is now when recovered ledger is ready to write we're not sure if all old consumers last mark "
            + "delete position can be recovered or not.")
    private boolean lazyCursorRecovery = false;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Check between intervals to see if consumed ledgers need to be trimmed"
    )
    private int retentionCheckIntervalInSeconds = 120;

    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "The number of partitions per partitioned topic.\n"
            + "If try to create or update partitioned topics by exceeded number of partitions, then fail.\n"
            + "Use 0 or negative number to disable the check."
    )
    private int maxNumPartitionsPerPartitionedTopic = 0;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "The directory to locate broker interceptors"
    )
    private String brokerInterceptorsDirectory = "./interceptors";

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "List of broker interceptor to load, which is a list of broker interceptor names"
    )
    private Set<String> brokerInterceptors = new TreeSet<>();

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "List of interceptors for payload processing.")
    // LinkedHashSet: processors presumably run in configured order — TODO confirm at the call site.
    private Set<String> brokerEntryPayloadProcessors = new LinkedHashSet<>();

    // NOTE(review): unlike neighboring fields, this @FieldContext sets no category —
    // confirm whether that omission is intentional.
    @FieldContext(
        doc = "There are two policies to apply when broker metadata session expires: session expired happens, "
            + "\"shutdown\" or \"reconnect\". \n\n"
            + " With \"shutdown\", the broker will be restarted.\n\n"
            + " With \"reconnect\", the broker will keep serving the topics, while attempting to recreate a new session."
    )
    private MetadataSessionExpiredPolicy zookeeperSessionExpiredPolicy = MetadataSessionExpiredPolicy.reconnect;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "If a topic remains fenced for this number of seconds, it will be closed forcefully.\n"
            + " If it is set to 0 or a negative number, the fenced topic will not be closed."
    )
    private int topicFencingTimeoutSeconds = 0;
| |
    /**** --- Messaging Protocol. --- ****/

    @FieldContext(
        category = CATEGORY_PROTOCOLS,
        doc = "The directory to locate messaging protocol handlers"
    )
    private String protocolHandlerDirectory = "./protocols";

    @FieldContext(
        category = CATEGORY_PROTOCOLS,
        doc = "Use a separate ThreadPool for each Protocol Handler"
    )
    private boolean useSeparateThreadPoolForProtocolHandlers = true;

    @FieldContext(
        category = CATEGORY_PROTOCOLS,
        doc = "List of messaging protocols to load, which is a list of protocol names"
    )
    private Set<String> messagingProtocols = new TreeSet<>();

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable or disable system topic.")
    private boolean systemTopicEnabled = true;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "# Enable strict topic name check. Which includes two parts as follows:\n"
            + "# 1. Mark `-partition-` as a keyword.\n"
            + "# E.g.\n"
            + "    Create a non-partitioned topic.\n"
            + "        No corresponding partitioned topic\n"
            + "       - persistent://public/default/local-name (passed)\n"
            + "       - persistent://public/default/local-name-partition-z (rejected by keyword)\n"
            + "       - persistent://public/default/local-name-partition-0 (rejected by keyword)\n"
            + "        Has corresponding partitioned topic, partitions=2 and topic partition name "
            + "is persistent://public/default/local-name\n"
            + "       - persistent://public/default/local-name-partition-0 (passed,"
            + " Because it is the partition topic's sub-partition)\n"
            + "       - persistent://public/default/local-name-partition-z (rejected by keyword)\n"
            + "       - persistent://public/default/local-name-partition-4 (rejected,"
            + " Because it exceeds the number of maximum partitions)\n"
            + "    Create a partitioned topic(topic metadata)\n"
            + "       - persistent://public/default/local-name (passed)\n"
            + "       - persistent://public/default/local-name-partition-z (rejected by keyword)\n"
            + "       - persistent://public/default/local-name-partition-0 (rejected by keyword)\n"
            + "# 2. Allowed alphanumeric (a-zA-Z_0-9) and these special chars -=:. for topic name.\n"
            + "# NOTE: This flag will be removed in some major releases in the future.\n")
    private boolean strictTopicNameEnabled = false;

    @FieldContext(
        category = CATEGORY_SCHEMA,
        doc = "The schema compatibility strategy to use for system topics"
    )
    private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
            SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable or disable topic level policies, topic level policies depends on the system topic, "
            + "please enable the system topic first.")
    private boolean topicLevelPoliciesEnabled = true;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "List of interceptors for entry metadata.")
    private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "Enable or disable exposing broker entry metadata to client.")
    private boolean exposingBrokerEntryMetadataToClientEnabled = false;

    @Deprecated
    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "This config never takes effect and will be removed in the next release"
    )
    private boolean enableNamespaceIsolationUpdateOnTime = false;

    @FieldContext(category = CATEGORY_SERVER, doc = "Enable or disable strict bookie affinity.")
    private boolean strictBookieAffinityEnabled = false;
| |
    /***** --- TLS. --- ****/
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Enable TLS"
    )
    @Deprecated
    private boolean tlsEnabled = false;
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Tls cert refresh duration in seconds (set 0 to check on every new connection)"
    )
    private long tlsCertRefreshCheckDurationSec = 300;
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Path for the TLS certificate file"
    )
    // No default: remains null unless configured.
    private String tlsCertificateFilePath;
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Path for the TLS private key file"
    )
    // No default: remains null unless configured.
    private String tlsKeyFilePath;
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Path for the trusted TLS certificate file"
    )
    private String tlsTrustCertsFilePath = "";
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Accept untrusted TLS certificate from client"
    )
    private boolean tlsAllowInsecureConnection = false;
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Whether the hostname is validated when the broker creates a TLS connection with other brokers"
    )
    private boolean tlsHostnameVerificationEnabled = false;
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Specify the tls protocols the broker will use to negotiate during TLS Handshake.\n\n"
            + "Example:- [TLSv1.3, TLSv1.2]"
    )
    // Empty set means "use the JVM/provider defaults" rather than an explicit list.
    private Set<String> tlsProtocols = new TreeSet<>();
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake.\n\n"
            + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
    )
    // Empty set means "use the JVM/provider defaults" rather than an explicit list.
    private Set<String> tlsCiphers = new TreeSet<>();
    @FieldContext(
        category = CATEGORY_TLS,
        doc = "Specify whether Client certificates are required for TLS Reject.\n"
            + "the Connection if the Client Certificate is not trusted")
    private boolean tlsRequireTrustedClientCertOnConnect = false;
| |
    /***** --- Authentication. --- ****/
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "Enable authentication"
    )
    private boolean authenticationEnabled = false;
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "Authentication provider name list, which is a list of class names"
    )
    private Set<String> authenticationProviders = new TreeSet<>();

    // NOTE(review): semantics of non-positive values are not stated here —
    // confirm whether 0 disables the expiry check before documenting it as such.
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "Interval of time for checking for expired authentication credentials"
    )
    private int authenticationRefreshCheckSeconds = 60;

    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "Enforce authorization"
    )
    private boolean authorizationEnabled = false;
    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "Authorization provider fully qualified class-name"
    )
    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();

    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        dynamic = true,
        doc = "Role names that are treated as `super-user`, meaning they will be able to"
            + " do all admin operations and publish/consume from all topics"
    )
    private Set<String> superUserRoles = new TreeSet<>();

    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "Role names that are treated as `proxy roles`. \n\nIf the broker sees"
            + " a request with role as proxyRoles - it will demand to see the original"
            + " client role or certificate.")
    private Set<String> proxyRoles = new TreeSet<>();

    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "If this flag is set then the broker authenticates the original Auth data"
            + " else it just accepts the originalPrincipal and authorizes it (if required)")
    private boolean authenticateOriginalAuthData = false;

    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "Allow wildcard matching in authorization\n\n"
            + "(wildcard matching only applicable if wildcard-char: * presents at first"
            + " or last position eg: *.pulsar.service, pulsar.service.*)")
    private boolean authorizationAllowWildcardsMatching = false;

    @FieldContext(
        category = CATEGORY_AUTHORIZATION,
        doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole"
    )
    private String anonymousUserRole = null;
| |
| @FieldContext( |
| category = CATEGORY_HTTP, |
| doc = "If >0, it will reject all HTTP requests with bodies larged than the configured limit" |
| ) |
| private long httpMaxRequestSize = -1; |
| |
    @FieldContext(
        category = CATEGORY_HTTP,
        doc = """
            The maximum size in bytes of the request header.
            Larger headers will allow for more and/or larger cookies plus larger form content encoded in a URL.
            However, larger headers consume more memory and can make a server more vulnerable to denial of service
            attacks.
            """
    )
    private int httpMaxRequestHeaderSize = 8 * 1024;

    @FieldContext(
        category = CATEGORY_HTTP,
        doc = "If true, the broker will reject all HTTP requests using the TRACE and TRACK verbs.\n"
            + " This setting may be necessary if the broker is deployed into an environment that uses http port\n"
            + " scanning and flags web servers allowing the TRACE method as insecure."
    )
    private boolean disableHttpDebugMethods = false;

    @FieldContext(
        category = CATEGORY_HTTP,
        doc = "Enable the enforcement of limits on the incoming HTTP requests"
    )
    private boolean httpRequestsLimitEnabled = false;

    @FieldContext(
        category = CATEGORY_HTTP,
        doc = "Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 "
            + "(Too many requests)"
    )
    private double httpRequestsMaxPerSecond = 100.0;

    @FieldContext(
        category = CATEGORY_HTTP,
        dynamic = true,
        doc = "Admin API fail on unknown request parameter in request-body. see PIP-179. Default false."
    )
    private boolean httpRequestsFailOnUnknownPropertiesEnabled = false;

    /***** --- SASL Authentication. --- ****/
    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
            + " Default value is: \".*pulsar.*\", so only clients whose id contains 'pulsar' are allowed to connect."
    )
    private String saslJaasClientAllowedIds = SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT;

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "Service Principal, for login context name. Default value is \"PulsarBroker\"."
    )
    private String saslJaasServerSectionName = SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "Path to file containing the secret to be used to SaslRoleTokenSigner\n"
            + "The secret can be specified like:\n"
            + "saslJaasServerRoleTokenSignerSecretPath=file:///my/saslRoleTokenSignerSecret.key."
    )
    private String saslJaasServerRoleTokenSignerSecretPath;

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "kerberos kinit command."
    )
    private String kinitCommand = "/usr/bin/kinit";

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "how often the broker expires the inflight SASL context."
    )
    private long inflightSaslContextExpiryMs = 30_000L;

    @FieldContext(
        category = CATEGORY_SASL_AUTH,
        doc = "Maximum number of inflight sasl context."
    )
    private long maxInflightSaslContext = 50_000L;
| |
    /**** --- BookKeeper Client. --- ****/
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Metadata service uri that bookkeeper is used for loading corresponding metadata driver"
            + " and resolving its metadata service location"
    )
    @Getter(AccessLevel.NONE)
    private String bookkeeperMetadataServiceUri;

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Authentication plugin to use when connecting to bookies"
    )
    private String bookkeeperClientAuthenticationPlugin;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "BookKeeper auth plugin implementation specifics parameters name and values"
    )
    private String bookkeeperClientAuthenticationParametersName;
    // Excluded from toString() because auth parameters may carry credentials.
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Parameters for bookkeeper auth plugin"
    )
    @ToString.Exclude
    private String bookkeeperClientAuthenticationParameters;

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Timeout for BK add / read operations"
    )
    private long bookkeeperClientTimeoutInSeconds = 30;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Speculative reads are initiated if a read request doesn't complete within"
            + " a certain time Using a value of 0, is disabling the speculative reads")
    private int bookkeeperClientSpeculativeReadTimeoutInMillis = 0;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Number of channels per bookie"
    )
    private int bookkeeperNumberOfChannelsPerBookie = 16;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_STORAGE_BK,
        doc = "Use older Bookkeeper wire protocol with bookie"
    )
    private boolean bookkeeperUseV2WireProtocol = true;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Enable bookies health check. \n\n Bookies that have more than the configured"
            + " number of failure within the interval will be quarantined for some time."
            + " During this period, new ledgers won't be created on these bookies")
    private boolean bookkeeperClientHealthCheckEnabled = true;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Bookies health check interval in seconds"
    )
    private long bookkeeperClientHealthCheckIntervalSeconds = 60;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Bookies health check error threshold per check interval"
    )
    private long bookkeeperClientHealthCheckErrorThresholdPerInterval = 5;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Bookie health check quarantined time in seconds"
    )
    private long bookkeeperClientHealthCheckQuarantineTimeInSeconds = 1800;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "bookie quarantine ratio to avoid all clients quarantine "
            + "the high pressure bookie servers at the same time"
    )
    private double bookkeeperClientQuarantineRatio = 1.0;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Enable rack-aware bookie selection policy. \n\nBK will chose bookies from"
            + " different racks when forming a new bookie ensemble")
    private boolean bookkeeperClientRackawarePolicyEnabled = true;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Enable region-aware bookie selection policy. \n\nBK will chose bookies from"
            + " different regions and racks when forming a new bookie ensemble")
    private boolean bookkeeperClientRegionawarePolicyEnabled = false;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Minimum number of racks per write quorum. \n\nBK rack-aware bookie selection policy will try to"
            + " get bookies from at least 'bookkeeperClientMinNumRacksPerWriteQuorum' racks for a write quorum.")
    private int bookkeeperClientMinNumRacksPerWriteQuorum = 2;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Enforces rack-aware bookie selection policy to pick bookies from "
            + "'bookkeeperClientMinNumRacksPerWriteQuorum' racks for a writeQuorum. \n\n"
            + "If BK can't find bookie then it would throw BKNotEnoughBookiesException instead of picking random one.")
    private boolean bookkeeperClientEnforceMinNumRacksPerWriteQuorum = false;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Enable/disable reordering read sequence on reading entries")
    private boolean bookkeeperClientReorderReadSequenceEnabled = true;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        required = false,
        doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. \n\n"
            + "Any bookie outside the specified groups will not be used by the broker")
    private String bookkeeperClientIsolationGroups;
    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        required = false,
        doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough "
            + "bookie available."
    )
    private String bookkeeperClientSecondaryIsolationGroups;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to periodically check bookie info")
    private int bookkeeperClientGetBookieInfoIntervalSeconds = 60 * 60 * 24; // defaults to 24 hours

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to retry a failed bookie info lookup")
    private int bookkeeperClientGetBookieInfoRetryIntervalSeconds = 60;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be "
            + "sticky to a single bookie.\n"
            + "If this flag is enabled, the client will use one single bookie (by "
            + "preference) to read all entries for a ledger.")
    private boolean bookkeeperEnableStickyReads = true;

    // BookKeeper-client TLS settings; these are separate from the broker's own
    // tls* settings above and apply only to broker->bookie connections.
    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the client security provider factory class name. "
            + "Default: org.apache.bookkeeper.tls.TLSContextFactory")
    private String bookkeeperTLSProviderFactoryClass = "org.apache.bookkeeper.tls.TLSContextFactory";

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable tls authentication with bookie")
    private boolean bookkeeperTLSClientAuthentication = false;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Supported type: PEM, JKS, PKCS12. Default value: PEM")
    private String bookkeeperTLSKeyFileType = "PEM";

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Supported type: PEM, JKS, PKCS12. Default value: PEM")
    private String bookkeeperTLSTrustCertTypes = "PEM";

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Path to file containing keystore password, "
            + "if the client keystore is password protected.")
    private String bookkeeperTLSKeyStorePasswordPath;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Path to file containing truststore password, "
            + "if the client truststore is password protected.")
    private String bookkeeperTLSTrustStorePasswordPath;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Path for the TLS private key file")
    private String bookkeeperTLSKeyFilePath;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Path for the TLS certificate file")
    private String bookkeeperTLSCertificateFilePath;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Path for the trusted TLS certificate file")
    private String bookkeeperTLSTrustCertsFilePath;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Tls cert refresh duration at bookKeeper-client in seconds (0 "
            + "to disable check)")
    private int bookkeeperTlsCertFilesRefreshDurationSeconds = 300;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable disk weight based placement. Default is false")
    private boolean bookkeeperDiskWeightBasedPlacementEnabled = false;

    @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to check the need for sending an explicit "
            + "LAC")
    private int bookkeeperExplicitLacIntervalInMills = 0;

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "whether expose managed ledger client stats to prometheus"
    )
    private boolean bookkeeperClientExposeStatsToPrometheus = false;

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "whether limit per_channel_bookie_client metrics of bookkeeper client stats"
    )
    private boolean bookkeeperClientLimitStatsLogging = true;

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Throttle value for bookkeeper client"
    )
    private int bookkeeperClientThrottleValue = 0;

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Number of BookKeeper client worker threads. Default is Runtime.getRuntime().availableProcessors()"
    )
    private int bookkeeperClientNumWorkerThreads = Runtime.getRuntime().availableProcessors();

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Number of BookKeeper client IO threads. Default is Runtime.getRuntime().availableProcessors() * 2"
    )
    private int bookkeeperClientNumIoThreads = Runtime.getRuntime().availableProcessors() * 2;

    @FieldContext(
        category = CATEGORY_STORAGE_BK,
        doc = "Use separated IO threads for BookKeeper client. Default is false, which will use Pulsar IO threads"
    )
    private boolean bookkeeperClientSeparatedIoThreadsEnabled = false;
| |
    /**** --- Managed Ledger. --- ****/
    @FieldContext(
        minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "Ensemble (E) size, Number of bookies to use for storing entries in a ledger.\n"
            + "Please notice that sticky reads enabled by bookkeeperEnableStickyReads=true aren’t used "
            + " unless ensemble size (E) equals write quorum (Qw) size."
    )
    private int managedLedgerDefaultEnsembleSize = 2;
    @FieldContext(
        minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "Write quorum (Qw) size, Replication factor for storing entries (messages) in a ledger."
    )
    private int managedLedgerDefaultWriteQuorum = 2;
    @FieldContext(
        minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "Ack quorum (Qa) size, Number of guaranteed copies "
            + "(acks to wait for before a write is considered completed)"
    )
    private int managedLedgerDefaultAckQuorum = 2;

    @FieldContext(minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds)."
            + " Default is 60 seconds")
    private int managedLedgerCursorPositionFlushSeconds = 60;

    @FieldContext(minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "How frequently to refresh the stats. (seconds). Default is 60 seconds")
    private int managedLedgerStatsPeriodSeconds = 60;

    // Checksum and password applied to ledgers written to BookKeeper.
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Default type of checksum to use when writing to BookKeeper. \n\nDefault is `CRC32C`."
            + " Other possible options are `CRC32`, `MAC` or `DUMMY` (no checksum)."
    )
    private DigestType managedLedgerDigestType = DigestType.CRC32C;

    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Default password to use when writing to BookKeeper. \n\nDefault is ``."
    )
    @ToString.Exclude
    private String managedLedgerPassword = "";

    @FieldContext(
        minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "Max number of bookies to use when creating a ledger"
    )
    private int managedLedgerMaxEnsembleSize = 5;
    @FieldContext(
        minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "Max number of copies to store for each message"
    )
    private int managedLedgerMaxWriteQuorum = 5;
    @FieldContext(
        minValue = 1,
        category = CATEGORY_STORAGE_ML,
        doc = "Max number of guaranteed copies (acks to wait before write is complete)"
    )
    private int managedLedgerMaxAckQuorum = 5;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        dynamic = true,
        doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
            + " memory is allocated from JVM direct memory and it's shared across all the topics"
            + " running in the same broker. By default, uses 1/5th of available direct memory")
    private int managedLedgerCacheSizeMB = Math.max(64,
            (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 5 / (1024 * 1024)));

    @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
            + "inserting in cache")
    private boolean managedLedgerCacheCopyEntries = false;
| |
| @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum buffer size for bytes read from storage." |
| + " This is the memory retained by data read from storage (or cache) until it has been delivered to the" |
| + " Consumer Netty channel. Use O to disable") |
| private long managedLedgerMaxReadsInFlightSizeInMB = 0; |
| |
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        dynamic = true,
        doc = "Threshold to which bring down the cache level when eviction is triggered"
    )
    private double managedLedgerCacheEvictionWatermark = 0.9;
    // Deprecated in favor of managedLedgerCacheEvictionIntervalMs below.
    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "Configure the cache eviction frequency for the managed ledger cache.")
    @Deprecated
    private double managedLedgerCacheEvictionFrequency = 0;

    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms")
    private long managedLedgerCacheEvictionIntervalMs = 10;

    @FieldContext(category = CATEGORY_STORAGE_ML,
            dynamic = true,
            doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
    private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'"
                    + " and thus should be set as inactive.")
    private long managedLedgerCursorBackloggedThreshold = 1000;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Rate limit the amount of writes per second generated by consumer acking the messages"
    )
    private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
    // Topic / subscription auto-creation switches (all dynamic).
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        dynamic = true,
        doc = "Allow automated creation of topics if set to true (default value)."
    )
    private boolean allowAutoTopicCreation = true;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        dynamic = true,
        doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
    )
    private TopicType allowAutoTopicCreationType = TopicType.NON_PARTITIONED;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        dynamic = true,
        doc = "Allow automated creation of subscriptions if set to true (default value)."
    )
    private boolean allowAutoSubscriptionCreation = true;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        dynamic = true,
        doc = "The number of partitioned topics that is allowed to be automatically created"
            + " if allowAutoTopicCreationType is partitioned."
    )
    private int defaultNumPartitions = 1;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "The class of the managed ledger storage"
    )
    private String managedLedgerStorageClassName = "org.apache.pulsar.broker.ManagedLedgerClientFactory";
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Number of threads to be used for managed ledger scheduled tasks"
    )
    private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();

    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Max number of entries to append to a ledger before triggering a rollover.\n\n"
            + "A ledger rollover is triggered after the min rollover time has passed"
            + " and one of the following conditions is true:"
            + " the max rollover time has been reached,"
            + " the max entries have been written to the ledger, or"
            + " the max ledger size has been written to the ledger")
    private int managedLedgerMaxEntriesPerLedger = 50000;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Minimum time between ledger rollover for a topic"
    )
    private int managedLedgerMinLedgerRolloverTimeMinutes = 10;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Maximum time before forcing a ledger rollover for a topic"
    )
    private int managedLedgerMaxLedgerRolloverTimeMinutes = 240;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Maximum ledger size before triggering a rollover for a topic (MB)"
    )
    private int managedLedgerMaxSizePerLedgerMbytes = 2048;
    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "Delay between a ledger being successfully offloaded to long term storage,"
            + " and the ledger being deleted from bookkeeper"
    )
    private long managedLedgerOffloadDeletionLagMs = TimeUnit.HOURS.toMillis(4);
    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "The number of bytes before triggering automatic offload to long term storage"
    )
    private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
| @FieldContext( |
| category = CATEGORY_STORAGE_OFFLOADING, |
| doc = "The threshold to triggering automatic offload to long term storage" |
| ) |
| private long managedLedgerOffloadThresholdInSeconds = -1L; |
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Max number of entries to append to a cursor ledger"
    )
    private int managedLedgerCursorMaxEntriesPerLedger = 50000;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Max time before triggering a rollover on a cursor ledger"
    )
    private int managedLedgerCursorRolloverTimeInSeconds = 14400;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n"
            + "When acknowledging out of order, a consumer will leave holes that are supposed"
            + " to be quickly filled by acking all the messages. The information of which"
            + " messages are acknowledged is persisted by compressing in `ranges` of messages"
            + " that were acknowledged. After the max number of ranges is reached, the information"
            + " will only be tracked in memory and messages will be redelivered in case of"
            + " crashes.")
    private int managedLedgerMaxUnackedRangesToPersist = 10000;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
            + "are stored in multiple entries.")
    private boolean persistentUnackedRangesWithMultipleEntriesEnabled = false;
    // Deprecated; superseded by managedLedgerMaxUnackedRangesToPersistInMetadataStore below.
    @Deprecated
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        deprecated = true,
        doc = "Max number of `acknowledgment holes` that can be stored in Zookeeper.\n\n"
            + "If number of unack message range is higher than this limit then broker will persist"
            + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.\n"
            + "@deprecated - use managedLedgerMaxUnackedRangesToPersistInMetadataStore.")
    private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = -1;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Max number of `acknowledgment holes` that can be stored in MetadataStore.\n\n"
            + "If number of unack message range is higher than this limit then broker will persist"
            + " unacked ranges into bookkeeper to avoid additional data overhead into MetadataStore.")
    private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
    )
    private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_STORAGE_ML, |
| doc = "After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is" |
| + " too large to persist, it will help to reduce the duplicates caused by the ack state that can not be" |
| + " fully persistent. Default false.") |
| private boolean dispatcherPauseOnAckStatePersistentEnabled = false; |
    @FieldContext(
        dynamic = true,
        category = CATEGORY_STORAGE_ML,
        doc = "Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.\n\n"
            + " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
    )
    private boolean autoSkipNonRecoverableData = false;
    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "operation timeout while updating managed-ledger metadata."
    )
    private long managedLedgerMetadataOperationsTimeoutSeconds = 60;

    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Read entries timeout when broker tries to read messages from bookkeeper "
            + "(0 to disable it)"
    )
    private long managedLedgerReadEntryTimeoutSeconds = 0;

    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
    private long managedLedgerAddEntryTimeoutSeconds = 0;

    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Managed ledger prometheus stats latency rollover seconds"
    )
    private int managedLedgerPrometheusStatsLatencyRolloverSeconds = 60;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_STORAGE_ML,
        doc = "Whether trace managed ledger task execution time"
    )
    private boolean managedLedgerTraceTaskExecution = true;

    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "New entries check delay for the cursor under the managed ledger. \n"
                    + "If no new messages in the topic, the cursor will try to check again after the delay time. \n"
                    + "For consumption latency sensitive scenario, can set to a smaller value or set to 0.\n"
                    + "Of course, this may degrade consumption throughput. Default is 10ms.")
    private int managedLedgerNewEntriesCheckDelayInMillis = 10;

    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "Read priority when ledgers exists in both bookkeeper and the second layer storage.")
    private String managedLedgerDataReadPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST
            .getValue();

    // Compression of ManagedLedgerInfo metadata; the ManagedCursorInfo settings
    // below mirror these for cursor metadata.
    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
                    + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
    private String managedLedgerInfoCompressionType = "NONE";
| |
| @FieldContext(category = CATEGORY_STORAGE_ML, |
| doc = "ManagedLedgerInfo compression size threshold (bytes), " |
| + "only compress metadata when origin size more then this value.\n" |
| + "0 means compression will always apply.\n") |
| private long managedLedgerInfoCompressionThresholdInBytes = 16 * 1024; |
| |
| |
    // Mirrors managedLedgerInfoCompressionType above, but for cursor metadata.
    @FieldContext(category = CATEGORY_STORAGE_ML,
            doc = "ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
                    + "If value is NONE, then save the ManagedCursorInfo bytes data directly.")
    private String managedCursorInfoCompressionType = "NONE";
| |
| |
| @FieldContext(category = CATEGORY_STORAGE_ML, |
| doc = "ManagedCursorInfo compression size threshold (bytes), " |
| + "only compress metadata when origin size more then this value.\n" |
| + "0 means compression will always apply.\n") |
| private long managedCursorInfoCompressionThresholdInBytes = 16 * 1024; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_STORAGE_ML, |
| doc = "Minimum cursors that must be in backlog state to cache and reuse the read entries." |
| + "(Default =0 to disable backlog reach cache)" |
| ) |
| private int managedLedgerMinimumBacklogCursorsForCaching = 0; |
| |
    // Both limits are entry counts; they only matter once the backlog-cursor
    // caching above is enabled via managedLedgerMinimumBacklogCursorsForCaching.
    @FieldContext(
        dynamic = true,
        category = CATEGORY_STORAGE_ML,
        doc = "Minimum backlog entries for any cursor before start caching reads"
    )
    private int managedLedgerMinimumBacklogEntriesForCaching = 1000;
    @FieldContext(
        dynamic = true,
        category = CATEGORY_STORAGE_ML,
        doc = "Maximum backlog entry difference to prevent caching entries that can't be reused"
    )
    private int managedLedgerMaxBacklogBetweenCursorsForCaching = 1000;
| |
    /*** --- Load balancer. --- ****/
    @FieldContext(
        category = CATEGORY_LOAD_BALANCER,
        doc = "Enable load balancer"
    )
    private boolean loadBalancerEnabled = true;
    // Per its own doc, only consulted by SimpleLoadManagerImpl; superseded by
    // loadBalancerLoadPlacementStrategy below for the modern load managers.
    @Deprecated
    @FieldContext(
        category = CATEGORY_LOAD_BALANCER,
        deprecated = true,
        doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by "
            + "SimpleLoadManagerImpl)"
    )
    private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection

    @FieldContext(
        category = CATEGORY_LOAD_BALANCER,
        dynamic = true,
        doc = "load balance load shedding strategy "
            + "(It requires broker restart if value is changed using dynamic config). "
            + "Default is ThresholdShedder since 2.10.0"
    )
    private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";

    @FieldContext(
        category = CATEGORY_LOAD_BALANCER,
        doc = "When [current usage < average usage - threshold], "
            + "the broker with the highest load will be triggered to unload"
    )
    private boolean lowerBoundarySheddingEnabled = false;

    @FieldContext(
        category = CATEGORY_LOAD_BALANCER,
        doc = "load balance placement strategy"
    )
    private String loadBalancerLoadPlacementStrategy =
            "org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate";

    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Percentage of change to trigger load report update"
    )
    private int loadBalancerReportUpdateThresholdPercentage = 10;
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "maximum interval to update load report" |
| ) |
| private int loadBalancerReportUpdateMinIntervalMillis = 5000; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Min delay of load report to collect, in minutes" |
| ) |
| private int loadBalancerReportUpdateMaxIntervalMinutes = 15; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Frequency of report to collect, in minutes" |
| ) |
| private int loadBalancerHostUsageCheckIntervalMinutes = 1; |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Enable/disable automatic bundle unloading for load-shedding" |
| ) |
| private boolean loadBalancerSheddingEnabled = true; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Load shedding interval. \n\nBroker periodically checks whether some traffic" |
| + " should be offload from some over-loaded broker to other under-loaded brokers" |
| ) |
| private int loadBalancerSheddingIntervalMinutes = 1; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "enable/disable distribute bundles evenly" |
| ) |
| private boolean loadBalancerDistributeBundlesEvenlyEnabled = true; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Prevent the same topics to be shed and moved to other broker more than" |
| + " once within this timeframe" |
| ) |
| private long loadBalancerSheddingGracePeriodMinutes = 30; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| deprecated = true, |
| doc = "Usage threshold to determine a broker as under-loaded (only used by SimpleLoadManagerImpl)" |
| ) |
| @Deprecated |
| private int loadBalancerBrokerUnderloadedThresholdPercentage = 50; |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Usage threshold to allocate max number of topics to broker" |
| ) |
| private int loadBalancerBrokerMaxTopics = 50000; |
| |
    // The threshold fields below are integer percentages (e.g. 85 means 85%).
    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Usage threshold to determine a broker as over-loaded"
    )
    private int loadBalancerBrokerOverloadedThresholdPercentage = 85;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Usage threshold to determine a broker whether to start threshold shedder"
    )
    private int loadBalancerBrokerThresholdShedderPercentage = 10;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Average resource usage difference threshold to determine a broker whether to be a best candidate in "
                + "LeastResourceUsageWithWeight.(eg: broker1 with 10% resource usage with weight "
                + "and broker2 with 30% and broker3 with 80% will have 40% average resource usage. "
                + "The placement strategy can select broker1 and broker2 as best candidates.)"
    )
    private int loadBalancerAverageResourceUsageDifferenceThresholdPercentage = 10;


    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "In FlowOrQpsEquallyDivideBundleSplitAlgorithm,"
                + " if msgRate >= loadBalancerNamespaceBundleMaxMsgRate * "
                + " (100 + flowOrQpsDifferenceThresholdPercentage)/100.0 "
                + " or throughput >= loadBalancerNamespaceBundleMaxBandwidthMbytes * "
                + " (100 + flowOrQpsDifferenceThresholdPercentage)/100.0, "
                + " execute split bundle"
    )
    private int flowOrQpsDifferenceThresholdPercentage = 10;

    // UniformLoadShedder knobs: shedding is only considered above these minimums.
    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload."
    )
    private int minUnloadMessage = 1000;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
    )
    private int minUnloadMessageThroughput = 1 * 1024 * 1024; // 1 MiB — presumably bytes/s; confirm at usage site

    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
    )
    private double maxUnloadPercentage = 0.2; // a ratio in [0, 1] despite the "Percentage" name
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Message-rate percentage threshold between highest and least loaded brokers for " |
| + "uniform load shedding. (eg: broker1 with 50K msgRate and broker2 with 30K msgRate " |
| + "will have 66% msgRate difference and load balancer can unload bundles from broker-1 " |
| + "to broker-2)" |
| ) |
| private double loadBalancerMsgRateDifferenceShedderThreshold = 50; |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Message-throughput threshold between highest and least loaded brokers for " |
| + "uniform load shedding. (eg: broker1 with 450MB msgRate and broker2 with 100MB msgRate " |
| + "will have 4.5 times msgThroughout difference and load balancer can unload bundles " |
| + "from broker-1 to broker-2)" |
| ) |
| private double loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold = 4; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "For each uniform balanced unload, the maximum number of bundles that can be unloaded." |
| + " The default value is -1, which means no limit" |
| ) |
| private int maxUnloadBundleNumPerShedding = -1; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Resource history Usage Percentage When adding new resource usage info" |
| ) |
| private double loadBalancerHistoryResourcePercentage = 0.9; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "BandwithIn Resource Usage Weight" |
| ) |
| private double loadBalancerBandwithInResourceWeight = 1.0; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "BandwithOut Resource Usage Weight" |
| ) |
| private double loadBalancerBandwithOutResourceWeight = 1.0; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "CPU Resource Usage Weight" |
| ) |
| private double loadBalancerCPUResourceWeight = 1.0; |
| |
    // Retained only so existing broker.conf files with this key still parse; the value is unused.
    @Deprecated(since = "3.0.0")
    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Memory Resource Usage Weight. Deprecated: Memory is no longer used as a load balancing item.",
        deprecated = true
    )
    private double loadBalancerMemoryResourceWeight = 1.0;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Direct Memory Resource Usage Weight. Direct memory usage cannot accurately reflect the "
                + "machine's load, and it is not recommended to use it to score the machine's load."
    )
    private double loadBalancerDirectMemoryResourceWeight = 0; // default 0: presumably excludes direct memory from scoring

    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Bundle unload minimum throughput threshold (MB)"
    )
    private double loadBalancerBundleUnloadMinThroughputThreshold = 10;
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Interval to flush dynamic resource quota to ZooKeeper" |
| ) |
| private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15; |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| deprecated = true, |
| doc = "Usage threshold to determine a broker is having just right level of load" |
| + " (only used by SimpleLoadManagerImpl)" |
| ) |
| private int loadBalancerBrokerComfortLoadLevelPercentage = 65; |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "enable/disable automatic namespace bundle split" |
| ) |
| private boolean loadBalancerAutoBundleSplitEnabled = true; |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "enable/disable automatic unloading of split bundles" |
| ) |
| private boolean loadBalancerAutoUnloadSplitBundlesEnabled = true; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "maximum topics in a bundle, otherwise bundle split will be triggered" |
| ) |
| private int loadBalancerNamespaceBundleMaxTopics = 1000; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered" |
| + "(disable threshold check with value -1)" |
| ) |
| private int loadBalancerNamespaceBundleMaxSessions = 1000; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered" |
| ) |
| private int loadBalancerNamespaceBundleMaxMsgRate = 30000; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered" |
| ) |
| private int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "maximum number of bundles in a namespace" |
| ) |
| private int loadBalancerNamespaceMaximumBundles = 128; |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Name of load manager to use" |
| ) |
| private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; |
| |
    @FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use")
    private String topicBundleAssignmentStrategy =
            "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner";
    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Supported algorithms name for namespace bundle split"
    )
    private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide",
            "topic_count_equally_divide", "specified_positions_divide", "flow_or_qps_equally_divide");
    @FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Default algorithm name for namespace bundle split"
    )
    // NOTE(review): presumably this value must be one of supportedNamespaceBundleSplitAlgorithms — confirm at usage site.
    private String defaultNamespaceBundleSplitAlgorithm = "range_equally_divide";
    @FieldContext(
        category = CATEGORY_LOAD_BALANCER,
        doc = "Option to override the auto-detected network interfaces max speed"
    )
    private Optional<Double> loadBalancerOverrideBrokerNicSpeedGbps = Optional.empty();

    @FieldContext(
        category = CATEGORY_LOAD_BALANCER,
        dynamic = true,
        doc = "Time to wait for the unloading of a namespace bundle"
    )
    private long namespaceBundleUnloadingTimeoutMs = 60000; // 60 seconds
| |
| /**** --- Load Balancer Extension. --- ****/ |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Option to enable the debug mode for the load balancer logics. " |
| + "The debug mode prints more logs to provide more information " |
| + "such as load balance states and decisions. " |
| + "(only used in load balancer extension logics)" |
| ) |
| private boolean loadBalancerDebugModeEnabled = false; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "The target standard deviation of the resource usage across brokers " |
| + "(100% resource usage is 1.0 load). " |
| + "The shedder logic tries to distribute bundle load across brokers to meet this target std. " |
| + "The smaller value will incur load balancing more frequently. " |
| + "(only used in load balancer extension TransferSheddeer)" |
| ) |
| private double loadBalancerBrokerLoadTargetStd = 0.25; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Threshold to the consecutive count of fulfilled shedding(unload) conditions. " |
| + "If the unload scheduler consecutively finds bundles that meet unload conditions " |
| + "many times bigger than this threshold, the scheduler will shed the bundles. " |
| + "The bigger value will incur less bundle unloading/transfers. " |
| + "(only used in load balancer extension TransferSheddeer)" |
| ) |
| private int loadBalancerSheddingConditionHitCountThreshold = 3; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Option to enable the bundle transfer mode when distributing bundle loads. " |
| + "On: transfer bundles from overloaded brokers to underloaded " |
| + "-- pre-assigns the destination broker upon unloading). " |
| + "Off: unload bundles from overloaded brokers " |
| + "-- post-assigns the destination broker upon lookups). " |
| + "(only used in load balancer extension TransferSheddeer)" |
| ) |
| private boolean loadBalancerTransferEnabled = true; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Maximum number of brokers to unload bundle load for each unloading cycle. " |
| + "The bigger value will incur more unloading/transfers for each unloading cycle. " |
| + "(only used in load balancer extension TransferSheddeer)" |
| ) |
| private int loadBalancerMaxNumberOfBrokerSheddingPerCycle = 3; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Delay (in seconds) to the next unloading cycle after unloading. " |
| + "The logic tries to give enough time for brokers to recompute load after unloading. " |
| + "The bigger value will delay the next unloading cycle longer. " |
| + "(only used in load balancer extension TransferSheddeer)" |
| ) |
| private long loadBalanceSheddingDelayInSeconds = 180; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Broker load data time to live (TTL in seconds). " |
| + "The logic tries to avoid (possibly unavailable) brokers with out-dated load data, " |
| + "and those brokers will be ignored in the load computation. " |
| + "When tuning this value, please consider loadBalancerReportUpdateMaxIntervalMinutes. " |
| + "The current default value is loadBalancerReportUpdateMaxIntervalMinutes * 120, reflecting " |
| + "twice the duration in seconds. " |
| + "(only used in load balancer extension TransferSheddeer)" |
| ) |
| private long loadBalancerBrokerLoadDataTTLInSeconds = 1800; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Max number of bundles in bundle load report from each broker. " |
| + "The load balancer distributes bundles across brokers, " |
| + "based on topK bundle load data and other broker load data." |
| + "The bigger value will increase the overhead of reporting many bundles in load data. " |
| + "(only used in load balancer extension logics)" |
| ) |
| private int loadBalancerMaxNumberOfBundlesInBundleLoadReport = 10; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Service units'(bundles) split interval. Broker periodically checks whether " |
| + "some service units(e.g. bundles) should split if they become hot-spots. " |
| + "(only used in load balancer extension logics)" |
| ) |
| private int loadBalancerSplitIntervalMinutes = 1; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Max number of bundles to split to per cycle. " |
| + "(only used in load balancer extension logics)" |
| ) |
| private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10; |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Threshold to the consecutive count of fulfilled split conditions. " |
| + "If the split scheduler consecutively finds bundles that meet split conditions " |
| + "many times bigger than this threshold, the scheduler will trigger splits on the bundles " |
| + "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). " |
| + "(only used in load balancer extension logics)" |
| ) |
| private int loadBalancerNamespaceBundleSplitConditionHitCountThreshold = 3; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "After this delay, the service-unit state channel tombstones any service units (e.g., bundles) " |
| + "in semi-terminal states. For example, after splits, parent bundles will be `deleted`, " |
| + "and then after this delay, the parent bundles' state will be `tombstoned` " |
| + "in the service-unit state channel. " |
| + "Pulsar does not immediately remove such semi-terminal states " |
| + "to avoid unnecessary system confusion, " |
| + "as the bundles in the `tombstoned` state might temporarily look available to reassign. " |
| + "Rarely, one could lower this delay in order to aggressively clean " |
| + "the service-unit state channel when there are a large number of bundles. " |
| + "minimum value = 30 secs" |
| + "(only used in load balancer extension logics)" |
| ) |
| private long loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds = 3600; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| dynamic = true, |
| doc = "Option to automatically unload namespace bundles with affinity(isolation) " |
| + "or anti-affinity group policies." |
| + "Such bundles are not ideal targets to auto-unload as destination brokers are limited." |
| + "(only used in load balancer extension logics)" |
| ) |
| private boolean loadBalancerSheddingBundlesWithPoliciesEnabled = false; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Time to wait before fixing any stuck in-flight service unit states. " |
| + "The leader monitor fixes any in-flight service unit(bundle) states " |
| + "by reassigning the ownerships if stuck too long, longer than this period." |
| + "(only used in load balancer extension logics)" |
| ) |
| private long loadBalancerInFlightServiceUnitStateWaitingTimeInMillis = 30 * 1000; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Interval between service unit state monitor checks. " |
| + "The service unit(bundle) state channel is periodically monitored" |
| + " by the leader broker at this interval" |
| + " to fix any orphan bundle ownerships, stuck in-flight states, and other cleanup jobs." |
| + "`loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds` * 1000 must be bigger than " |
| + "`loadBalancerInFlightServiceUnitStateWaitingTimeInMillis`." |
| + "(only used in load balancer extension logics)" |
| ) |
| private long loadBalancerServiceUnitStateMonitorIntervalInSeconds = 60; |
| |
| @FieldContext( |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "Enables the multi-phase unloading of bundles. Set to true, forwards destination broker information " |
| + "to consumers and producers during bundle unload, allowing them to quickly reconnect to the " |
| + "broker without performing an additional topic lookup." |
| ) |
| private boolean loadBalancerMultiPhaseBundleUnload = true; |
| |
| /**** --- Replication. --- ****/ |
    @FieldContext(
        category = CATEGORY_REPLICATION,
        doc = "Enable replication metrics"
    )
    private boolean replicationMetricsEnabled = true;
    @FieldContext(
        category = CATEGORY_REPLICATION,
        doc = "Max number of connections to open for each broker in a remote cluster.\n\n"
            + "More connections host-to-host lead to better throughput over high-latency links"
    )
    private int replicationConnectionsPerBroker = 16;
    @FieldContext(
        required = false,
        category = CATEGORY_REPLICATION,
        doc = "replicator prefix used for replicator producer name and cursor name"
    )
    private String replicatorPrefix = "pulsar.repl";
    @FieldContext(
        category = CATEGORY_REPLICATION,
        dynamic = true,
        doc = "Replicator producer queue size. "
            + "When dynamically modified, it only takes effect for the newly added replicators"
    )
    private int replicationProducerQueueSize = 1000;
    @FieldContext(
        category = CATEGORY_REPLICATION,
        doc = "Duration to check replication policy to avoid replicator "
            + "inconsistency due to missing ZooKeeper watch (disable with value 0)"
    )
    private int replicationPolicyCheckDurationSeconds = 600;
    // Kept for configuration backward compatibility; new deployments should set brokerClientTlsEnabled.
    @Deprecated
    @FieldContext(
        category = CATEGORY_REPLICATION,
        deprecated = true,
        doc = "@deprecated - Use brokerClientTlsEnabled instead."
    )
    private boolean replicationTlsEnabled = false;
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Default message retention time." |
| + " 0 means retention is disabled. -1 means data is not removed by time quota" |
| ) |
| private int defaultRetentionTimeInMinutes = 0; |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| doc = "Default retention size." |
| + " 0 means retention is disabled. -1 means data is not removed by size quota" |
| ) |
| private int defaultRetentionSizeInMB = 0; |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "How often to check pulsar connection is still alive" |
| ) |
| private int keepAliveIntervalSeconds = 30; |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Timeout for connection liveness check used to check liveness of possible consumer or producer " |
| + "duplicates. Helps prevent ProducerFencedException with exclusive producer, " |
| + "ConsumerAssignException with range conflict for Key Shared with sticky hash ranges or " |
| + "ConsumerBusyException in the case of an exclusive consumer. Set to 0 to disable connection " |
| + "liveness check." |
| ) |
| private long connectionLivenessCheckTimeoutMillis = 5000L; |
| @Deprecated |
| @FieldContext( |
| category = CATEGORY_POLICIES, |
| deprecated = true, |
| doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one " |
| + "connected) Deprecated in favor of using `brokerDeleteInactiveTopicsFrequencySeconds`\n" |
| + "@deprecated - unused." |
| ) |
| private int brokerServicePurgeInactiveFrequencyInSeconds = 60; |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "A comma-separated list of namespaces to bootstrap" |
| ) |
| private List<String> bootstrapNamespaces = new ArrayList<String>(); |
| |
    // Raw key/value properties backing this configuration object; deliberately excluded from
    // toString() and JSON serialization (may contain sensitive values such as credentials).
    @ToString.Exclude
    @com.fasterxml.jackson.annotation.JsonIgnore
    private Properties properties = new Properties();
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_SERVER, |
| doc = "If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to " |
| + "use only brokers running the latest software version (to minimize impact to bundles)" |
| ) |
| private boolean preferLaterVersions = false; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Interval between checks to see if topics with compaction policies need to be compacted" |
| ) |
| private int brokerServiceCompactionMonitorIntervalInSeconds = 60; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "The estimated backlog size is greater than this threshold, compression will be triggered.\n" |
| + "Using a value of 0, is disabling compression check." |
| ) |
| private long brokerServiceCompactionThresholdInBytes = 0; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Timeout for each read request in the compaction phase one loop, If the execution time of one " |
| + "single message read operation exceeds this time, the compaction will not proceed." |
| ) |
| private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Whether retain null-key message during topic compaction." |
| ) |
| private boolean topicCompactionRetainNullKey = false; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Interval between checks to see if cluster is migrated and marks topic migrated " |
| + " if cluster is marked migrated. Disable with value 0. (Default disabled)." |
| ) |
| private int clusterMigrationCheckDurationSeconds = 0; |
| |
| @FieldContext( |
| category = CATEGORY_SERVER, |
| doc = "Flag to start cluster migration for topic only after creating all topic's resources" |
| + " such as tenant, namespaces, subscriptions at new green cluster. (Default disabled)." |
| ) |
| private boolean clusterMigrationAutoResourceCreation = false; |
| |
| @FieldContext( |
| category = CATEGORY_SCHEMA, |
| doc = "Enforce schema validation on following cases:\n\n" |
| + " - if a producer without a schema attempts to produce to a topic with schema, the producer will be\n" |
| + " failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.\n" |
| + " if you enable this setting, it will cause non-java clients failed to produce." |
| ) |
| private boolean isSchemaValidationEnforced = false; |
| |
| @FieldContext( |
| category = CATEGORY_SCHEMA, |
| doc = "The schema storage implementation used by this broker" |
| ) |
| private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema" |
| + ".BookkeeperSchemaStorageFactory"; |
| |
| @FieldContext( |
| category = CATEGORY_SCHEMA, |
| doc = "The list compatibility checkers to be used in schema registry" |
| ) |
| private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet( |
| "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck", |
| "org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck", |
| "org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck" |
| ); |
| |
| @FieldContext( |
| category = CATEGORY_SCHEMA, |
| doc = "The schema compatibility strategy in broker level" |
| ) |
| private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; |
| |
| /**** --- WebSocket. --- ****/ |
| @FieldContext( |
| category = CATEGORY_WEBSOCKET, |
| doc = "Number of IO threads in Pulsar Client used in WebSocket proxy" |
| ) |
| private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors(); |
| |
| @FieldContext(category = CATEGORY_WEBSOCKET, |
| doc = "Number of threads used by Websocket service") |
| private int webSocketNumServiceThreads = 20; |
| |
| @FieldContext( |
| category = CATEGORY_WEBSOCKET, |
| doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy" |
| ) |
| private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors(); |
| @FieldContext( |
| category = CATEGORY_WEBSOCKET, |
| doc = "Time in milliseconds that idle WebSocket session times out" |
| ) |
| private int webSocketSessionIdleTimeoutMillis = 300000; |
| |
| @FieldContext( |
| category = CATEGORY_WEBSOCKET, |
| doc = "Interval of time to sending the ping to keep alive in WebSocket proxy. " |
| + "This value greater than 0 means enabled") |
| private int webSocketPingDurationSeconds = -1; |
| |
| @FieldContext( |
| category = CATEGORY_WEBSOCKET, |
| doc = "The maximum size of a text message during parsing in WebSocket proxy." |
| ) |
| private int webSocketMaxTextFrameSize = 1048576; |
| |
| /**** --- Metrics. --- ****/ |
| @FieldContext( |
| category = CATEGORY_METRICS, |
| doc = "Whether the '/metrics' endpoint requires authentication. Defaults to false." |
| + "'authenticationEnabled' must also be set for this to take effect." |
| ) |
| private boolean authenticateMetricsEndpoint = false; |
| @FieldContext( |
| category = CATEGORY_METRICS, |
| doc = "If true, export topic level metrics otherwise namespace level" |
| ) |
| private boolean exposeTopicLevelMetricsInPrometheus = true; |
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "Set to true to enable the broker to cache the metrics response; the default is false. "
                + "The caching period is defined by `managedLedgerStatsPeriodSeconds`. "
                + "The broker returns the same response for subsequent requests within the same period. "
                + "Ensure that the scrape interval of your monitoring system matches the caching period.")
    private boolean metricsBufferResponse = false;
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "If true, export consumer level metrics otherwise namespace level"
    )
    private boolean exposeConsumerLevelMetricsInPrometheus = false;
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "If true, export producer level metrics otherwise namespace level"
    )
    private boolean exposeProducerLevelMetricsInPrometheus = false;
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "If true, export managed ledger metrics (aggregated by namespace)"
    )
    private boolean exposeManagedLedgerMetricsInPrometheus = true;
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "If true, export managed cursor metrics"
    )
    private boolean exposeManagedCursorMetricsInPrometheus = false;
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics")
    private String jvmGCMetricsLoggerClassName; // null: no GC metrics logger plugged in

    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "Enable expose the precise backlog stats.\n"
                + " Set false to use published counter and consumed counter to calculate,\n"
                + " this would be more efficient but may be inaccurate. Default is false."
    )
    private boolean exposePreciseBacklogInPrometheus = false;

    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "Time in milliseconds that metrics endpoint would time out. Default is 30s.\n"
                + " Increase it if there are a lot of topics to expose topic-level metrics.\n"
                + " Set it to 0 to disable timeout."
    )
    private long metricsServletTimeoutMs = 30000;

    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "Enable expose the backlog size for each subscription when generating stats.\n"
                + " Locking is used for fetching the status so default to false."
    )
    private boolean exposeSubscriptionBacklogSizeInPrometheus = false;

    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "Enable splitting topic and partition label in Prometheus.\n"
                + " If enabled, a topic name will split into 2 parts, one is topic name without partition index,\n"
                + " another one is partition index, e.g. (topic=xxx, partition=0).\n"
                + " If the topic is a non-partitioned topic, -1 will be used for the partition index.\n"
                + " If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)\n"
                + " Default is false."
    )
    private boolean splitTopicAndPartitionLabelInPrometheus = false;

    @FieldContext(
        dynamic = true,
        category = CATEGORY_METRICS,
        doc = "Enable expose the broker bundles metrics."
    )
    private boolean exposeBundlesMetricsInPrometheus = false;
| |
| /**** --- Functions. --- ****/ |
    @FieldContext(
        category = CATEGORY_FUNCTIONS,
        doc = "Flag indicates enabling or disabling function worker on brokers"
    )
    private boolean functionsWorkerEnabled = false;

    // Only consulted when functionsWorkerEnabled is true — NOTE(review): confirm at usage site.
    @FieldContext(
        category = CATEGORY_FUNCTIONS,
        doc = "The nar package for the function worker service"
    )
    private String functionsWorkerServiceNarPackage = "";

    @FieldContext(
        category = CATEGORY_FUNCTIONS,
        doc = "Flag indicates enabling or disabling function worker using unified PackageManagement service."
    )
    private boolean functionsWorkerEnablePackageManagement = false;
| |
    /**** --- Broker Web Stats. --- ****/
    // Settings controlling what the admin REST stats endpoints expose and how often
    // the underlying stats are refreshed.
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "If true, export publisher stats when returning topics stats from the admin rest api"
    )
    private boolean exposePublisherStats = true;
    @FieldContext(
        category = CATEGORY_METRICS,
        minValue = 1,
        doc = "Stats update frequency in seconds"
    )
    private int statsUpdateFrequencyInSecs = 60;
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "Stats update initial delay in seconds"
    )
    private int statsUpdateInitialDelayInSecs = 60;
    @FieldContext(
        category = CATEGORY_METRICS,
        doc = "If true, aggregate publisher stats of PartitionedTopicStats by producerName"
    )
    private boolean aggregatePublisherStatsByProducerName = false;
| |
    /**** --- Ledger Offloading. --- ****/
    /****
     * NOTES: all implementation related settings should be put in implementation package.
     * only common settings like driver name, io threads can be added here.
     ****/
    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "The directory to locate offloaders"
    )
    private String offloadersDirectory = "./offloaders";

    // null means no offload driver is configured and offloading is effectively disabled.
    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "Driver to use to offload old data to long term storage"
    )
    private String managedLedgerOffloadDriver = null;

    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "Maximum number of thread pool threads for ledger offloading"
    )
    private int managedLedgerOffloadMaxThreads = 2;

    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "The directory where nar Extraction of offloaders happens"
    )
    private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;

    @FieldContext(
        category = CATEGORY_STORAGE_OFFLOADING,
        doc = "Maximum prefetch rounds for ledger reading for offloading"
    )
    private int managedLedgerOffloadPrefetchRounds = 1;

    // dynamic = true: may be updated at runtime without a broker restart.
    @FieldContext(
        dynamic = true,
        category = CATEGORY_STORAGE_ML,
        doc = "Time to rollover ledger for inactive topic (duration without any publish on that topic). "
            + "Disable rollover with value 0 (Default value 0)"
    )
    private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;

    @FieldContext(
        category = CATEGORY_STORAGE_ML,
        doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. "
            + "The default is to evict through readPosition."
    )
    private boolean cacheEvictionByMarkDeletedPosition = false;
| |
    /**** --- Transaction config variables. --- ****/
    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Enable transaction coordinator in broker"
    )
    private boolean transactionCoordinatorEnabled = false;

    // Pluggable providers for the transaction subsystem; defaults are the
    // managed-ledger-backed implementations.
    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Class name for transaction metadata store provider"
    )
    private String transactionMetadataStoreProviderClassName =
            "org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider";

    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Class name for transaction buffer provider"
    )
    private String transactionBufferProviderClassName =
            "org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider";

    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Class name for transaction pending ack store provider"
    )
    private String transactionPendingAckStoreProviderClassName =
            "org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider";
| |
| @FieldContext( |
| category = CATEGORY_TRANSACTION, |
| doc = "Number of threads to use for pulsar transaction replay PendingAckStore or TransactionBuffer." |
| + "Default is 5" |
| ) |
| private int numTransactionReplayThreadPoolSize = Runtime.getRuntime().availableProcessors(); |
| |
| @FieldContext( |
| category = CATEGORY_TRANSACTION, |
| doc = "Transaction buffer take snapshot transaction count" |
| + "If transaction buffer enables snapshot segment, transaction buffer updates snapshot metadata" |
| + "after the number of transaction operations reaches this value." |
| ) |
| private int transactionBufferSnapshotMaxTransactionCount = 1000; |
| |
| @FieldContext( |
| category = CATEGORY_TRANSACTION, |
| doc = "The interval time for transaction buffer to take snapshots." |
| + "If transaction buffer enables snapshot segment, " |
| + "it is the interval time for transaction buffer to update snapshot metadata." |
| ) |
| private int transactionBufferSnapshotMinTimeInMillis = 5000; |
| |
| @FieldContext( |
| category = CATEGORY_TRANSACTION, |
| doc = "Transaction buffer stores the transaction ID of aborted transactions and takes snapshots." |
| + "This configuration determines the size of the snapshot segment. " |
| + "The default value is 256 KB (262144 bytes)." |
| ) |
| private int transactionBufferSnapshotSegmentSize = 262144; |
| |
    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Whether to enable segmented transaction buffer snapshot "
            + "to handle a large number of aborted transactions."
    )
    private boolean transactionBufferSegmentedSnapshotEnabled = false;

    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "The max concurrent requests for transaction buffer client."
    )
    private int transactionBufferClientMaxConcurrentRequests = 1000;

    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "The transaction buffer client's operation timeout in milliseconds."
    )
    private long transactionBufferClientOperationTimeoutInMills = 3000L;

    // 0 disables the per-coordinator active-transaction limit.
    @FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "The max active transactions per transaction coordinator, default value 0 indicates no limit."
    )
    private long maxActiveTransactionsPerCoordinator = 0L;
| |
| @FieldContext( |
| category = CATEGORY_TRANSACTION, |
| doc = "MLPendingAckStore maintain a ConcurrentSkipListMap pendingAckLogIndex`," |
| + "it store the position in pendingAckStore as value and save a position used to determine" |
| + "whether the previous data can be cleaned up as a key." |
| + "transactionPendingAckLogIndexMinLag is used to configure the minimum lag between indexes" |
| ) |
| private long transactionPendingAckLogIndexMinLag = 500L; |
| |
    // Batched-write settings for the transaction log: multiple log records are aggregated
    // into a single BK entry when enabled. dynamic = true: togglable at runtime.
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched"
            + " record and persist into a single BK entry. This will make Pulsar transactions work more"
            + " efficiently, aka batched log. see: https://github.com/apache/pulsar/issues/15370. Default false"
    )
    private boolean transactionLogBatchedWriteEnabled = false;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "If enabled the feature that transaction log batch, this attribute means maximum log records count"
            + " in a batch, default 512."
    )
    private int transactionLogBatchedWriteMaxRecords = 512;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "If enabled the feature that transaction log batch, this attribute means bytes size in a"
            + " batch, default 4m."
    )
    private int transactionLogBatchedWriteMaxSize = 1024 * 1024 * 4;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "If enabled the feature that transaction log batch, this attribute means maximum wait time(in millis)"
            + " for the first record in a batch, default 1 millisecond."
    )
    private int transactionLogBatchedWriteMaxDelayInMillis = 1;

    // Mirror of the batched-write settings above, applied to the pending-ack log store.
    @FieldContext(
        category = CATEGORY_SERVER,
        dynamic = true,
        doc = "Provide a mechanism allowing the transaction pending ack Log Store to aggregate multiple records"
            + " into a batched record and persist into a single BK entry. This will make Pulsar transactions"
            + " work more efficiently, aka batched log. see: https://github.com/apache/pulsar/issues/15370."
            + " Default false."
    )
    private boolean transactionPendingAckBatchedWriteEnabled = false;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "If enabled the feature that transaction log batch, this attribute means maximum log records count"
            + " in a batch, default 512."
    )
    private int transactionPendingAckBatchedWriteMaxRecords = 512;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "If enabled the feature that transaction pending ack log batch, this attribute means bytes size in"
            + " a batch, default 4m."
    )
    private int transactionPendingAckBatchedWriteMaxSize = 1024 * 1024 * 4;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "If enabled the feature that transaction pending ack log batch, this attribute means maximum wait"
            + " time(in millis) for the first record in a batch, default 1 millisecond."
    )
    private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1;

    @FieldContext(
        category = CATEGORY_SERVER,
        doc = "The class name of the factory that implements the topic compaction service."
    )
    private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory";
| |
    /**** --- KeyStore TLS config variables. --- ****/
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "Enable TLS with KeyStore type configuration in broker"
    )
    private boolean tlsEnabledWithKeyStore = false;

    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "Specify the TLS provider for the broker service: \n"
            + "When using TLS authentication with CACert, the valid value is either OPENSSL or JDK.\n"
            + "When using TLS authentication with KeyStore, available values can be SunJSSE, Conscrypt and etc."
    )
    private String tlsProvider = null;

    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS KeyStore type configuration in broker: JKS, PKCS12"
    )
    private String tlsKeyStoreType = "JKS";

    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS KeyStore path in broker"
    )
    private String tlsKeyStore = null;

    // @ToString.Exclude keeps the password out of generated toString() output / logs.
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS KeyStore password for broker"
    )
    @ToString.Exclude
    private String tlsKeyStorePassword = null;

    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS TrustStore type configuration in broker: JKS, PKCS12"
    )
    private String tlsTrustStoreType = "JKS";

    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS TrustStore path in broker"
    )
    private String tlsTrustStore = null;

    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS TrustStore password for broker, null means empty password."
    )
    @ToString.Exclude
    private String tlsTrustStorePassword = null;
| |
    /**** --- Config variables used for internal client/admin to auth with other broker. --- ****/
    // These settings configure the client the broker itself uses when connecting to other
    // brokers (same or remote cluster), not the clients that connect to this broker.
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        dynamic = true,
        doc = "Authentication settings of the broker itself. \n\nUsed when the broker connects"
            + " to other brokers, either in same or other clusters. "
            + "Default uses plugin which disables authentication"
    )
    private String brokerClientAuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
    // Excluded from toString() because the parameters may embed credentials.
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        dynamic = true,
        doc = "Authentication parameters of the authentication plugin the broker is using to connect "
            + "to other brokers"
    )
    @ToString.Exclude
    private String brokerClientAuthenticationParameters = "";
    @FieldContext(
        category = CATEGORY_REPLICATION,
        dynamic = true,
        doc = "Enable TLS when talking with other brokers in the same cluster (admin operation) "
            + "or different clusters (replication)"
    )
    private boolean brokerClientTlsEnabled = false;
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "Whether internal client use KeyStore type to authenticate with other Pulsar brokers"
    )
    private boolean brokerClientTlsEnabledWithKeyStore = false;
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "The TLS Provider used by internal client to authenticate with other Pulsar brokers"
    )
    private String brokerClientSslProvider = null;
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "TLS trusted certificate file for internal client, "
            + "used by the internal client to authenticate with Pulsar brokers")
    private String brokerClientTrustCertsFilePath = "";
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "TLS private key file for internal client, "
            + "used by the internal client to authenticate with Pulsar brokers")
    private String brokerClientKeyFilePath = "";
    @FieldContext(
        category = CATEGORY_AUTHENTICATION,
        doc = "TLS certificate file for internal client, "
            + "used by the internal client to authenticate with Pulsar brokers"
    )
    private String brokerClientCertificateFilePath = "";
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS TrustStore type configuration for internal client: JKS, PKCS12 "
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    private String brokerClientTlsTrustStoreType = "JKS";
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS TrustStore path for internal client, "
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    private String brokerClientTlsTrustStore = null;
    // Excluded from toString() to keep the password out of logs.
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS TrustStore password for internal client, "
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    @ToString.Exclude
    private String brokerClientTlsTrustStorePassword = null;
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS KeyStore type configuration for internal client: JKS, PKCS12,"
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    private String brokerClientTlsKeyStoreType = "JKS";
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS KeyStore path for internal client, "
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    private String brokerClientTlsKeyStore = null;
    // Excluded from toString() to keep the password out of logs.
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "TLS KeyStore password for internal client, "
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    @ToString.Exclude
    private String brokerClientTlsKeyStorePassword = null;
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "Specify the tls cipher the internal client will use to negotiate during TLS Handshake"
            + " (a comma-separated list of ciphers).\n\n"
            + "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].\n"
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    private Set<String> brokerClientTlsCiphers = new TreeSet<>();
    @FieldContext(
        category = CATEGORY_KEYSTORE_TLS,
        doc = "Specify the tls protocols the broker will use to negotiate during TLS handshake"
            + " (a comma-separated list of protocol names).\n\n"
            + "Examples:- [TLSv1.3, TLSv1.2] \n"
            + " used by the internal client to authenticate with Pulsar brokers"
    )
    private Set<String> brokerClientTlsProtocols = new TreeSet<>();
| |
    /* packages management service configurations (begin) */

    @FieldContext(
        category = CATEGORY_PACKAGES_MANAGEMENT,
        doc = "Enable the packages management service or not"
    )
    private boolean enablePackagesManagement = false;

    // Default storage provider persists packages into BookKeeper.
    @FieldContext(
        category = CATEGORY_PACKAGES_MANAGEMENT,
        doc = "The packages management service storage service provider"
    )
    private String packagesManagementStorageProvider = "org.apache.pulsar.packages.management.storage.bookkeeper"
            + ".BookKeeperPackagesStorageProvider";

    @FieldContext(
        category = CATEGORY_PACKAGES_MANAGEMENT,
        doc = "When the packages storage provider is bookkeeper, you can use this configuration to\n"
            + "control the number of replicas for storing the package"
    )
    private int packagesReplicas = 1;

    @FieldContext(
        category = CATEGORY_PACKAGES_MANAGEMENT,
        doc = "The bookkeeper ledger root path"
    )
    private String packagesManagementLedgerRootPath = "/ledgers";

    /* packages management service configurations (end) */

    @FieldContext(
        category = CATEGORY_PLUGIN,
        doc = "The directory to locate broker additional servlet"
    )
    private String additionalServletDirectory = "./brokerAdditionalServlet";

    @FieldContext(
        category = CATEGORY_PLUGIN,
        doc = "List of broker additional servlet to load, which is a list of broker additional servlet names"
    )
    private Set<String> additionalServlets = new TreeSet<>();
| |
| /** |
| * @deprecated Use {@link #getSubscriptionTypesEnabled()} instead |
| */ |
| @Deprecated |
| public boolean isSubscriptionKeySharedEnable() { |
| return subscriptionKeySharedEnable && subscriptionTypesEnabled.contains("Key_Shared"); |
| } |
| |
| public String getMetadataStoreUrl() { |
| if (StringUtils.isNotBlank(metadataStoreUrl)) { |
| return metadataStoreUrl; |
| } else if (StringUtils.isNotBlank(zookeeperServers)) { |
| // Fallback to old setting |
| return ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zookeeperServers; |
| } else { |
| return ""; |
| } |
| } |
| |
| /** |
| * Tells whether the selected metadata store implementation is based on ZooKeeper. |
| */ |
| public boolean isMetadataStoreBackedByZookeeper() { |
| return MetadataStoreFactory.isBasedOnZookeeper(getMetadataStoreUrl()); |
| } |
| |
| public String getConfigurationMetadataStoreUrl() { |
| if (StringUtils.isNotBlank(configurationMetadataStoreUrl)) { |
| return configurationMetadataStoreUrl; |
| } else if (StringUtils.isNotBlank(configurationStoreServers)) { |
| return configurationStoreServers; |
| } else if (StringUtils.isNotBlank(globalZookeeperServers)) { |
| return globalZookeeperServers; |
| } else { |
| // Fallback to local zookeeper |
| return getMetadataStoreUrl(); |
| } |
| } |
| |
| public boolean isConfigurationStoreSeparated() { |
| return !Objects.equals(getConfigurationMetadataStoreUrl(), getMetadataStoreUrl()); |
| } |
| |
| public boolean isBookkeeperMetadataStoreSeparated() { |
| return StringUtils.isNotBlank(bookkeeperMetadataServiceUri); |
| } |
| |
| public String getBookkeeperMetadataStoreUrl() { |
| if (isBookkeeperMetadataStoreSeparated()) { |
| return bookkeeperMetadataServiceUri; |
| } else { |
| // Fallback to same metadata service used by broker, adding the "metadata-store" to specify the BK |
| // metadata adapter |
| // Note: chroot is not settable by using 'zookeeperServers' config. |
| return "metadata-store:" + getMetadataStoreUrl() + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH; |
| } |
| } |
| |
    /** Looks up a value in the free-form properties bag; null if absent. */
    public Object getProperty(String key) {
        return properties.get(key);
    }
| |
    /** Returns the raw free-form properties bag backing this configuration. */
    @Override
    public Properties getProperties() {
        return properties;
    }
| |
    /** Replaces the free-form properties bag backing this configuration. */
    @Override
    public void setProperties(Properties properties) {
        this.properties = properties;
    }
| |
| public boolean isDefaultTopicTypePartitioned() { |
| return TopicType.PARTITIONED.equals(allowAutoTopicCreationType); |
| } |
| |
| public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() { |
| if (brokerDeleteInactiveTopicsMaxInactiveDurationSeconds == null) { |
| return brokerDeleteInactiveTopicsFrequencySeconds; |
| } else { |
| return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds; |
| } |
| } |
| |
| public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() { |
| if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { |
| return SchemaCompatibilityStrategy.FULL; |
| } |
| return schemaCompatibilityStrategy; |
| } |
| |
| public int getManagedLedgerMaxUnackedRangesToPersistInMetadataStore() { |
| return managedLedgerMaxUnackedRangesToPersistInZooKeeper > 0 |
| ? managedLedgerMaxUnackedRangesToPersistInZooKeeper : |
| managedLedgerMaxUnackedRangesToPersistInMetadataStore; |
| } |
| |
| public long getMetadataStoreSessionTimeoutMillis() { |
| return zooKeeperSessionTimeoutMillis > 0 ? zooKeeperSessionTimeoutMillis : metadataStoreSessionTimeoutMillis; |
| } |
| |
| public int getMetadataStoreOperationTimeoutSeconds() { |
| return zooKeeperOperationTimeoutSeconds > 0 ? zooKeeperOperationTimeoutSeconds |
| : metadataStoreOperationTimeoutSeconds; |
| } |
| |
| public int getMetadataStoreCacheExpirySeconds() { |
| return zooKeeperCacheExpirySeconds > 0 ? zooKeeperCacheExpirySeconds : metadataStoreCacheExpirySeconds; |
| } |
| |
| public boolean isMetadataStoreAllowReadOnlyOperations() { |
| return zooKeeperAllowReadOnlyOperations || metadataStoreAllowReadOnlyOperations; |
| } |
| |
    /**
     * Returns the cache-eviction interval in milliseconds.
     *
     * <p>When the deprecated frequency setting is in use (&gt; 0) it is converted to an interval:
     * {@code 1000 / frequency}, with the frequency first clamped to the range
     * [MIN_ML_CACHE_EVICTION_FREQUENCY, MAX_ML_CACHE_EVICTION_FREQUENCY]. Otherwise the configured
     * interval is used, capped at MAX_ML_CACHE_EVICTION_INTERVAL_MS.
     */
    public long getManagedLedgerCacheEvictionIntervalMs() {
        return managedLedgerCacheEvictionFrequency > 0
                ? (long) (1000 / Math.max(
                Math.min(managedLedgerCacheEvictionFrequency, MAX_ML_CACHE_EVICTION_FREQUENCY),
                MIN_ML_CACHE_EVICTION_FREQUENCY))
                : Math.min(MAX_ML_CACHE_EVICTION_INTERVAL_MS, managedLedgerCacheEvictionIntervalMs);
    }
| |
| public int getTopicOrderedExecutorThreadNum() { |
| return numWorkerThreadsForNonPersistentTopic > 0 |
| ? numWorkerThreadsForNonPersistentTopic : topicOrderedExecutorThreadNum; |
| } |
| |
| public boolean isSystemTopicAndTopicLevelPoliciesEnabled() { |
| return topicLevelPoliciesEnabled && systemTopicEnabled; |
| } |
| } |